Multiprocessing / multithreading with BLEAK to avoid asyncio data loss and connection failure #570
-
DescriptionDescribe what you were trying to get done: Tell us what happened, what went wrong, and what you expected to happen: I am trying to find a solution that: (1) maintains the integrity of the It seems like What I Didfrom pylsl import StreamInfo, StreamOutlet
import asyncio
import aioconsole
import os
import signal
import sys
import getopt
import math
import time
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
from bleak import BleakClient
from bleak.uuids import uuid16_dict
""" Predefined UUID (Universal Unique Identifier) mappings are based on Heart Rate GATT service Protocol that most
Fitness/Heart Rate device manufacturers follow (Polar H10 in this case) to obtain a specific response input from
the device acting as an API """
uuid16_dict = {v: k for k, v in uuid16_dict.items()}
## This is the device MAC ID (WIN) or CFD (MAC), please update with your device ID
ADDRESS = "3E44F50E-A858-4CC2-BF9F-461FC6D25A93"
STREAMNAME = 'PolarBand'
## UUID for model number ##
MODEL_NBR_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
uuid16_dict.get("Model Number String")
)
## UUID for manufacturer name ##
MANUFACTURER_NAME_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
uuid16_dict.get("Manufacturer Name String")
)
## UUID for battery level ##
BATTERY_LEVEL_UUID = "0000{0:x}-0000-1000-8000-00805f9b34fb".format(
uuid16_dict.get("Battery Level")
)
## UUID for connection establsihment with device ##
PMD_SERVICE = "FB005C80-02E7-F387-1CAD-8ACD2D8DF0C8"
## UUID for Request of stream settings ##
PMD_CONTROL = "FB005C81-02E7-F387-1CAD-8ACD2D8DF0C8"
## UUID for Request of start stream ##
PMD_DATA = "FB005C82-02E7-F387-1CAD-8ACD2D8DF0C8"
## UUID for Request of ECG Stream ##
ECG_WRITE = bytearray([0x02, 0x00, 0x00, 0x01, 0x82, 0x00, 0x01, 0x01, 0x0E, 0x00])
## For Polar H10 sampling frequency ##
ECG_SAMPLING_FREQ = 130
# for plotting using python
ecg_session_data = []
ecg_session_time = []
ecg = []
# for streaming to LSL
OUTLET = []
# define LSL stream
def StartStream(STREAMNAME):
info = StreamInfo(STREAMNAME, 'ECG', 1,ECG_SAMPLING_FREQ, 'float32', 'myuid2424')
info.desc().append_child_value("manufacturer", "Polar")
channels = info.desc().append_child("channels")
for c in ["ECG"]:
channels.append_child("channel")\
.append_child_value("name", c)\
.append_child_value("unit", "microvolts")\
.append_child_value("type", "ECG")
# next make an outlet; we set the transmission chunk size to 74 samples and
# the outgoing buffer size to 360 seconds (max.)
return StreamOutlet(info, 74, 360)
## Bit conversion of the Hexadecimal stream
def data_conv(sender, data):
if data[0] == 0x00:
timestamp = convert_to_unsigned_long(data, 1, 8)
step = 3
samples = data[10:]
offset = 0
while offset < len(samples):
ecg = convert_array_to_signed_int(samples, offset, step)
offset += step
OUTLET.push_sample([ecg])
ecg_session_data.extend([ecg])
ecg_session_time.extend([timestamp])
def convert_array_to_signed_int(data, offset, length):
return int.from_bytes(
bytearray(data[offset : offset + length]), byteorder="little", signed=True,
)
def convert_to_unsigned_long(data, offset, length):
return int.from_bytes(
bytearray(data[offset : offset + length]), byteorder="little", signed=False,
)
## Aynchronous task to start the data stream for ECG ##
async def run(client, debug=False):
print("---------Looking for Device------------ ", flush=True)
await client.connect()
print("---------Device connected--------------")
#model_number = await client.read_gatt_char(MODEL_NBR_UUID)
#print("Model Number: {0}".format("".join(map(chr, model_number))), flush=True)
#manufacturer_name = await client.read_gatt_char(MANUFACTURER_NAME_UUID)
#print("Manufacturer Name: {0}".format("".join(map(chr, manufacturer_name))))
#battery_level = await client.read_gatt_char(BATTERY_LEVEL_UUID)
#print("Battery Level: {0}%".format(int(battery_level[0])))
await client.read_gatt_char(PMD_CONTROL)
await client.write_gatt_char(PMD_CONTROL, ECG_WRITE)
## ECG stream started
await client.start_notify(PMD_DATA, data_conv)
print("Collecting ECG data...")
await aioconsole.ainput('Running: Press a key to quit')
await client.stop_notify(PMD_DATA)
await client.disconnect()
print("Stopping ECG data...", flush=True)
print("[CLOSED] application closed.", flush=True)
sys.exit(0)
async def liveplot():
print('liveplotting')
return
async def main():
client = BleakClient(ADDRESS)
tasks = [
asyncio.ensure_future(run(client, True)),
asyncio.create_task(liveplot())
]
await asyncio.gather(*tasks)
if __name__ == "__main__":
# start LSL stream
OUTLET = StartStream(STREAMNAME)
os.environ["PYTHONASYNCIODEBUG"] = str(1)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main()) Error received:
Terminal output:
From the terminal output, this results from the following:
Thanks! |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
I felt that this was more of a question on how to implement something with Bleak than an issue. If the non-interrupted stream is the top priority, you should definitely run it in a separate process, and pass the data from it to another process handling the plotting. Do this through an external queue, e.g. RabbitMQ or Mosquitto is my recommendation. In the asyncio framing, the live plottning will most surely hinder the notifications from arriving in time. |
Beta Was this translation helpful? Give feedback.
I felt that this was more of a question on how to implement something with Bleak than an issue.
If the non-interrupted stream is the top priority, you should definitely run it in a separate process, and pass the data from it to another process handling the plotting. Do this through an external queue, e.g. RabbitMQ or Mosquitto is my recommendation.
In the asyncio framing, the live plottning will most surely hinder the notifications from arriving in time.