"""
A module for interacting with Bluetooth devices which support the Cycling Power Service.
This service is supported by most power meters and some turbo trainers.
Example
=======
This example prints cycling power measurements broadcast from the Bluetooth device to the console. Please see also
information on :ref:`obtaining the Bluetooth address of your device <obtaining_device_address>`.
.. literalinclude:: ../examples/cycling_power_service_example.py
"""
from collections import namedtuple
from enum import Enum
# The four characteristic UUIDs used by this module share everything except
# the 16-bit field in the first group, so they are built from one template.
_CHARACTERISTIC_UUID_TEMPLATE = '0000{:04x}-0000-1000-8000-00805f9b34fb'

cycling_power_measurement_tx_id = _CHARACTERISTIC_UUID_TEMPLATE.format(0x2a63)
cycling_power_vector_tx_id = _CHARACTERISTIC_UUID_TEMPLATE.format(0x2a64)
cycling_power_feature_tx_id = _CHARACTERISTIC_UUID_TEMPLATE.format(0x2a65)
sensor_location_tx_id = _CHARACTERISTIC_UUID_TEMPLATE.format(0x2a5d)
# NOTE: the functional Enum API numbers members from 1 in declaration order,
# so member order below is significant to the parsers in this module.

# How the sensor derives its power figure.
SensorMeasurementContext = Enum(
    'SensorMeasurementContext',
    ('force_based', 'torque_based'),
)

# Whether the sensor may be used as part of a distributed power system.
DistributeSystemSupport = Enum(
    'DistributeSystemSupport',
    (
        'unspecified',
        'no_distributed_system_support',
        'distributed_system_support',
        'rfu',
    ),
)

# Where on the bike / rider the sensor is mounted.
SensorLocation = Enum(
    'SensorLocation',
    (
        'other',
        'top_of_shoe',
        'in_shoe',
        'hip',
        'front_wheel',
        'left_crank',
        'right_crank',
        'left_pedal',
        'right_pedal',
        'front_hub',
        'rear_dropout',
        'chainstay',
        'rear_wheel',
        'rear_hub',
        'chest',
        'spider',
        'chain_ring',
    ),
)

# Direction of the samples carried in a cycling power vector.
InstantaneousMeasurementDirection = Enum(
    'InstantaneousMeasurementDirection',
    (
        'unknown',
        'tangential_component',
        'radial_component',
        'lateral_component',
    ),
)
# One decoded cycling power measurement notification. Fields that the
# notification did not carry are set to None by the parser.
CyclingPowerMeasurement = namedtuple(
    'CyclingPowerMeasurement',
    'instantaneous_power accumulated_energy pedal_power_balance '
    'accumulated_torque cumulative_wheel_revs last_wheel_event_time '
    'cumulative_crank_revs last_crank_event_time maximum_force_magnitude '
    'minimum_force_magnitude maximum_torque_magnitude minimum_torque_magnitude '
    'top_dead_spot_angle bottom_dead_spot_angle',
)

# Capabilities decoded from the cycling power feature characteristic.
CyclingPowerFeature = namedtuple(
    'CyclingPowerFeature',
    'pedal_power_balance_supported accumulated_torque_supported '
    'wheel_rev_supported crank_rev_supported extreme_magnitudes_supported '
    'dead_spot_angles_supported accumulated_energy_supported '
    'offset_compensation_supported '
    'cycling_power_measurement_content_masking_supported multiple_locations_supported '
    'crank_length_adjustment_supported chain_length_adjustment_supported '
    'chain_weight_adjustment_supported span_length_adjustment_supported '
    'sensor_measurement_context instantaneous_measurement_direction_supported '
    'factory_calibration_date_supported enhanced_offset_compensation_supported '
    'distribute_system_support',
)

# One decoded cycling power vector notification.
CyclingPowerVector = namedtuple(
    'CyclingPowerVector',
    'instantaneous_measurement_direction cumulative_crank_revs '
    'last_crank_event_time first_crank_measurement_angle '
    'instantaneous_force_magnitudes instantaneous_torque_magnitudes',
)
def _parse_sensor_location(measurement):
    """Convert a raw sensor location value into a ``SensorLocation`` member.

    :param measurement: bytes of the characteristic value, interpreted as a
        little-endian unsigned integer.
    :return: the matching ``SensorLocation``, or ``None`` when the value does
        not correspond to any member.
    """
    location_index = int.from_bytes(measurement, 'little')
    if location_index < len(SensorLocation):
        # The wire value counts from 0 while the enum members count from 1.
        return SensorLocation(location_index + 1)
    return None
