pycycling.fitness_machine_service module

Interact with a Fitness Machine Service (FTMS) Bluetooth LE device.

Example

This example demonstrates all cycling-related functionalities of a FTMS indoor cycling device.

Please see also information on obtaining the Bluetooth address of your device.

First, it prints all ‘read’ characteristics:
  • Supported resistance level range

  • Supported power range

  • Fitness machine features

Then, it starts ‘notify’ characteristics, which stream data from the device:
  • Indoor bike data (speed, cadence, distance, resistance level, power, time)

Finally, it modifies ‘write’ characteristics with some time in between:
  • Resistance level

  • Target power (automatically adjusts resistance level based on cadence to maintain same power)

import asyncio
from bleak import BleakClient
from pycycling.fitness_machine_service import FitnessMachineService

async def run(address):
    async with BleakClient(address, timeout=10) as client:
        ftms = FitnessMachineService(client)

        # Print 'read' characteristics

        ### Fitness Machine Features
        fitness_machine_features, target_setting_features = await ftms.get_fitness_machine_feature()
        fitness_machine_features = fitness_machine_features._asdict()
        target_setting_features = target_setting_features._asdict()

        def print_features(features):
            for key, value in features.items():
                print(f"{key}: {value}")
            print()
        
        print("Fitness machine feature:")
        print_features(fitness_machine_features)

        print("Target setting features:")
        print_features(target_setting_features)

        print(target_setting_features)


        if target_setting_features["resistance_target_setting_supported"]:
            supported_resistance_level_range = (
                await ftms.get_supported_resistance_level_range()
            )
            print("Supported resistance level range:")
            print(supported_resistance_level_range)
            print()
            max_resistance = supported_resistance_level_range.maximum_resistance

        if target_setting_features["power_target_setting_supported"]:
            supported_power_range = await ftms.get_supported_power_range()
            print("Supported power range:")
            print(supported_power_range)
            print()
            max_power = supported_power_range.maximum_power

        # Start receiving and printing 'notify' characteristics
        def print_indoor_bike_data(data):
            print("Received indoor bike data:")
            print(data)
            print()

        ftms.set_indoor_bike_data_handler(print_indoor_bike_data)
        await ftms.enable_indoor_bike_data_notify()

        def print_fitness_machine_status(data):
            print("Received fitness machine status:")
            print("\t" + str(data))
            print()

        ftms.set_fitness_machine_status_handler(print_fitness_machine_status)
        await ftms.enable_fitness_machine_status_notify()

        def print_training_status(data):
            print("Received training status:")
            print(data)
            print()

        ftms.set_training_status_handler(print_training_status)
        await ftms.enable_training_status_notify()

        # Write to 'write' characteristics
        # IMPORTANT: Before being able to write, the client (this script) must

        # 1. Start receiving 'indicate' notifications from the control point characteristic
        def print_control_point_response(message):
            print("Received control point response:")
            print(message)
            print()

        ftms.set_control_point_response_handler(print_control_point_response)
        await ftms.enable_control_point_indicate()
        # 2. 'write' a request to control the fitness machine
        await ftms.request_control()
        # 3. (recommended) 'write' a reset command
        await ftms.reset()

        # Set target resistance level
        if target_setting_features["resistance_target_setting_supported"]:
            print("Setting target resistance level to 25 percent of maximum resistance level...")
            await ftms.set_target_resistance_level(max_resistance * 0.25)

            await asyncio.sleep(5)

            print("Increasing target resistance level to 50 percent of maximum resistance level...")
            await ftms.set_target_resistance_level(max_resistance * 0.5)

            await asyncio.sleep(5)

            # Reset target resistance level
            print("Resetting target resistance level...")

            await ftms.reset()

        # Set target power
        if target_setting_features["power_target_setting_supported"]:
            power_level = 4 / 100 * max_power
            print(f"Increasing target power to 4 percent of maximum power ({power_level}W).")
            print("The trainer will automatically adjust resistance based on your leg speed.")
            print(f"Try pedaling above {power_level}W to feel decreasing resistance, and vice versa.")
            await ftms.set_target_power(power_level)

            await asyncio.sleep(30)

            # Reset
            print("Resetting target power...")
            await ftms.reset()

        # Set simulation parameters
        if target_setting_features["indoor_bike_simulation_parameters_supported"]:
            print("Setting indoor bike simulation parameters to 0")
            await ftms.set_simulation_parameters(0, 0, 0, 0)
            await asyncio.sleep(5)
            print("Setting indoor bike simulation grade to 10%")
            print("if connected to a compatible machine (like elite rizer), this should set its grade to +10%")
            await ftms.set_simulation_parameters(0, 1000, 0, 0)
            await asyncio.sleep(5)

            print("Resetting indoor bike simulation parameters...")
            await ftms.reset()


