Return-Path: From: Arman Uguray To: linux-bluetooth@vger.kernel.org Cc: Arman Uguray Subject: [PATCH BlueZ v1 17/17] tools: Added a Python example for GATT server API Date: Wed, 25 Feb 2015 21:13:30 -0800 Message-Id: <1424927610-26226-18-git-send-email-armansito@chromium.org> In-Reply-To: <1424927610-26226-1-git-send-email-armansito@chromium.org> References: <1424927610-26226-1-git-send-email-armansito@chromium.org> Sender: linux-bluetooth-owner@vger.kernel.org List-ID: This patch introduces tools/gatt-example, which demonstrates how an external application can use the GattManager1 API to register GATT services. This example simulates a fake Heart Rate service while also providing a test service with a non-SIG UUID that exercises various API behavior. --- tools/gatt-example | 463 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 463 insertions(+) create mode 100755 tools/gatt-example diff --git a/tools/gatt-example b/tools/gatt-example new file mode 100755 index 0000000..8847e4e --- /dev/null +++ b/tools/gatt-example @@ -0,0 +1,463 @@ +#!/usr/bin/python + +import dbus +import dbus.exceptions +import dbus.mainloop.glib +import dbus.service + +import array +import gobject + +from random import randint + +mainloop = None + +BLUEZ_SERVICE_NAME = 'org.bluez' +GATT_MANAGER_IFACE = 'org.bluez.GattManager1' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +GATT_SERVICE_IFACE = 'org.bluez.GattService1' +GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' +GATT_DESC_IFACE = 'org.bluez.GattDescriptor1' + +class InvalidArgsException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' + +class NotSupportedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotSupported' + +class NotPermittedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.NotPermitted' + +class InvalidValueLengthException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.InvalidValueLength' + +class FailedException(dbus.exceptions.DBusException): + _dbus_error_name = 'org.bluez.Error.Failed' + + +class Service(dbus.service.Object): + PATH_BASE = '/org/bluez/example/service' + + def __init__(self, bus, index, uuid, primary): + self.path = self.PATH_BASE + str(index) + self.bus = bus + self.uuid = uuid + self.primary = primary + self.characteristics = [] + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_SERVICE_IFACE: { + 'UUID': self.uuid, + 'Primary': self.primary, + 'Characteristics': dbus.Array( + self.get_characteristic_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_characteristic(self, characteristic): + self.characteristics.append(characteristic) + + def get_characteristic_paths(self): + result = [] + for chrc in self.characteristics: + result.append(chrc.get_path()) + return result + + def get_characteristics(self): + return self.characteristics + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_SERVICE_IFACE: + raise InvalidArgsException() + + return self.get_properties[GATT_SERVICE_IFACE] + + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') + def GetManagedObjects(self): + response = {} + print 'GetManagedObjects' + + response[self.get_path()] = self.get_properties() + chrcs = self.get_characteristics() + for chrc in chrcs: + response[chrc.get_path()] = chrc.get_properties() + descs = chrc.get_descriptors() + for desc in descs: + response[desc.get_path()] = desc.get_properties() + + return response + + +class Characteristic(dbus.service.Object): + def __init__(self, bus, index, uuid, flags, service): + self.path = service.path + '/char' + str(index) + self.bus = bus + self.uuid = uuid + self.service = service + self.flags = flags + self.descriptors = [] + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_CHRC_IFACE: { + 'Service': self.service.get_path(), + 'UUID': self.uuid, + 'Flags': self.flags, + 'Descriptors': dbus.Array( + self.get_descriptor_paths(), + signature='o') + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + def add_descriptor(self, descriptor): + self.descriptors.append(descriptor) + + def get_descriptor_paths(self): + result = [] + for desc in self.descriptors: + result.append(desc.get_path()) + return result + + def get_descriptors(self): + return self.descriptors + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_CHRC_IFACE: + raise InvalidArgsException() + + return self.get_properties[GATT_CHRC_IFACE] + + @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay') + def ReadValue(self): + print 'Default ReadValue called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay') + def WriteValue(self, value): + print 'Default WriteValue called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StartNotify(self): + print 'Default StartNotify called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_CHRC_IFACE) + def StopNotify(self): + print 'Default StopNotify called, returning error' + raise NotSupportedException() + + @dbus.service.signal(DBUS_PROP_IFACE, + signature='sa{sv}as') + def PropertiesChanged(self, interface, changed, invalidated): + pass + + +class Descriptor(dbus.service.Object): + def __init__(self, bus, index, uuid, characteristic): + self.path = characteristic.path + '/desc' + str(index) + self.bus = bus + self.uuid = uuid + self.chrc = characteristic + dbus.service.Object.__init__(self, bus, self.path) + + def get_properties(self): + return { + GATT_DESC_IFACE: { + 'Characteristic': self.chrc.get_path(), + 'UUID': self.uuid, + } + } + + def get_path(self): + return dbus.ObjectPath(self.path) + + @dbus.service.method(DBUS_PROP_IFACE, + in_signature='s', + out_signature='a{sv}') + def GetAll(self, interface): + if interface != GATT_DESC_IFACE: + raise InvalidArgsException() + + return self.get_properties[GATT_CHRC_IFACE] + + @dbus.service.method(GATT_DESC_IFACE, out_signature='ay') + def ReadValue(self): + print 'Default ReadValue called, returning error' + raise NotSupportedException() + + @dbus.service.method(GATT_DESC_IFACE, in_signature='ay') + def WriteValue(self, value): + print 'Default WriteValue called, returning error' + raise NotSupportedException() + + +class HeartRateService(Service): + """ + Fake Heart Rate Service that simulates a fake heart beat and control point + behavior. + + """ + HR_UUID = '0000180d-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index): + Service.__init__(self, bus, index, self.HR_UUID, True) + self.add_characteristic(HeartRateMeasurementChrc(bus, 0, self)) + self.add_characteristic(BodySensorLocationChrc(bus, 1, self)) + self.add_characteristic(HeartRateControlPointChrc(bus, 2, self)) + self.energy_expended = 0 + + +class HeartRateMeasurementChrc(Characteristic): + HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.HR_MSRMT_UUID, + ['notify'], + service) + self.notifying = False; + self.hr_ee_count = 0 + + def hr_msrmt_cb(self): + value = [] + value.append(dbus.Byte(0x06)) + + value.append(dbus.Byte(randint(90, 130))) + + if self.hr_ee_count % 10 == 0: + value[0] = dbus.Byte(value[0] | 0x08) + value.append(dbus.Byte(self.service.energy_expended & 0xff)) + value.append(dbus.Byte((self.service.energy_expended >> 8) & 0xff)) + + self.service.energy_expended = \ + min(0xffff, self.service.energy_expended + 1) + self.hr_ee_count += 1 + + print 'Updating value: ' + repr(value) + + self.PropertiesChanged(GATT_CHRC_IFACE, { 'Value': value }, []) + + return self.notifying + + def _update_hr_msrmt_simulation(self): + print 'Update HR Measurement Simulation' + + if not self.notifying: + return + + gobject.timeout_add(1000, self.hr_msrmt_cb) + + def StartNotify(self): + if self.notifying: + print 'Already notifying, nothing to do' + return + + self.notifying = True + self._update_hr_msrmt_simulation() + + def StopNotify(self): + if not self.notifying: + print 'Not notifying, nothing to do' + return + + self.notifying = False + self._update_hr_msrmt_simulation() + + +class BodySensorLocationChrc(Characteristic): + BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.BODY_SNSR_LOC_UUID, + ['read'], + service) + + def ReadValue(self): + # Return 'Chest' as the sensor location. + return [ 0x01 ] + +class HeartRateControlPointChrc(Characteristic): + HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.HR_CTRL_PT_UUID, + ['write'], + service) + + def WriteValue(self, value): + print 'Heart Rate Control Point WriteValue called' + + if len(value) != 1: + raise InvalidValueLengthException() + + byte = value[0] + print 'Control Point value: ' + repr(byte) + + if byte != 1: + raise FailedException("0x80") + + print 'Energy Expended field reset!' + self.service.energy_expended = 0 + + +class TestService(Service): + """ + Dummy test service that provides characteristics and descriptors that + exercise various API functionality. + + """ + TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0' + + def __init__(self, bus, index): + Service.__init__(self, bus, index, self.TEST_SVC_UUID, False) + self.add_characteristic(TestCharacteristic(bus, 0, self)) + + +class TestCharacteristic(Characteristic): + """ + Dummy test characteristic. Allows writing arbitrary bytes to its value, and + contains "extended properties", as well as a test descriptor. + + """ + TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1' + + def __init__(self, bus, index, service): + Characteristic.__init__( + self, bus, index, + self.TEST_CHRC_UUID, + ['read', 'write', 'writable-auxiliaries'], + service) + self.value = [] + self.add_descriptor(TestDescriptor(bus, 0, self)) + self.add_descriptor( + CharacteristicUserDescriptionDescriptor(bus, 1, self)) + + def ReadValue(self): + print 'TestCharacteristic Read: ' + repr(self.value) + return self.value + + def WriteValue(self, value): + print 'TestCharacteristic Write: ' + repr(value) + self.value = value + + +class TestDescriptor(Descriptor): + """ + Dummy test descriptor. Returns a static value. + + """ + TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef2' + + def __init__(self, bus, index, characteristic): + Descriptor.__init__( + self, bus, index, + self.TEST_DESC_UUID, + characteristic) + + def ReadValue(self): + return [ + dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t') + ] + + +class CharacteristicUserDescriptionDescriptor(Descriptor): + """ + Writable CUD descriptor. + + """ + CUD_UUID = '2901' + + def __init__(self, bus, index, characteristic): + self.writable = 'writable-auxiliaries' in characteristic.flags + self.value = array.array('B', 'This is characteristic is for testing') + self.value = self.value.tolist() + Descriptor.__init__( + self, bus, index, + self.CUD_UUID, + characteristic) + + def ReadValue(self): + return self.value + + def WriteValue(self, value): + if not self.writable: + raise NotPermittedException() + self.value = value + + +def register_service_cb(): + print 'GATT service registered' + + +def register_service_error_cb(error): + print 'Failed to register service: ' + str(error) + mainloop.quit() + + +def find_adapter(bus): + remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), + DBUS_OM_IFACE) + objects = remote_om.GetManagedObjects() + + for o, props in objects.iteritems(): + if props.has_key(GATT_MANAGER_IFACE): + return o + + return None + +def main(): + global mainloop + + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + + adapter = find_adapter(bus) + if not adapter: + print 'GattManager1 interface not found' + return + + service_manager = dbus.Interface( + bus.get_object(BLUEZ_SERVICE_NAME, adapter), + GATT_MANAGER_IFACE) + + hr_service = HeartRateService(bus, 0) + test_service = TestService(bus, 1) + + mainloop = gobject.MainLoop() + + service_manager.RegisterService(hr_service.get_path(), {}, + reply_handler=register_service_cb, + error_handler=register_service_error_cb) + service_manager.RegisterService(test_service.get_path(), {}, + reply_handler=register_service_cb, + error_handler=register_service_error_cb) + + mainloop.run() + +if __name__ == '__main__': + main() -- 2.2.0.rc0.207.ga3a616c