"""
Interact with a Fitness Machine Service (FTMS) Bluetooth LE device.
Example
=======
This example demonstrates all cycling-related functionalities of a FTMS indoor cycling device.
Please see also information on :ref:`obtaining the Bluetooth address of your device <obtaining_device_address>`.
First, it prints all 'read' characteristics:
+ Supported resistance level range
+ Supported power range
+ Fitness machine features
Then, it starts 'notify' characteristics, which stream data from the device:
+ Indoor bike data (speed, cadence, distance, resistance level, power, time)
Finally, it modifies 'write' characteristics with some time in between:
+ Resistance level
+ Target power (automatically adjusts resistance level based on cadence to maintain same power)
.. literalinclude:: ../examples/fitness_machine_service_example.py
"""
from collections import namedtuple
from pycycling.ftms_parsers import (
parse_fitness_machine_status,
parse_indoor_bike_data,
parse_all_features,
parse_training_status,
parse_control_point_response,
form_ftms_control_command,
FTMSControlPointOpCode,
FitnessMachineFeature,
)
# read: Supported Resistance Level Range
ftms_supported_resistance_level_range_characteristic_id = (
"00002ad6-0000-1000-8000-00805f9b34fb"
)
# read: Supported Power Range
ftms_supported_power_range_characteristic_id = "00002ad8-0000-1000-8000-00805f9b34fb"
# (read): Fitness Machine Feature
ftms_fitness_machine_feature_characteristic_id = "00002acc-0000-1000-8000-00805f9b34fb"
# notify: Indoor Bike Data
ftms_indoor_bike_data_characteristic_id = "00002ad2-0000-1000-8000-00805f9b34fb"
# notify: Fitness Machine Status
ftms_fitness_machine_status_characteristic_id = "00002ada-0000-1000-8000-00805f9b34fb"
# notify: Training Status
ftms_training_status_characteristic_id = "00002ad3-0000-1000-8000-00805f9b34fb"
# (write, indicate): Fitness Machine Control Point
ftms_fitness_machine_control_point_characteristic_id = (
"00002ad9-0000-1000-8000-00805f9b34fb"
)
SupportedResistanceLevelRange = namedtuple(
"SupportedResistanceLevelRange",
["minimum_resistance", "maximum_resistance", "minimum_increment"],
)
def _parse_supported_resistance_level_range(
message: bytearray,
) -> SupportedResistanceLevelRange:
minimum_resistance = int.from_bytes(message[0:2], "little")
maximum_resistance = int.from_bytes(message[2:4], "little")
minimum_increment = int.from_bytes(message[4:6], "little")
return SupportedResistanceLevelRange(
minimum_resistance, maximum_resistance, minimum_increment
)
SupportedPowerRange = namedtuple(
"SupportedPowerRange",
["minimum_power", "maximum_power", "minimum_increment"],
)
def _parse_supported_power_range(message: bytearray) -> SupportedPowerRange:
minimum_power = int.from_bytes(message[0:2], "little")
maximum_power = int.from_bytes(message[2:4], "little")
minimum_increment = int.from_bytes(message[4:6], "little")
return SupportedPowerRange(minimum_power, maximum_power, minimum_increment)
[docs]
class FitnessMachineService:
def __init__(self, client):
self._client = client
self._control_point_response_callback = None
self._indoor_bike_data_callback = None
self._fitness_machine_status_callback = None
self._training_status_callback = None
# === READ Characteristics ===
[docs]
async def get_supported_resistance_level_range(
self,
) -> SupportedResistanceLevelRange:
message = await self._client.read_gatt_char(
ftms_supported_resistance_level_range_characteristic_id
)
return _parse_supported_resistance_level_range(message)
[docs]
async def get_supported_power_range(self) -> SupportedPowerRange:
message = await self._client.read_gatt_char(
ftms_supported_power_range_characteristic_id
)
return _parse_supported_power_range(message)
[docs]
async def get_fitness_machine_feature(self) -> FitnessMachineFeature:
message = await self._client.read_gatt_char(
ftms_fitness_machine_feature_characteristic_id
)
return parse_all_features(message)
# === NOTIFY Characteristics ===
# ====== Indoor Bike Data ======
[docs]
async def enable_indoor_bike_data_notify(self) -> None:
await self._client.start_notify(
ftms_indoor_bike_data_characteristic_id,
self._indoor_bike_data_notification_handler,
)
[docs]
async def disable_indoor_bike_data_notify(self):
await self._client.stop_notify(ftms_indoor_bike_data_characteristic_id)
[docs]
def set_indoor_bike_data_handler(self, callback):
self._indoor_bike_data_callback = callback
def _indoor_bike_data_notification_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._indoor_bike_data_callback is not None:
self._indoor_bike_data_callback(parse_indoor_bike_data(data))
# ====== Fitness Machine Status ======
[docs]
async def enable_fitness_machine_status_notify(self) -> None:
await self._client.start_notify(
ftms_fitness_machine_status_characteristic_id,
self._fitness_machine_status_notification_handler,
)
[docs]
async def disable_fitness_machine_status_notify(self):
await self._client.stop_notify(ftms_fitness_machine_status_characteristic_id)
[docs]
def set_fitness_machine_status_handler(self, callback):
self._fitness_machine_status_callback = callback
def _fitness_machine_status_notification_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._fitness_machine_status_callback is not None:
self._fitness_machine_status_callback(parse_fitness_machine_status(data))
# ====== Training Status ======
[docs]
async def enable_training_status_notify(self) -> None:
await self._client.start_notify(
ftms_training_status_characteristic_id,
self._training_status_notification_handler,
)
[docs]
async def disable_training_status_notify(self):
await self._client.stop_notify(ftms_training_status_characteristic_id)
[docs]
def set_training_status_handler(self, callback):
self._training_status_callback = callback
def _training_status_notification_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._training_status_callback is not None:
self._training_status_callback(parse_training_status(data))
# === WRITE/INDICATE Characteristics ===
# ====== Fitness Machine Control Point ======
[docs]
async def enable_control_point_indicate(self) -> None:
await self._client.start_notify(
ftms_fitness_machine_control_point_characteristic_id,
self._control_point_response_handler,
)
[docs]
async def disable_control_point_indicate(self):
await self._client.stop_notify(
ftms_fitness_machine_control_point_characteristic_id
)
[docs]
def set_control_point_response_handler(self, callback):
self._control_point_response_callback = callback
def _control_point_response_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._control_point_response_callback is not None:
self._control_point_response_callback(parse_control_point_response(data))
# ====== Control Point Commands ======
[docs]
async def request_control(self) -> None:
message = form_ftms_control_command(FTMSControlPointOpCode.REQUEST_CONTROL)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def reset(self) -> None:
message = form_ftms_control_command(FTMSControlPointOpCode.RESET)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_target_speed(self, speed: int) -> None:
if speed < 0:
raise ValueError("Speed must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_SPEED, speed
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_target_incline(self, inclination: int) -> None:
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_INCLINE, inclination
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_target_resistance_level(self, level: int) -> None:
if level < 0:
raise ValueError("Resistance level must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_RESISTANCE_LEVEL, level
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_target_power(self, power: int) -> None:
if power < 0:
raise ValueError("Power must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_POWER, power
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_target_heart_rate(self, heart_rate: int) -> None:
if heart_rate < 0:
raise ValueError("Heart rate must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_HEART_RATE, heart_rate
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def start_or_resume(self) -> None:
message = form_ftms_control_command(FTMSControlPointOpCode.START_OR_RESUME)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def stop_or_pause(self, pause: bool) -> None:
message = form_ftms_control_command(
FTMSControlPointOpCode.STOP_OR_PAUSE, 0x02 if pause else 0x01
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_expended_energy(self, energy: int) -> None:
if energy < 0:
raise ValueError("Energy must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_EXPENDED_ENERGY, energy
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_number_of_steps(self, steps: int) -> None:
if steps < 0:
raise ValueError("Steps must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STEPS, steps
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_number_of_strides(self, strides: int) -> None:
if strides < 0:
raise ValueError("Strides must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_NUMBER_OF_STRIDES, strides
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_distance(self, distance: int) -> None:
if distance < 0:
raise ValueError("Distance must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_DISTANCE, distance
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_training_time(self, time: int) -> None:
if time < 0:
raise ValueError("Time must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TRAINING_TIME, time
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_time_in_two_heart_rate_zones(self, times: list) -> None:
if len(times) != 2:
raise ValueError("Times must be a list of 2 elements")
if times[0] < 0 or times[1] < 0:
raise ValueError("Times must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TIME_IN_TWO_HEART_RATE_ZONES, times
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_time_in_three_heart_rate_zones(self, times: list) -> None:
if len(times) != 3:
raise ValueError("Times must be a list of 3 elements")
if times[0] < 0 or times[1] < 0 or times[2] < 0:
raise ValueError("Times must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TIME_IN_THREE_HEART_RATE_ZONES, times
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_time_in_five_heart_rate_zones(self, times: list) -> None:
if len(times) != 5:
raise ValueError("Times must be a list of 5 elements")
if times[0] < 0 or times[1] < 0 or times[2] < 0 or times[3] < 0 or times[4] < 0:
raise ValueError("Times must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_TIME_IN_FIVE_HEART_RATE_ZONES, times
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_simulation_parameters(
self, wind_speed: int, grade: int, crr: int, cw: int
) -> None:
if crr < 0:
raise ValueError("Crr must be non-negative")
if cw < 0:
raise ValueError("Cw must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_INDOOR_BIKE_SIMULATION_PARAMETERS,
[wind_speed, grade, crr, cw],
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_wheel_circumference(self, circumference: int) -> None:
if circumference < 0:
raise ValueError("Circumference must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_WHEEL_CIRCUMFERENCE, circumference
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_spin_down_control(self, control: int) -> None:
if control < 0:
raise ValueError("Control must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_SPIN_DOWN_CONTROL, control
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)
[docs]
async def set_targeted_cadence(self, cadence: int) -> None:
if cadence < 0:
raise ValueError("Cadence must be non-negative")
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGETED_CADENCE, cadence
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, True
)