2015-04-02 00:41:59

by Arman Uguray

[permalink] [raw]
Subject: [PATCH] test: Add Python GATT client example

This patch introduces test/example-gatt-client which implements a simple
D-Bus client application for a remote Heart Rate service.
---
test/example-gatt-client | 218 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 218 insertions(+)
create mode 100755 test/example-gatt-client

diff --git a/test/example-gatt-client b/test/example-gatt-client
new file mode 100755
index 0000000..329df35
--- /dev/null
+++ b/test/example-gatt-client
@@ -0,0 +1,218 @@
+#!/usr/bin/python
+
+import argparse
+import dbus
+import gobject
+import sys
+
+from dbus.mainloop.glib import DBusGMainLoop
+
+bus = None
+mainloop = None
+
+BLUEZ_SERVICE_NAME = 'org.bluez'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+
+GATT_SERVICE_IFACE = 'org.bluez.GattService1'
+GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
+
+HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
+HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
+BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
+HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
+
+# The objects that we interact with.
+hr_service = None
+hr_msrmt_chrc = None
+body_snsr_loc_chrc = None
+hr_ctrl_pt_chrc = None
+
+
+def generic_error_cb(error):
+ print('D-Bus call failed: ' + str(error))
+ mainloop.quit()
+
+
+def body_sensor_val_to_str(val):
+ if val == 0:
+ return 'Other'
+ if val == 1:
+ return 'Chest'
+ if val == 2:
+ return 'Wrist'
+ if val == 3:
+ return 'Finger'
+ if val == 4:
+ return 'Hand'
+ if val == 5:
+ return 'Ear Lobe'
+ if val == 6:
+ return 'Foot'
+
+ return 'Reserved value'
+
+
+def sensor_contact_val_to_str(val):
+ if val == 0 or val == 1:
+ return 'not supported'
+ if val == 2:
+ return 'no contact detected'
+ if val == 3:
+ return 'contact detected'
+
+ return 'invalid value'
+
+
+def body_sensor_val_cb(value):
+ if len(value) != 1:
+ print('Invalid body sensor location value: ' + repr(value))
+ return
+
+ print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
+
+
+def hr_msrmt_start_notify_cb():
+ print('HR Measurement notifications enabled')
+
+
+def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
+ if iface != GATT_CHRC_IFACE:
+ return
+
+ if not len(changed_props):
+ return
+
+ value = changed_props.get('Value', None)
+ if not value:
+ return
+
+ print('New HR Measurement')
+
+ flags = value[0]
+ value_format = flags & 0x01
+ sc_status = (flags >> 1) & 0x03
+ ee_status = flags & 0x08
+
+ if value_format == 0x00:
+ hr_msrmt = value[1]
+ next_ind = 2
+ else:
+ hr_msrmt = value[1] | (value[2] << 8)
+ next_ind = 3
+
+ print('\tHR: ' + str(int(hr_msrmt)))
+ print('\tSensor Contact status: ' +
+ sensor_contact_val_to_str(sc_status))
+
+ if ee_status:
+ print('\tEnergy Expended: ' + str(int(value[next_ind])))
+
+
+def start_client():
+ # Read the Body Sensor Location value and print it asynchronously.
+ body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
+ error_handler=generic_error_cb,
+ dbus_interface=GATT_CHRC_IFACE)
+
+ # Listen to PropertiesChanged signals from the Heart Measurement
+ # Characteristic.
+ hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
+ hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
+ hr_msrmt_changed_cb)
+
+ # Subscribe to Heart Rate Measurement notifications.
+ hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
+ error_handler=generic_error_cb,
+ dbus_interface=GATT_CHRC_IFACE)
+
+
+def process_chrc(chrc_path):
+ chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
+ chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
+ dbus_interface=DBUS_PROP_IFACE)
+
+ uuid = chrc_props['UUID']
+
+ if uuid == HR_MSRMT_UUID:
+ global hr_msrmt_chrc
+ hr_msrmt_chrc = (chrc, chrc_props)
+ elif uuid == BODY_SNSR_LOC_UUID:
+ global body_snsr_loc_chrc
+ body_snsr_loc_chrc = (chrc, chrc_props)
+ elif uuid == HR_CTRL_PT_UUID:
+ global hr_ctrl_pt_chrc
+ hr_ctrl_pt_chrc = (chrc, chrc_props)
+ else:
+ print('Unrecognized characteristic: ' + uuid)
+
+ return True
+
+
+def process_hr_service(service_path):
+ service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
+ service_props = service.GetAll(GATT_SERVICE_IFACE,
+ dbus_interface=DBUS_PROP_IFACE)
+
+ uuid = service_props['UUID']
+
+ if uuid != HR_SVC_UUID:
+ print('Service is not a Heart Rate Service: ' + uuid)
+ return False
+
+ # Process the characteristics.
+ chrc_paths = service_props['Characteristics']
+ for chrc_path in chrc_paths:
+ process_chrc(chrc_path)
+
+ global hr_service
+ hr_service = (service, service_props, service_path)
+
+ return True
+
+
+def interfaces_removed_cb(object_path, interfaces):
+ if not hr_service:
+ return
+
+ if object_path == hr_service[2]:
+ print('Service was removed')
+ mainloop.quit()
+
+
+def main():
+ # Prase the service path from the arguments.
+ parser = argparse.ArgumentParser(
+ description='D-Bus Heart Rate Service client example')
+ parser.add_argument('service_path', metavar='<service-path>',
+ type=dbus.ObjectPath, nargs=1,
+ help='GATT service object path')
+ args = parser.parse_args()
+ service_path = args.service_path[0]
+
+ # Set up the main loop.
+ DBusGMainLoop(set_as_default=True)
+ global bus
+ bus = dbus.SystemBus()
+ global mainloop
+ mainloop = gobject.MainLoop()
+
+ om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
+ om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
+
+ try:
+ if not process_hr_service(service_path):
+ sys.exit(1)
+ except dbus.DBusException as e:
+ print e.message
+ sys.exit(1)
+
+ print 'Heart Rate Service ready'
+
+ start_client()
+
+ mainloop.run()
+
+
+if __name__ == '__main__':
+ main()
--
2.2.0.rc0.207.ga3a616c