def _parse_cycling_power_feature(measurement):
    """Decode the value of the Cycling Power Feature characteristic.

    The value is a little-endian bit field. Bit positions follow the Bluetooth
    definition of the characteristic:

    ===== ====================================================
    bit   meaning
    ===== ====================================================
    0     pedal power balance supported
    1     accumulated torque supported
    2     wheel revolution data supported
    3     crank revolution data supported
    4     extreme magnitudes supported
    5     extreme angles supported (not exposed, see below)
    6     top and bottom dead spot angles supported
    7     accumulated energy supported
    8     offset compensation indicator supported (not exposed)
    9     offset compensation supported
    10    cycling power measurement content masking supported
    11    multiple sensor locations supported
    12    crank length adjustment supported
    13    chain length adjustment supported
    14    chain weight adjustment supported
    15    span length adjustment supported
    16    sensor measurement context (0 = force, 1 = torque)
    17    instantaneous measurement direction supported
    18    factory calibration date supported
    19    enhanced offset compensation supported
    20-21 distributed system support
    ===== ====================================================

    Bits 5 and 8 have no corresponding field in ``CyclingPowerFeature`` and
    are therefore skipped rather than reported.

    :param measurement: bytes of the characteristic value.
    :return: a ``CyclingPowerFeature`` describing the device's capabilities.
    """
    value = int.from_bytes(measurement, byteorder='little')

    def is_set(bit):
        # True when the given bit position is set in the feature value.
        return bool(value & (1 << bit))

    if is_set(16):
        sensor_measurement_context = SensorMeasurementContext.torque_based
    else:
        sensor_measurement_context = SensorMeasurementContext.force_based

    # Two-bit field: shift first, then mask, so that the mask and the shift
    # cannot disagree about which bits are being read.
    distribute_system_support = (
        DistributeSystemSupport.unspecified,
        DistributeSystemSupport.no_distributed_system_support,
        DistributeSystemSupport.distributed_system_support,
        DistributeSystemSupport.rfu,
    )[(value >> 20) & 0b11]

    return CyclingPowerFeature(
        pedal_power_balance_supported=is_set(0),
        accumulated_torque_supported=is_set(1),
        wheel_rev_supported=is_set(2),
        crank_rev_supported=is_set(3),
        extreme_magnitudes_supported=is_set(4),
        # bit 5 (extreme angles) intentionally skipped: no field for it
        dead_spot_angles_supported=is_set(6),
        accumulated_energy_supported=is_set(7),
        # bit 8 (offset compensation indicator) intentionally skipped
        offset_compensation_supported=is_set(9),
        cycling_power_measurement_content_masking_supported=is_set(10),
        multiple_locations_supported=is_set(11),
        crank_length_adjustment_supported=is_set(12),
        chain_length_adjustment_supported=is_set(13),
        chain_weight_adjustment_supported=is_set(14),
        span_length_adjustment_supported=is_set(15),
        sensor_measurement_context=sensor_measurement_context,
        instantaneous_measurement_direction_supported=is_set(17),
        factory_calibration_date_supported=is_set(18),
        enhanced_offset_compensation_supported=is_set(19),
        distribute_system_support=distribute_system_support,
    )
