Source code for pycycling.heart_rate_service
from collections import namedtuple
heart_rate_measurement_characteristic_id = '00002a37-0000-1000-8000-00805f9b34fb'
HeartRateMeasurement = namedtuple('HeartRateMeasurement', ['sensor_contact', 'bpm', 'rr_interval', 'energy_expended'])
def _parse_hr_measurement(data):
flags = data[0]
is_uint16_measurement_mask = 0x01
is_contact_detected_mask = 0x06
is_energy_expended_present_mask = 0x08
is_rr_interval_present_mask = 0x10
sensor_contact = None
bpm = None
rr_interval = []
energy_expended = None
measurement_byte_offset = 1
sensor_contact = bool(flags & is_contact_detected_mask)
if flags & is_uint16_measurement_mask:
bpm = int.from_bytes(data[measurement_byte_offset:measurement_byte_offset + 2], 'little')
measurement_byte_offset += 2
else:
bpm = data[measurement_byte_offset]
measurement_byte_offset += 1
if flags & is_energy_expended_present_mask:
energy_expended = int.from_bytes(data[measurement_byte_offset:measurement_byte_offset + 2], 'little')
measurement_byte_offset += 2
if flags & is_rr_interval_present_mask:
while len(data[measurement_byte_offset:]) >= 2:
rr_interval.append(int.from_bytes(data[measurement_byte_offset:measurement_byte_offset + 2], 'little'))
measurement_byte_offset += 2
return HeartRateMeasurement(sensor_contact=sensor_contact,
bpm=bpm,
rr_interval=rr_interval,
energy_expended=energy_expended)
[docs]
class HeartRateService:
def __init__(self, client):
self._client = client
self._hr_measurement_callback = None
[docs]
async def enable_hr_measurement_notifications(self):
await self._client.start_notify(heart_rate_measurement_characteristic_id,
self._hr_measurement_notification_handler)
[docs]
async def disable_hr_measurement_notifications(self):
await self._client.stop_notify(heart_rate_measurement_characteristic_id)
[docs]
def set_hr_measurement_handler(self, callback):
self._hr_measurement_callback = callback
def _hr_measurement_notification_handler(self, sender, data): # pylint: disable=unused-argument
if self._hr_measurement_callback is not None:
self._hr_measurement_callback(_parse_hr_measurement(data))