2015-04-02 01:48:12

by Arman Uguray

[permalink] [raw]
Subject: Re: [PATCH] test: Add Python GATT client example

Hi,

> On Wed, Apr 1, 2015 at 5:41 PM, Arman Uguray <[email protected]> wrote:
> This patch introduces test/example-gatt-client which implements a simple
> D-Bus client application for a remote Heart Rate service.
> ---
> test/example-gatt-client | 218 +++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 218 insertions(+)
> create mode 100755 test/example-gatt-client
>
> diff --git a/test/example-gatt-client b/test/example-gatt-client
> new file mode 100755
> index 0000000..329df35
> --- /dev/null
> +++ b/test/example-gatt-client
> @@ -0,0 +1,218 @@
> +#!/usr/bin/python
> +
> +import argparse
> +import dbus
> +import gobject
> +import sys
> +
> +from dbus.mainloop.glib import DBusGMainLoop
> +
> +bus = None
> +mainloop = None
> +
> +BLUEZ_SERVICE_NAME = 'org.bluez'
> +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
> +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
> +
> +GATT_SERVICE_IFACE = 'org.bluez.GattService1'
> +GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
> +
> +HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
> +HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
> +BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
> +HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
> +
> +# The objects that we interact with.
> +hr_service = None
> +hr_msrmt_chrc = None
> +body_snsr_loc_chrc = None
> +hr_ctrl_pt_chrc = None
> +
> +
> +def generic_error_cb(error):
> + print('D-Bus call failed: ' + str(error))
> + mainloop.quit()
> +
> +
> +def body_sensor_val_to_str(val):
> + if val == 0:
> + return 'Other'
> + if val == 1:
> + return 'Chest'
> + if val == 2:
> + return 'Wrist'
> + if val == 3:
> + return 'Finger'
> + if val == 4:
> + return 'Hand'
> + if val == 5:
> + return 'Ear Lobe'
> + if val == 6:
> + return 'Foot'
> +
> + return 'Reserved value'
> +
> +
> +def sensor_contact_val_to_str(val):
> + if val == 0 or val == 1:
> + return 'not supported'
> + if val == 2:
> + return 'no contact detected'
> + if val == 3:
> + return 'contact detected'
> +
> + return 'invalid value'
> +
> +
> +def body_sensor_val_cb(value):
> + if len(value) != 1:
> + print('Invalid body sensor location value: ' + repr(value))
> + return
> +
> + print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
> +
> +
> +def hr_msrmt_start_notify_cb():
> + print('HR Measurement notifications enabled')
> +
> +
> +def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
> + if iface != GATT_CHRC_IFACE:
> + return
> +
> + if not len(changed_props):
> + return
> +
> + value = changed_props.get('Value', None)
> + if not value:
> + return
> +
> + print('New HR Measurement')
> +
> + flags = value[0]
> + value_format = flags & 0x01
> + sc_status = (flags >> 1) & 0x03
> + ee_status = flags & 0x08
> +
> + if value_format == 0x00:
> + hr_msrmt = value[1]
> + next_ind = 2
> + else:
> + hr_msrmt = value[1] | (value[2] << 8)
> + next_ind = 3
> +
> + print('\tHR: ' + str(int(hr_msrmt)))
> + print('\tSensor Contact status: ' +
> + sensor_contact_val_to_str(sc_status))
> +
> + if ee_status:
> + print('\tEnergy Expended: ' + str(int(value[next_ind])))
> +
> +
> +def start_client():
> + # Read the Body Sensor Location value and print it asynchronously.
> + body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
> + error_handler=generic_error_cb,
> + dbus_interface=GATT_CHRC_IFACE)
> +
> + # Listen to PropertiesChanged signals from the Heart Measurement
> + # Characteristic.
> + hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
> + hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
> + hr_msrmt_changed_cb)
> +
> + # Subscribe to Heart Rate Measurement notifications.
> + hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
> + error_handler=generic_error_cb,
> + dbus_interface=GATT_CHRC_IFACE)
> +
> +
> +def process_chrc(chrc_path):
> + chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
> + chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
> + dbus_interface=DBUS_PROP_IFACE)
> +
> + uuid = chrc_props['UUID']
> +
> + if uuid == HR_MSRMT_UUID:
> + global hr_msrmt_chrc
> + hr_msrmt_chrc = (chrc, chrc_props)
> + elif uuid == BODY_SNSR_LOC_UUID:
> + global body_snsr_loc_chrc
> + body_snsr_loc_chrc = (chrc, chrc_props)
> + elif uuid == HR_CTRL_PT_UUID:
> + global hr_ctrl_pt_chrc
> + hr_ctrl_pt_chrc = (chrc, chrc_props)
> + else:
> + print('Unrecognized characteristic: ' + uuid)
> +
> + return True
> +
> +
> +def process_hr_service(service_path):
> + service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
> + service_props = service.GetAll(GATT_SERVICE_IFACE,
> + dbus_interface=DBUS_PROP_IFACE)
> +
> + uuid = service_props['UUID']
> +
> + if uuid != HR_SVC_UUID:
> + print('Service is not a Heart Rate Service: ' + uuid)
> + return False
> +
> + # Process the characteristics.
> + chrc_paths = service_props['Characteristics']
> + for chrc_path in chrc_paths:
> + process_chrc(chrc_path)
> +
> + global hr_service
> + hr_service = (service, service_props, service_path)
> +
> + return True
> +
> +
> +def interfaces_removed_cb(object_path, interfaces):
> + if not hr_service:
> + return
> +
> + if object_path == hr_service[2]:
> + print('Service was removed')
> + mainloop.quit()
> +
> +
> +def main():
> + # Prase the service path from the arguments.
> + parser = argparse.ArgumentParser(
> + description='D-Bus Heart Rate Service client example')
> + parser.add_argument('service_path', metavar='<service-path>',
> + type=dbus.ObjectPath, nargs=1,
> + help='GATT service object path')
> + args = parser.parse_args()
> + service_path = args.service_path[0]
> +
> + # Set up the main loop.
> + DBusGMainLoop(set_as_default=True)
> + global bus
> + bus = dbus.SystemBus()
> + global mainloop
> + mainloop = gobject.MainLoop()
> +
> + om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
> + om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> +
> + try:
> + if not process_hr_service(service_path):
> + sys.exit(1)
> + except dbus.DBusException as e:
> + print e.message
> + sys.exit(1)
> +
> + print 'Heart Rate Service ready'
> +
> + start_client()
> +
> + mainloop.run()
> +
> +
> +if __name__ == '__main__':
> + main()
> --
> 2.2.0.rc0.207.ga3a616c
>

Pushed.

Thanks,
Arman