# Source code for pycycling.cycling_speed_cadence_service

from collections import namedtuple

# Characteristic identifiers used by this module: the first is the one
# notifications are started/stopped on, the second is the one read via
# ``read_gatt_char`` to obtain the feature bits.
csc_measurement_tx_id = '00002a5b-0000-1000-8000-00805f9b34fb'
csc_feature_tx_id = '00002a5c-0000-1000-8000-00805f9b34fb'

# One decoded measurement notification. A wheel or crank pair is ``None``
# when the corresponding flag bit was not set in the payload.
CSCMeasurement = namedtuple(
    'CSCMeasurement',
    (
        'cumulative_wheel_revs',
        'last_wheel_event_time',
        'cumulative_crank_revs',
        'last_crank_event_time',
    ),
)

# Decoded feature bits: one boolean per capability.
CSCFeature = namedtuple(
    'CSCFeature',
    (
        'wheel_rev_supported',
        'crank_rev_supported',
        'multiple_locations_supported',
    ),
)


def _parse_csc_feature(measurement):
    """Decode the raw feature bytes into a ``CSCFeature`` tuple.

    The bytes are interpreted as a single little-endian unsigned integer.
    Its three lowest bits map, in order, to wheel-revolution support,
    crank-revolution support and multiple-locations support.

    :param measurement: bytes-like value to decode.
    :return: ``CSCFeature`` holding one boolean per feature bit.
    """
    feature_bits = int.from_bytes(measurement, byteorder='little')
    return CSCFeature(
        wheel_rev_supported=(feature_bits & 0b001) != 0,
        crank_rev_supported=(feature_bits & 0b010) != 0,
        multiple_locations_supported=(feature_bits & 0b100) != 0,
    )


def _parse_csc_measurement(data):
    """Decode a raw measurement payload into a ``CSCMeasurement`` tuple.

    Byte 0 holds the flags. Bit 0 announces a wheel pair (4-byte revolution
    count followed by a 2-byte event time); bit 1 announces a crank pair
    (2-byte revolution count followed by a 2-byte event time). When both
    are present the wheel pair comes first. All integers are little-endian.

    :param data: indexable, sliceable bytes-like payload.
    :return: ``CSCMeasurement``; a pair whose flag bit is clear is ``None``.
    """
    def read_le(start, stop):
        # Little-endian unsigned integer stored in data[start:stop].
        return int.from_bytes(data[start:stop], 'little')

    header = data[0]
    has_wheel_data = header & 0b01
    has_crank_data = header & 0b10

    wheel_revs = wheel_event_time = None
    crank_revs = crank_event_time = None

    # Field data begins immediately after the single flags byte.
    cursor = 1

    if has_wheel_data:
        wheel_revs = read_le(cursor, cursor + 4)
        wheel_event_time = read_le(cursor + 4, cursor + 6)
        # Skip the 6 wheel bytes so the crank pair is read from the right spot.
        cursor += 6

    if has_crank_data:
        crank_revs = read_le(cursor, cursor + 2)
        crank_event_time = read_le(cursor + 2, cursor + 4)

    return CSCMeasurement(
        cumulative_wheel_revs=wheel_revs,
        last_wheel_event_time=wheel_event_time,
        cumulative_crank_revs=crank_revs,
        last_crank_event_time=crank_event_time,
    )


class CyclingSpeedCadenceService:
    """Client-side wrapper around the speed/cadence measurement and feature
    characteristics.

    :param client: object providing awaitable ``start_notify``,
        ``stop_notify`` and ``read_gatt_char`` methods
        (presumably a bleak ``BleakClient`` -- confirm against callers).
    """

    def __init__(self, client):
        self._client = client
        # User callback for decoded measurements; None until one is set.
        self._csc_measurement_callback = None

    async def enable_csc_measurement_notifications(self):
        """Start measurement notifications, routed through the internal handler."""
        await self._client.start_notify(csc_measurement_tx_id, self._csc_measurement_notification_handler)

    async def disable_csc_measurement_notifications(self):
        """Stop measurement notifications."""
        await self._client.stop_notify(csc_measurement_tx_id)

    def set_csc_measurement_handler(self, callback):
        """Register the callable invoked with each decoded measurement.

        :param callback: callable taking one argument, the value returned by
            ``_parse_csc_measurement``; replaces any earlier callback.
        """
        self._csc_measurement_callback = callback

    async def get_csc_feature(self):
        """Read the feature characteristic and return it decoded.

        :return: the value produced by ``_parse_csc_feature``.
        """
        measurement = await self._client.read_gatt_char(csc_feature_tx_id)
        return _parse_csc_feature(measurement)

    def _csc_measurement_notification_handler(self, sender, data):  # pylint: disable=unused-argument
        """Decode a notification payload and pass it to the user callback.

        Nothing happens when no callback has been registered. ``sender`` is
        accepted only to match the notification callback signature.
        """
        if self._csc_measurement_callback is not None:
            self._csc_measurement_callback(_parse_csc_measurement(data))