def _parse_cycling_power_measurement(data):
    """Decode the payload of a Cycling Power Measurement notification.

    The payload starts with a 16-bit little-endian flags field followed by the
    mandatory instantaneous power. Optional fields follow in a fixed order and
    are present only when their flag bit is set; fields that are absent are
    returned as ``None``. All values are returned raw (no unit scaling).

    Instantaneous power and the extreme force/torque magnitudes are decoded as
    signed 16-bit integers, so negative values are reported as negative
    numbers. Every other field is unsigned.

    The extreme angles field (3 bytes) is skipped when present; it is not
    decoded and has no field in ``CyclingPowerMeasurement``.

    :param data: payload bytes (``bytes`` or ``bytearray``).
    :return: a ``CyclingPowerMeasurement``.
    :raises IndexError: if the pedal power balance flag is set but the payload
        ends before that byte. Other truncated fields are not detected: an
        empty slice decodes as 0.
    """
    flags = int.from_bytes(data[0:2], 'little')

    # Flag bits that announce an optional field. Bits 1 (pedal power balance
    # reference), 3 (accumulated torque source) and 12 (offset compensation
    # indicator) qualify other fields rather than add data, and are not used.
    pedal_power_balance_included_flag = 1
    accumulated_torque_present = 1 << 2
    wheel_rev_included_flag = 1 << 4
    crank_rev_included_flag = 1 << 5
    extreme_force_included_flag = 1 << 6
    extreme_torque_included_flag = 1 << 7
    extreme_angles_included_flag = 1 << 8
    top_dead_spot_included_flag = 1 << 9
    bottom_dead_spot_included_flag = 1 << 10
    accumulated_energy_included_flag = 1 << 11

    byte_offset = 2  # the flags field has been consumed

    def read_int(size, signed=False):
        # Read a little-endian integer of `size` bytes and advance the cursor.
        nonlocal byte_offset
        raw = data[byte_offset:byte_offset + size]
        byte_offset += size
        return int.from_bytes(raw, 'little', signed=signed)

    instantaneous_power = read_int(2, signed=True)

    pedal_power_balance = None
    accumulated_torque = None
    cumulative_wheel_revs = None
    last_wheel_event_time = None
    cumulative_crank_revs = None
    last_crank_event_time = None
    maximum_force_magnitude = None
    minimum_force_magnitude = None
    maximum_torque_magnitude = None
    minimum_torque_magnitude = None
    top_dead_spot_angle = None
    bottom_dead_spot_angle = None
    accumulated_energy = None

    if flags & pedal_power_balance_included_flag:
        # Indexed (not sliced) so that a truncated payload raises IndexError.
        pedal_power_balance = data[byte_offset]
        byte_offset += 1
    if flags & accumulated_torque_present:
        accumulated_torque = read_int(2)
    if flags & wheel_rev_included_flag:
        cumulative_wheel_revs = read_int(4)
        last_wheel_event_time = read_int(2)
    if flags & crank_rev_included_flag:
        cumulative_crank_revs = read_int(2)
        last_crank_event_time = read_int(2)
    if flags & extreme_force_included_flag:
        maximum_force_magnitude = read_int(2, signed=True)
        minimum_force_magnitude = read_int(2, signed=True)
    if flags & extreme_torque_included_flag:
        maximum_torque_magnitude = read_int(2, signed=True)
        minimum_torque_magnitude = read_int(2, signed=True)
    if flags & extreme_angles_included_flag:
        # TODO: Implement extreme angles. Skipped so later fields stay aligned.
        byte_offset += 3
    if flags & top_dead_spot_included_flag:
        top_dead_spot_angle = read_int(2)
    if flags & bottom_dead_spot_included_flag:
        bottom_dead_spot_angle = read_int(2)
    if flags & accumulated_energy_included_flag:
        accumulated_energy = read_int(2)

    return CyclingPowerMeasurement(
        instantaneous_power=instantaneous_power,
        accumulated_energy=accumulated_energy,
        pedal_power_balance=pedal_power_balance,
        accumulated_torque=accumulated_torque,
        cumulative_wheel_revs=cumulative_wheel_revs,
        last_wheel_event_time=last_wheel_event_time,
        cumulative_crank_revs=cumulative_crank_revs,
        last_crank_event_time=last_crank_event_time,
        maximum_force_magnitude=maximum_force_magnitude,
        minimum_force_magnitude=minimum_force_magnitude,
        maximum_torque_magnitude=maximum_torque_magnitude,
        minimum_torque_magnitude=minimum_torque_magnitude,
        top_dead_spot_angle=top_dead_spot_angle,
        bottom_dead_spot_angle=bottom_dead_spot_angle,
    )