if __name__ == "__main__":
    import os

    os.environ["PYTHONASYNCIODEBUG"] = str(1)

    device_address = "DEVICE_ADDRESS HERE"
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(device_address))

    # Unresolved intermittent bug when writing to the control point characteristic:
    # bleak.exc.BleakDBusError: [org.bluez.Error.Failed] Operation failed with ATT error: 0x80 (Unknown code)
    #
    # To work around this, just retry on error, like:
    #
    # import bleak
    # while True:
    #     try:
    #         loop.run_until_complete(run(device_address))
    #     except bleak.exc.BleakDBusError as e:
    #         print("BleakDBusError, retrying...")
    #         print(e)
    #         continue
    #     except KeyboardInterrupt:
    #         break
class pycycling.fitness_machine_service.FitnessMachineService(client)[source]

Bases: object

async disable_control_point_indicate()[source]
async disable_fitness_machine_status_notify()[source]
async disable_indoor_bike_data_notify()[source]
async disable_training_status_notify()[source]
async enable_control_point_indicate() None[source]
async enable_fitness_machine_status_notify() None[source]
async enable_indoor_bike_data_notify() None[source]
async enable_training_status_notify() None[source]
async get_fitness_machine_feature() FitnessMachineFeature[source]
async get_supported_power_range() SupportedPowerRange[source]
async get_supported_resistance_level_range() SupportedResistanceLevelRange[source]
async request_control() None[source]
async reset() None[source]
set_control_point_response_handler(callback)[source]
set_fitness_machine_status_handler(callback)[source]
set_indoor_bike_data_handler(callback)[source]
async set_simulation_parameters(wind_speed: int, grade: int, crr: int, cw: int) None[source]
async set_spin_down_control(control: int) None[source]
async set_target_heart_rate(heart_rate: int) None[source]
async set_target_incline(inclination: int) None[source]
async set_target_power(power: int) None[source]
async set_target_resistance_level(level: int) None[source]
async set_target_speed(speed: int) None[source]
async set_targeted_cadence(cadence: int) None[source]
async set_targeted_distance(distance: int) None[source]
async set_targeted_expended_energy(energy: int) None[source]
async set_targeted_number_of_steps(steps: int) None[source]
async set_targeted_number_of_strides(strides: int) None[source]
async set_targeted_time_in_five_heart_rate_zones(times: list) None[source]
async set_targeted_time_in_three_heart_rate_zones(times: list) None[source]
async set_targeted_time_in_two_heart_rate_zones(times: list) None[source]
async set_targeted_training_time(time: int) None[source]
set_training_status_handler(callback)[source]
async set_wheel_circumference(circumference: int) None[source]
async start_or_resume() None[source]
async stop_or_pause(pause: bool) None[source]
class pycycling.fitness_machine_service.SupportedPowerRange(minimum_power, maximum_power, minimum_increment)

Bases: tuple

maximum_power

Alias for field number 1

minimum_increment

Alias for field number 2

minimum_power

Alias for field number 0

class pycycling.fitness_machine_service.SupportedResistanceLevelRange(minimum_resistance, maximum_resistance, minimum_increment)

Bases: tuple

maximum_resistance

Alias for field number 1

minimum_increment

Alias for field number 2

minimum_resistance

Alias for field number 0