-
Notifications
You must be signed in to change notification settings - Fork 300
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added support for Google´s Bumble Bluetooth Controller stack
The backend supports direct use with Bumble. The HCI Controller is managed by the Bumble stack and the transport layer can be defined by the user (e.g. VHCI, Serial, TCP, android-netsim).
- Loading branch information
Showing
14 changed files
with
725 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ Contributors | |
* David Johansen <[email protected]> | ||
* JP Hutchins <[email protected]> | ||
* Bram Duvigneau <[email protected]> | ||
* Victor Chavez <[email protected]> | ||
|
||
Sponsors | ||
-------- | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# SPDX-License-Identifier: MIT | ||
# Copyright (c) 2024 Victor Chavez | ||
"""Bumble backend.""" | ||
|
||
from bumble.controller import Controller | ||
from bumble.link import LocalLink | ||
from bumble.transport import open_transport | ||
|
||
transports = {} | ||
link = LocalLink() | ||
|
||
|
||
def get_default_transport(): | ||
return "tcp-server:_:1234" | ||
|
||
|
||
async def start_transport(transport: str): | ||
if transport not in transports.keys(): | ||
transports[transport] = await open_transport(transport) | ||
Controller( | ||
"ext", | ||
host_source=transports[transport].source, | ||
host_sink=transports[transport].sink, | ||
link=link, | ||
) | ||
|
||
return transports[transport] | ||
|
||
|
||
def get_link(): | ||
# Assume all transports are linked | ||
return link |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# SPDX-License-Identifier: MIT | ||
# Copyright (c) 2024 Victor Chavez | ||
|
||
from typing import Callable, List, Union | ||
from uuid import UUID | ||
|
||
from bumble.gatt_client import CharacteristicProxy, ServiceProxy | ||
|
||
from ... import normalize_uuid_str | ||
from ..characteristic import BleakGATTCharacteristic | ||
from ..descriptor import BleakGATTDescriptor | ||
from .utils import uuid_bytes_to_str | ||
|
||
|
||
class BleakGATTCharacteristicBumble(BleakGATTCharacteristic): | ||
"""GATT Characteristic implementation for the Bumble backend.""" | ||
|
||
def __init__( | ||
self, | ||
obj: CharacteristicProxy, | ||
max_write_without_response_size: Callable[[], int], | ||
svc: ServiceProxy, | ||
): | ||
super().__init__(obj, max_write_without_response_size) | ||
self.__descriptors = [] | ||
self.__props = [str(prop) for prop in obj.properties] | ||
self.__svc = svc | ||
uuid = uuid_bytes_to_str(obj.uuid.uuid_bytes) | ||
self.__uuid = normalize_uuid_str(uuid) | ||
|
||
@property | ||
def service_uuid(self) -> str: | ||
"""The uuid of the Service containing this characteristic""" | ||
return self.__svc.uuid | ||
|
||
@property | ||
def service_handle(self) -> int: | ||
"""The integer handle of the Service containing this characteristic""" | ||
return self.__svc.handle | ||
|
||
@property | ||
def handle(self) -> int: | ||
"""The handle of this characteristic""" | ||
return int(self.obj.handle) | ||
|
||
@property | ||
def uuid(self) -> str: | ||
"""The uuid of this characteristic""" | ||
return self.__uuid | ||
|
||
@property | ||
def properties(self) -> List[str]: | ||
"""Properties of this characteristic""" | ||
return self.__props | ||
|
||
@property | ||
def descriptors(self) -> List[BleakGATTDescriptor]: | ||
"""List of descriptors for this characteristic""" | ||
return self.__descriptors | ||
|
||
def get_descriptor( | ||
self, specifier: Union[int, str, UUID] | ||
) -> Union[BleakGATTDescriptor, None]: | ||
"""Get a descriptor by handle (int) or UUID (str or uuid.UUID)""" | ||
try: | ||
if isinstance(specifier, int): | ||
return next(filter(lambda x: x.handle == specifier, self.descriptors)) | ||
else: | ||
return next( | ||
filter(lambda x: x.uuid == str(specifier), self.descriptors) | ||
) | ||
except StopIteration: | ||
return None | ||
|
||
def add_descriptor(self, descriptor: BleakGATTDescriptor): | ||
"""Add a :py:class:`~BleakGATTDescriptor` to the characteristic. | ||
Should not be used by end user, but rather by `bleak` itself. | ||
""" | ||
self.__descriptors.append(descriptor) |
Oops, something went wrong.