def _parse_cycling_power_vector(data):
    """Decode the payload of a Cycling Power Vector notification.

    The first byte is a flags field. Optional crank revolution data and the
    first crank measurement angle follow when flagged; every remaining pair of
    bytes is read as a signed little-endian sample and stored in the force
    list when the force flag is set, otherwise in the torque list when the
    torque flag is set.

    :param data: payload bytes (``bytes`` or ``bytearray``).
    :return: a ``CyclingPowerVector``; absent scalar fields are ``None`` and
        absent arrays are empty lists.
    """
    flag_byte = data[0]
    has_crank_revolutions = bool(flag_byte & 0b1)
    has_first_crank_angle = bool(flag_byte & 0b10)
    has_force_array = bool(flag_byte & 0b100)
    has_torque_array = bool(flag_byte & 0b1000)

    # Bits 4-5 carry the measurement direction; 0 (or anything unmapped)
    # falls back to "unknown".
    direction_code = (flag_byte & 0b110000) >> 4
    direction = {
        1: InstantaneousMeasurementDirection.tangential_component,
        2: InstantaneousMeasurementDirection.radial_component,
        3: InstantaneousMeasurementDirection.lateral_component,
    }.get(direction_code, InstantaneousMeasurementDirection.unknown)

    cursor = 1  # the flags byte has been consumed
    crank_revs = None
    crank_event_time = None
    first_angle = None

    if has_crank_revolutions:
        crank_revs = int.from_bytes(data[cursor:cursor + 2], 'little')
        crank_event_time = int.from_bytes(data[cursor + 2:cursor + 4], 'little')
        cursor += 4
    if has_first_crank_angle:
        first_angle = int.from_bytes(data[cursor:cursor + 2], 'little')
        cursor += 2

    # Whatever is left is the sample array, two bytes per sample.
    samples = [
        int.from_bytes(data[position:position + 2], 'little', signed=True)
        for position in range(cursor, len(data), 2)
    ]

    force_magnitudes = []
    torque_magnitudes = []
    if has_force_array:
        force_magnitudes = samples
    elif has_torque_array:
        torque_magnitudes = samples

    return CyclingPowerVector(
        instantaneous_measurement_direction=direction,
        cumulative_crank_revs=crank_revs,
        last_crank_event_time=crank_event_time,
        first_crank_measurement_angle=first_angle,
        instantaneous_force_magnitudes=force_magnitudes,
        instantaneous_torque_magnitudes=torque_magnitudes,
    )
class CyclingPowerService:
    """Client-side wrapper around the characteristics of the Cycling Power Service.

    Notifications are decoded by the module's parsers and forwarded to the
    callbacks registered through the ``set_*_handler`` methods.

    :param client: object providing awaitable ``start_notify``, ``stop_notify``
        and ``read_gatt_char`` methods (presumably a connected ``bleak``
        client; confirm against the caller).
    """

    def __init__(self, client):
        self._client = client
        self._cycling_power_measurement_callback = None
        self._cycling_power_vector_callback = None

    async def enable_cycling_power_measurement_notifications(self):
        """Subscribe to cycling power measurement notifications."""
        await self._client.start_notify(cycling_power_measurement_tx_id,
                                        self._cycling_power_measurement_notification_handler)

    async def disable_cycling_power_measurement_notifications(self):
        """Unsubscribe from cycling power measurement notifications."""
        await self._client.stop_notify(cycling_power_measurement_tx_id)

    def set_cycling_power_measurement_handler(self, callback):
        """Register the callable that receives each ``CyclingPowerMeasurement``.

        :param callback: callable taking one argument, or ``None`` to stop
            forwarding measurements.
        """
        self._cycling_power_measurement_callback = callback

    async def enable_cycling_power_vector_notifications(self):
        """Subscribe to cycling power vector notifications."""
        await self._client.start_notify(cycling_power_vector_tx_id,
                                        self._cycling_power_vector_notification_handler)

    async def disable_cycling_power_vector_notifications(self):
        """Unsubscribe from cycling power vector notifications."""
        await self._client.stop_notify(cycling_power_vector_tx_id)

    def set_cycling_power_vector_handler(self, callback):
        """Register the callable that receives each ``CyclingPowerVector``.

        :param callback: callable taking one argument, or ``None`` to stop
            forwarding vectors.
        """
        self._cycling_power_vector_callback = callback

    async def get_sensor_location(self):
        """Read the sensor location characteristic.

        :return: a ``SensorLocation`` member, or ``None`` if the device reports
            a value that has no matching member.
        """
        measurement = await self._client.read_gatt_char(sensor_location_tx_id)
        return _parse_sensor_location(measurement)

    async def get_cycling_power_feature(self):
        """Read the cycling power feature characteristic.

        :return: a ``CyclingPowerFeature`` describing the device's capabilities.
        """
        measurement = await self._client.read_gatt_char(cycling_power_feature_tx_id)
        return _parse_cycling_power_feature(measurement)

    def _cycling_power_measurement_notification_handler(self, sender, data):  # pylint: disable=unused-argument
        # Payloads are dropped (and not parsed) while no callback is registered.
        if self._cycling_power_measurement_callback is not None:
            self._cycling_power_measurement_callback(_parse_cycling_power_measurement(data))

    def _cycling_power_vector_notification_handler(self, sender, data):  # pylint: disable=unused-argument
        # Payloads are dropped (and not parsed) while no callback is registered.
        if self._cycling_power_vector_callback is not None:
            self._cycling_power_vector_callback(_parse_cycling_power_vector(data))