Source code for pycycling.rear_view_radar

""" A module for interacting with Bluetooth LE devices which support the Radar (RDR) service

Jason Sohn 2022

This service is tested on Garmin Varia RVR315.
Other models which are expected to support RDR service are:

* Garmin RTR515, RTR516 (German market version), and RCT715
* Bryton Gardia R300
* Magene L508

Example
=======
This example prints radar information broadcast from the Bluetooth device to the console. Please see also
information on :ref:`obtaining the Bluetooth address of your device <obtaining_device_address>`.

.. literalinclude:: ../examples/rear_view_radar_example.py
"""

from collections import namedtuple

radar_characteristic_id = '6a4e3203-667b-11e3-949a-0800200c9a66'

RadarMeasurement = namedtuple('RadarMeasurement', [
    'threat_id',
    'speed',
    'distance',
])

def _parse_radar_measurement(data: bytearray) -> RadarMeasurement:
    """
    Characteristic payload in bytes: 1+3i where i is number of threats (cars)

    byte 0: probably some kind of packet identifier (can be used in case of multiple packets)
    byte 1: threat identifier
    byte 2 (5, 8, ...): distance to threat in meters
    byte 3 (6, 9, ...): speed of threat in km/h

    See source for this reverse-engineering in repo README
    """
    radar_measurements = []
    try:
        for i in range(1, len(data), 3):
            threat_id = int(data[i])
            distance = int(data[i+1])
            speed = int(data[i+2])
            radar_measurements.append(RadarMeasurement(threat_id, speed, distance))
    except IndexError:
        print('pycycling:rear_view_radar.py IndexError: probably starting up and not all data is available yet')
        return None
    return radar_measurements

[docs] class RearViewRadarService: def __init__(self, client): self._client = client self._radar_measurement_callback = None
[docs] async def enable_radar_measurement_notifications(self): await self._client.start_notify(radar_characteristic_id, self._radar_measurement_notification_handler)
[docs] async def disable_radar_measurement_notifications(self): await self._client.stop_notify(radar_characteristic_id)
[docs] def set_radar_measurement_handler(self, callback): self._radar_measurement_callback = callback
def _radar_measurement_notification_handler(self, sender, data): # pylint: disable=unused-argument if self._radar_measurement_callback is not None: self._radar_measurement_callback(_parse_radar_measurement(data))