Source code for pycycling.sterzo

import sys
import asyncio
import struct
import importlib.resources
import pycycling.data

sterzo_measurement_id = '347b0030-7635-408b-8918-8ff3949ce592'
sterzo_control_point_id = '347b0031-7635-408b-8918-8ff3949ce592'
sterzo_challenge_code_id = '347b0032-7635-408b-8918-8ff3949ce592'


[docs] class Sterzo: def __init__(self, client): self._client = client self._steering_measurement_callback = None self._latest_challenge = None
[docs] async def enable_steering_measurement_notifications(self): await self._client.start_notify(sterzo_challenge_code_id, self._challenge_code_indication_handler) await self._client.start_notify(sterzo_measurement_id, self._steering_measurement_notification_handler) await self._client.write_gatt_char(sterzo_control_point_id, bytearray([0x03, 0x10])) while self._latest_challenge is None: await asyncio.sleep(2) await self._activate_steering_measurements()
async def _activate_steering_measurements(self): # importlib.resources.path is deprecated since 3.11 if sys.version_info >= (3, 11): challenge_file = importlib.resources.files(pycycling.data).joinpath('sterzo-challenge-codes.dat').open('rb') else: # legacy support < 3.9 challenge_file = importlib.resources.open_binary(pycycling.data, 'sterzo-challenge-codes.dat') # pylint: disable=deprecated-method with challenge_file: challenge_file.seek(self._latest_challenge * 2, 1) code_1 = int.from_bytes(challenge_file.read(1), 'little') code_2 = int.from_bytes(challenge_file.read(1), 'little') byte_array = bytearray([0x03, 0x11, code_1, code_2]) await self._client.write_gatt_char(sterzo_control_point_id, byte_array) await asyncio.sleep(1) await self._client.write_gatt_char(sterzo_control_point_id, bytearray([0x02, 0x02]))
[docs] async def disable_steering_measurement_notifications(self): await self._client.stop_notify(sterzo_measurement_id)
[docs] def set_steering_measurement_callback(self, callback): self._steering_measurement_callback = callback
def _challenge_code_indication_handler(self, sender, data): # pylint: disable=unused-argument self._latest_challenge = int.from_bytes(data[2:4], 'big') def _steering_measurement_notification_handler(self, sender, data): # pylint: disable=unused-argument [steering_angle] = struct.unpack('<f', data) self._steering_measurement_callback(steering_angle)