2020-12-08 23:00:30

by Manish Mandlik

[permalink] [raw]
Subject: [bluez PATCH v1] test: add test app for Advertisement Monitor API

The python test app to test working of the Advertisement Monitor API.

This app:
- registers itself with bluez by invoking the RegisterMonitor with the
app root path
- exposes multiple monitor objects with both valid and invalid monitor
parameters
- implements Activate/Release/DeviceFound/DeviceLost methods on the
monitor object

Signed-off-by: Manish Mandlik <[email protected]>
Reviewed-by: [email protected]
Reviewed-by: [email protected]
Reviewed-by: [email protected]
---

test/example-adv-monitor | 402 +++++++++++++++++++++++++++++++++++++++
1 file changed, 402 insertions(+)
create mode 100644 test/example-adv-monitor

diff --git a/test/example-adv-monitor b/test/example-adv-monitor
new file mode 100644
index 000000000..6fe8a3058
--- /dev/null
+++ b/test/example-adv-monitor
@@ -0,0 +1,402 @@
+#!/usr/bin/python
+# SPDX-License-Identifier: LGPL-2.1-or-later
+
+import argparse
+import dbus
+import dbus.mainloop.glib
+import dbus.service
+import json
+import time
+
+from threading import Thread
+
+try:
+ from gi.repository import GObject # python3
+except ImportError:
+ import gobject as GObject # python2
+
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+
+BLUEZ_SERVICE_NAME = 'org.bluez'
+
+ADV_MONITOR_MANAGER_IFACE = 'org.bluez.AdvertisementMonitorManager1'
+ADV_MONITOR_IFACE = 'org.bluez.AdvertisementMonitor1'
+ADV_MONITOR_APP_BASE_PATH = '/org/bluez/example/adv_monitor_app'
+
+
+class AdvMonitor(dbus.service.Object):
+
+ # Indexes of the Monitor object parameters in a monitor data list.
+ MONITOR_TYPE = 0
+ RSSI_FILTER = 1
+ PATTERNS = 2
+
+ # Indexes of the RSSI filter parameters in a monitor data list.
+ RSSI_H_THRESH = 0
+ RSSI_H_TIMEOUT = 1
+ RSSI_L_THRESH = 2
+ RSSI_L_TIMEOUT = 3
+
+ # Indexes of the Patterns filter parameters in a monitor data list.
+ PATTERN_START_POS = 0
+ PATTERN_AD_TYPE = 1
+ PATTERN_DATA = 2
+
+ def __init__(self, bus, app_path, monitor_id, monitor_data):
+ self.path = app_path + '/monitor' + str(monitor_id)
+ self.bus = bus
+
+ self._set_type(monitor_data[self.MONITOR_TYPE])
+ self._set_rssi(monitor_data[self.RSSI_FILTER])
+ self._set_patterns(monitor_data[self.PATTERNS])
+
+ super(AdvMonitor, self).__init__(self.bus, self.path)
+
+
+ def get_path(self):
+ return dbus.ObjectPath(self.path)
+
+
+ def get_properties(self):
+ properties = dict()
+ properties['Type'] = dbus.String(self.monitor_type)
+ properties['RSSIThresholdsAndTimers'] = dbus.Struct(self.rssi,
+ signature='nqnq')
+ properties['Patterns'] = dbus.Array(self.patterns, signature='(yyay)')
+ return {ADV_MONITOR_IFACE: properties}
+
+
+ def _set_type(self, monitor_type):
+ self.monitor_type = monitor_type
+
+
+ def _set_rssi(self, rssi):
+ h_thresh = dbus.Int16(rssi[self.RSSI_H_THRESH])
+ h_timeout = dbus.UInt16(rssi[self.RSSI_H_TIMEOUT])
+ l_thresh = dbus.Int16(rssi[self.RSSI_L_THRESH])
+ l_timeout = dbus.UInt16(rssi[self.RSSI_L_TIMEOUT])
+ self.rssi = (h_thresh, h_timeout, l_thresh, l_timeout)
+
+
+ def _set_patterns(self, patterns):
+ self.patterns = []
+ for pattern in patterns:
+ start_pos = dbus.Byte(pattern[self.PATTERN_START_POS])
+ ad_type = dbus.Byte(pattern[self.PATTERN_AD_TYPE])
+ ad_data = []
+ for byte in pattern[self.PATTERN_DATA]:
+ ad_data.append(dbus.Byte(byte))
+ adv_pattern = dbus.Struct((start_pos, ad_type, ad_data),
+ signature='yyay')
+ self.patterns.append(adv_pattern)
+
+
+ def remove_monitor(self):
+ self.remove_from_connection()
+
+
+ @dbus.service.method(DBUS_PROP_IFACE,
+ in_signature='s',
+ out_signature='a{sv}')
+ def GetAll(self, interface):
+ print('{}: {} GetAll'.format(self.path, interface))
+ if interface != ADV_MONITOR_IFACE:
+ print('{}: GetAll: Invalid arg {}'.format(self.path, interface))
+ return {}
+
+ return self.get_properties()[ADV_MONITOR_IFACE]
+
+
+ @dbus.service.method(ADV_MONITOR_IFACE,
+ in_signature='',
+ out_signature='')
+ def Activate(self):
+ print('{}: Monitor Activated'.format(self.path))
+
+
+ @dbus.service.method(ADV_MONITOR_IFACE,
+ in_signature='',
+ out_signature='')
+ def Release(self):
+ print('{}: Monitor Released'.format(self.path))
+
+
+ @dbus.service.method(ADV_MONITOR_IFACE,
+ in_signature='o',
+ out_signature='')
+ def DeviceFound(self, device):
+ print('{}: {} Device Found'.format(self.path, device))
+
+
+ @dbus.service.method(ADV_MONITOR_IFACE,
+ in_signature='o',
+ out_signature='')
+ def DeviceLost(self, device):
+ print('{}: {} Device Lost'.format(self.path, device))
+
+
+class AdvMonitorApp(dbus.service.Object):
+
+ def __init__(self, bus, advmon_manager, app_id):
+ self.bus = bus
+ self.advmon_mgr = advmon_manager
+ self.app_path = ADV_MONITOR_APP_BASE_PATH + str(app_id)
+
+ self.monitors = dict()
+
+ super(AdvMonitorApp, self).__init__(self.bus, self.app_path)
+
+
+ def get_app_path(self):
+ return dbus.ObjectPath(self.app_path)
+
+
+ def add_monitor(self, monitor_data):
+ monitor_id = 0
+ while monitor_id in self.monitors:
+ monitor_id += 1
+
+ monitor = AdvMonitor(self.bus, self.app_path, monitor_id, monitor_data)
+
+ # Emit the InterfacesAdded signal once the Monitor object is created.
+ self.InterfacesAdded(monitor.get_path(), monitor.get_properties())
+
+ self.monitors[monitor_id] = monitor
+
+ return monitor_id
+
+
+ def remove_monitor(self, monitor_id):
+ monitor = self.monitors.pop(monitor_id, None)
+ if not monitor:
+ return False
+
+ # Emit the InterfacesRemoved signal before removing the Monitor object.
+ self.InterfacesRemoved(monitor.get_path(),
+ monitor.get_properties().keys())
+
+ monitor.remove_monitor()
+
+ return True
+
+
+ def register_app(self):
+ self.register_successful = None
+
+ def register_cb():
+ print('{}: RegisterMonitor successful'.format(self.app_path))
+ self.register_successful = True
+
+ def register_error_cb(error):
+ print('{}: RegisterMonitor failed: {}'.format(self.app_path,
+ str(error)))
+ self.register_successful = False
+
+ self.advmon_mgr.RegisterMonitor(self.get_app_path(),
+ reply_handler=register_cb,
+ error_handler=register_error_cb)
+
+ # Wait for the reply.
+ while self.register_successful is None:
+ pass
+
+ return self.register_successful
+
+
+ def unregister_app(self):
+ self.unregister_successful = None
+
+ def unregister_cb():
+ print('{}: UnregisterMonitor successful'.format(self.app_path))
+ self.unregister_successful = True
+
+ def unregister_error_cb(error):
+ print('{}: UnregisterMonitor failed: {}'.format(self.app_path,
+ str(error)))
+ self.unregister_successful = False
+
+ self.advmon_mgr.UnregisterMonitor(self.get_app_path(),
+ reply_handler=unregister_cb,
+ error_handler=unregister_error_cb)
+
+ # Wait for the reply.
+ while self.unregister_successful is None:
+ pass
+
+ return self.unregister_successful
+
+
+ @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+ def GetManagedObjects(self):
+ print('{}: GetManagedObjects'.format(self.app_path))
+ objects = dict()
+ for monitor_id in self.monitors:
+ monitor = self.monitors[monitor_id]
+ objects[monitor.get_path()] = monitor.get_properties()
+
+ return objects
+
+
+ @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}')
+ def InterfacesAdded(self, object_path, interfaces_and_properties):
+ # Invoking this method emits the InterfacesAdded signal,
+ # nothing needs to be done here.
+ return
+
+
+ @dbus.service.signal(DBUS_OM_IFACE, signature='oas')
+ def InterfacesRemoved(self, object_path, interfaces):
+ # Invoking this method emits the InterfacesRemoved signal,
+ # nothing needs to be done here.
+ return
+
+
+def read_adapter_supported_monitor_types(adapter_props):
+ types = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE,
+ 'SupportedMonitorTypes',
+ dbus_interface=DBUS_PROP_IFACE))
+ return json.loads(types)
+
+
+def read_adapter_supported_monitor_features(adapter_props):
+ features = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE,
+ 'SupportedFeatures',
+ dbus_interface=DBUS_PROP_IFACE))
+ return json.loads(features)
+
+
+def print_supported_types_and_features(adapter_props):
+ supported_types = read_adapter_supported_monitor_types(adapter_props)
+ for supported_type in supported_types:
+ print(supported_type)
+
+ supported_features = read_adapter_supported_monitor_features(adapter_props)
+ for supported_feature in supported_features:
+ print(supported_feature)
+
+
+def find_advmon_mgr(bus, adapter):
+ return dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+ ADV_MONITOR_MANAGER_IFACE)
+
+
+def find_adapter(bus):
+ remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
+ DBUS_OM_IFACE)
+ objects = remote_om.GetManagedObjects()
+
+ adapter = None
+ adapter_props = None
+
+ for o, props in objects.items():
+ if ADV_MONITOR_MANAGER_IFACE in props:
+ adapter = o
+ break
+
+ if adapter:
+ # Turn on the bluetooth adapter.
+ adapter_props = dbus.Interface(
+ bus.get_object(BLUEZ_SERVICE_NAME, adapter),
+ DBUS_PROP_IFACE)
+ adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
+
+ return adapter, adapter_props
+
+
+def test(bus, mainloop, advmon_mgr, app_id):
+ # Create an App instance.
+ app = AdvMonitorApp(bus, advmon_mgr, app_id)
+
+ # Create two monitor objects before registering the app. No Activate() or
+ # Release() should get called yet as the app is not registered.
+ data0 = [
+ 'invalid_patterns',
+ [-50, 1, -70, 1],
+ [[0, 0x03, [0x12, 0x18]]] # Service Class UUID is 0x1812 (HOG)
+ ]
+ data1 = [
+ 'or_patterns',
+ [127, 0, 127, 0],
+ [[5, 0x09, [ord('_')]]] # 5th character of the Local Name is '_'
+ ]
+ monitor0 = app.add_monitor(data0)
+ monitor1 = app.add_monitor(data1)
+
+ # Register the app root path to expose advertisement monitors.
+ # Release() should get called on monitor0 - incorrect monitor type.
+ # Activate() should get called on monitor1.
+ ret = app.register_app()
+ if not ret:
+ print('RegisterMonitor failed.')
+ mainloop.quit()
+ exit(-1)
+
+ # Create two more monitor objects.
+ # Release() should get called on monitor2 - incorrect RSSI Filter values.
+ # Activate() should get called on monitor3.
+ data2 = [
+ 'or_patterns',
+ [-50, 1, -30, 1],
+ [[0, 0x19, [0xC2, 0x03]]] # Appearance is 0xC203 (Mouse)
+ ]
+ data3 = [
+ 'or_patterns',
+ [-50, 1, -70, 1],
+ [[0, 0x03, [0x12, 0x18]], [0, 0x19, [0xC2, 0x03]]]
+ ]
+ monitor2 = app.add_monitor(data2)
+ monitor3 = app.add_monitor(data3)
+
+ # Run until user hits the 'Enter' key. If any peer device is advertising
+ # during this time, DeviceFound() should get triggered for monitors
+ # matching the advertisements.
+ raw_input('Press "Enter" key to quit...\n')
+
+ # Remove a monitor. DeviceFound() for this monitor should not get
+ # triggered any more.
+ app.remove_monitor(monitor1)
+
+ # Unregister the app. Release() should get invoked on active monitors,
+ # monitor3 in this case.
+ app.unregister_app()
+
+ mainloop.quit()
+
+
+def main(app_id):
+ # Initialize threads in gobject/dbus-glib before creating local threads.
+ GObject.threads_init()
+ dbus.mainloop.glib.threads_init()
+
+ # Arrange for the GLib main loop to be the default.
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+ bus = dbus.SystemBus()
+ mainloop = GObject.MainLoop()
+
+ # Find bluetooth adapter and power it on.
+ adapter, adapter_props = find_adapter(bus)
+ if not adapter or not adapter_props:
+ print('Bluetooth adapter not found.')
+ exit(-1)
+
+ # Read supported types and find AdvertisementMonitorManager1 interface.
+ print_supported_types_and_features(adapter_props)
+ advmon_mgr = find_advmon_mgr(bus, adapter)
+ if not advmon_mgr :
+ print('AdvertisementMonitorManager1 interface not found.')
+ exit(-1)
+
+ Thread(target=test, args=(bus, mainloop, advmon_mgr, app_id)).start()
+
+ mainloop.run() # blocks until mainloop.quit() is called
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--app_id', default=0, type=int, help='use this App-ID '
+ 'for creating dbus objects (default: 0)')
+ args = parser.parse_args()
+
+ main(args.app_id)
--
2.29.2.576.ga3fc446d84-goog


2020-12-08 23:36:23

by bluez.test.bot

[permalink] [raw]
Subject: RE: [bluez,v1] test: add test app for Advertisement Monitor API

This is automated email and please do not reply to this email!

Dear submitter,

Thank you for submitting the patches to the linux bluetooth mailing list.
This is a CI test results with your patch series:
PW Link:https://patchwork.kernel.org/project/bluetooth/list/?series=398547

---Test result---

##############################
Test: CheckPatch - PASS

##############################
Test: CheckGitLint - PASS

##############################
Test: CheckBuild - PASS

##############################
Test: MakeCheck - PASS



---
Regards,
Linux Bluetooth