2019-03-10 07:53:16

by Stotland, Inga

[permalink] [raw]
Subject: [PATCH BlueZ v3] test: Add unified test for mesh node example app

This adds one script, test-mesh, to replace three test-join,
example-onoff-server and example-onoff-client.
This is menu driven test that allows provisioning (join) and/or
connecting existing (attach) nodes.
---
Makefile.tools | 2 +-
test/agent.py | 14 +-
test/example-onoff-client | 288 -------------
test/example-onoff-server | 365 -----------------
test/test-mesh | 842 ++++++++++++++++++++++++++++++++++++++
5 files changed, 854 insertions(+), 657 deletions(-)
delete mode 100644 test/example-onoff-client
delete mode 100644 test/example-onoff-server
create mode 100755 test/test-mesh

diff --git a/Makefile.tools b/Makefile.tools
index 0f94bbbe7..379e127b6 100644
--- a/Makefile.tools
+++ b/Makefile.tools
@@ -463,7 +463,7 @@ test_scripts += test/sap_client.py test/bluezutils.py \
test/test-hfp test/opp-client test/ftp-client \
test/pbap-client test/map-client test/example-advertisement \
test/example-gatt-server test/example-gatt-client \
- test/test-gatt-profile
+ test/test-gatt-profile test/test-mesh test/agent.py

if BTPCLIENT
noinst_PROGRAMS += tools/btpclient
diff --git a/test/agent.py b/test/agent.py
index 22c92f952..778dbe092 100755
--- a/test/agent.py
+++ b/test/agent.py
@@ -1,9 +1,16 @@
-#!/usr/bin/python
+#!/usr/bin/python3

import sys
import dbus
import dbus.service
-import dbus.mainloop.glib
+
+try:
+ from termcolor import colored, cprint
+ set_green = lambda x: colored(x, 'green', attrs=['bold'])
+ set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
+except ImportError:
+ set_green = lambda x: x
+ set_cyan = lambda x: x

AGENT_IFACE = 'org.bluez.mesh.ProvisionAgent1'
AGENT_PATH = "/mesh/test/agent"
@@ -37,4 +44,5 @@ class Agent(dbus.service.Object):

@dbus.service.method(AGENT_IFACE, in_signature="su", out_signature="")
def DisplayNumeric(self, type, value):
- print("DisplayNumeric type=", type, " number=", value)
+ print(set_cyan('DisplayNumeric ('), type,
+ set_cyan(') number ='), set_green(value))
diff --git a/test/example-onoff-client b/test/example-onoff-client
deleted file mode 100644
index e4a87eb12..000000000
--- a/test/example-onoff-client
+++ /dev/null
@@ -1,288 +0,0 @@
-#!/usr/bin/env python3
-
-import sys
-import struct
-import numpy
-import dbus
-import dbus.service
-import dbus.exceptions
-
-try:
- from gi.repository import GObject
-except ImportError:
- import gobject as GObject
-from dbus.mainloop.glib import DBusGMainLoop
-
-MESH_SERVICE_NAME = 'org.bluez.mesh'
-DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
-DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
-
-MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
-MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
-MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
-
-VENDOR_ID_NONE = 0xffff
-
-app = None
-bus = None
-mainloop = None
-node = None
-token = numpy.uint64(0x76bd4f2372477600)
-
-def unwrap(item):
- if isinstance(item, dbus.Boolean):
- return bool(item)
- if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
- dbus.UInt64, dbus.Int64)):
- return int(item)
- if isinstance(item, dbus.Byte):
- return bytes([int(item)])
- if isinstance(item, dbus.String):
- return item
- if isinstance(item, (dbus.Array, list, tuple)):
- return [unwrap(x) for x in item]
- if isinstance(item, (dbus.Dictionary, dict)):
- return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
-
- print('Dictionary item not handled')
- print(type(item))
- return item
-
-def attach_app_cb(node_path, dict_array):
- print('Mesh application registered ', node_path)
- print(type(node_path))
- print(type(dict_array))
- print(dict_array)
-
- els = unwrap(dict_array)
- print("Get Elements")
- for el in els:
- print(el)
- idx = struct.unpack('b', el[0])[0]
- print('Configuration for Element ', end='')
- print(idx)
- models = el[1]
-
- element = app.get_element(idx)
- element.set_model_config(models)
-
- obj = bus.get_object(MESH_SERVICE_NAME, node_path)
- global node
- node = dbus.Interface(obj, MESH_NODE_IFACE)
-
-def error_cb(error):
- print('D-Bus call failed: ' + str(error))
-
-def generic_reply_cb():
- print('D-Bus call done')
-
-def interfaces_removed_cb(object_path, interfaces):
- if not mesh_net:
- return
-
- if object_path == mesh_net[2]:
- print('Service was removed')
- mainloop.quit()
-
-class Application(dbus.service.Object):
-
- def __init__(self, bus):
- self.path = '/example'
- self.elements = []
- dbus.service.Object.__init__(self, bus, self.path)
-
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
- def add_element(self, element):
- self.elements.append(element)
-
- def get_element(self, idx):
- for ele in self.elements:
- if ele.get_index() == idx:
- return ele
-
- @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
- def GetManagedObjects(self):
- response = {}
- print('GetManagedObjects')
- for element in self.elements:
- response[element.get_path()] = element.get_properties()
- return response
-
-class Element(dbus.service.Object):
- PATH_BASE = '/example/ele'
-
- def __init__(self, bus, index):
- self.path = self.PATH_BASE + format(index, '02x')
- print(self.path)
- self.models = []
- self.bus = bus
- self.index = index
- dbus.service.Object.__init__(self, bus, self.path)
-
- def _get_sig_models(self):
- ids = []
- for model in self.models:
- id = model.get_id()
- vendor = model.get_vendor()
- if vendor == VENDOR_ID_NONE:
- ids.append(id)
- return ids
-
- def get_properties(self):
- return {
- MESH_ELEMENT_IFACE: {
- 'Index': dbus.Byte(self.index),
- 'Models': dbus.Array(
- self._get_sig_models(), signature='q')
- }
- }
-
- def add_model(self, model):
- model.set_path(self.path)
- self.models.append(model)
-
- def get_index(self):
- return self.index
-
- def set_model_config(self, config):
- print('Set element models config')
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qqbay", out_signature="")
- def MessageReceived(self, source, key, is_sub, data):
- print('Message Received on Element ', end='')
- print(self.index)
- for model in self.models:
- model.process_message(source, key, data)
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qa{sv}", out_signature="")
-
- def UpdateModelConfiguration(self, model_id, config):
- print('UpdateModelConfig ', end='')
- print(hex(model_id))
- for model in self.models:
- if model_id == model.get_id():
- model.set_config(config)
- return
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="", out_signature="")
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
-class Model():
- def __init__(self, model_id):
- self.cmd_ops = []
- self.model_id = model_id
- self.vendor = VENDOR_ID_NONE
- self.path = None
-
- def set_path(self, path):
- self.path = path
-
- def get_id(self):
- return self.model_id
-
- def get_vendor(self):
- return self.vendor
-
- def process_message(self, source, key, data):
- print('Model process message')
-
- def set_publication(self, period):
- self.period = period
-
- def set_bindings(self, bindings):
- self.bindings = bindings
-
- def set_config(self, config):
- if 'Bindings' in config:
- self.bindings = config.get('Bindings')
- print('Bindings: ', end='')
- print(self.bindings)
- if 'PublicationPeriod' in config:
- self.set_publication(config.get('PublicationPeriod'))
- print('Model publication period ', end='')
- print(self.pub_period, end='')
- print(' ms')
-
-class OnOffClient(Model):
- def __init__(self, model_id):
- Model.__init__(self, model_id)
- self.cmd_ops = { 0x8201, # get
- 0x8202, # set
- 0x8203 } # set unacknowledged
- print('OnOff Client')
-
- def _reply_cb(state):
- print('State ', end='');
- print(state)
-
- def _send_message(self, dest, key, data, reply_cb):
- print('OnOffClient send data')
- node.Send(self.path, dest, key, data, reply_handler=reply_cb,
- error_handler=error_cb)
-
- def get_state(self, dest, key):
- opcode = 0x8201
- data = struct.pack('<H', opcode)
- self._send_message(dest, key, data, self._reply_cb)
-
- def set_state(self, dest, key, state):
- opcode = 0x8202
- data = struct.pack('<HB', opcode, state)
- self._send_message(dest, key, data, self._reply_cb)
-
- def process_message(self, source, key, data):
- print('OnOffClient process message len ', end = '')
- datalen = len(data)
- print(datalen)
-
- if datalen!=3:
- return
-
- opcode, state=struct.unpack('<HB',bytes(data))
- if opcode != 0x8202 :
- print('Bad opcode ', end='')
- print(hex(opcode))
- return
-
- print('Got state ', end = '')
- print(hex(state))
-
-def attach_app_error_cb(error):
- print('Failed to register application: ' + str(error))
- mainloop.quit()
-
-def main():
-
- DBusGMainLoop(set_as_default=True)
-
- global bus
- bus = dbus.SystemBus()
- global mainloop
- global app
-
- mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
- "/org/bluez/mesh"),
- MESH_NETWORK_IFACE)
- mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
-
- app = Application(bus)
- first_ele = Element(bus, 0x00)
- first_ele.add_model(OnOffClient(0x1001))
- app.add_element(first_ele)
-
- mainloop = GObject.MainLoop()
-
- print('Attach')
- mesh_net.Attach(app.get_path(), token,
- reply_handler=attach_app_cb,
- error_handler=attach_app_error_cb)
- mainloop.run()
-
-if __name__ == '__main__':
- main()
diff --git a/test/example-onoff-server b/test/example-onoff-server
deleted file mode 100644
index 131b6415c..000000000
--- a/test/example-onoff-server
+++ /dev/null
@@ -1,365 +0,0 @@
-#!/usr/bin/env python3
-
-import sys
-import struct
-import numpy
-import dbus
-import dbus.service
-import dbus.exceptions
-
-from threading import Timer
-import time
-
-
-try:
- from gi.repository import GObject
-except ImportError:
- import gobject as GObject
-from dbus.mainloop.glib import DBusGMainLoop
-
-MESH_SERVICE_NAME = 'org.bluez.mesh'
-DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
-DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
-
-MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
-MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
-MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
-MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
-
-APP_COMPANY_ID = 0x05f1
-APP_PRODUCT_ID = 0x0001
-APP_VERSION_ID = 0x0001
-
-VENDOR_ID_NONE = 0xffff
-
-app = None
-bus = None
-mainloop = None
-node = None
-
-token = numpy.uint64(0x76bd4f2372476578)
-
-def generic_error_cb(error):
- print('D-Bus call failed: ' + str(error))
-
-def generic_reply_cb():
- print('D-Bus call done')
-
-def unwrap(item):
- if isinstance(item, dbus.Boolean):
- return bool(item)
- if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
- dbus.UInt64, dbus.Int64)):
- return int(item)
- if isinstance(item, dbus.Byte):
- return bytes([int(item)])
- if isinstance(item, dbus.String):
- return item
- if isinstance(item, (dbus.Array, list, tuple)):
- return [unwrap(x) for x in item]
- if isinstance(item, (dbus.Dictionary, dict)):
- return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
-
- print('Dictionary item not handled')
- print(type(item))
- return item
-
-def attach_app_cb(node_path, dict_array):
- print('Mesh application registered ', node_path)
-
- obj = bus.get_object(MESH_SERVICE_NAME, node_path)
-
- global node
- node = dbus.Interface(obj, MESH_NODE_IFACE)
-
- els = unwrap(dict_array)
- print("Get Elements")
-
- for el in els:
- idx = struct.unpack('b', el[0])[0]
- print('Configuration for Element ', end='')
- print(idx)
-
- models = el[1]
- element = app.get_element(idx)
- element.set_model_config(models)
-
-def interfaces_removed_cb(object_path, interfaces):
- if not mesh_net:
- return
-
- if object_path == mesh_net[2]:
- print('Service was removed')
- mainloop.quit()
-
-def send_response(path, dest, key, data):
- print('send response ', end='')
- print(data)
- node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
- error_handler=generic_error_cb)
-
-def send_publication(path, model_id, data):
- print('send publication ', end='')
- print(data)
- node.Publish(path, model_id, data,
- reply_handler=generic_reply_cb,
- error_handler=generic_error_cb)
-
-class PubTimer():
- def __init__(self):
- self.seconds = None
- self.func = None
- self.thread = None
- self.busy = False
-
- def _timeout_cb(self):
- self.func()
- self.busy = True
- self._schedule_timer()
- self.busy =False
-
- def _schedule_timer(self):
- self.thread = Timer(self.seconds, self._timeout_cb)
- self.thread.start()
-
- def start(self, seconds, func):
- self.func = func
- self.seconds = seconds
- if not self.busy:
- self._schedule_timer()
-
- def cancel(self):
- print('Cancel timer')
- if self.thread is not None:
- print('Cancel thread')
- self.thread.cancel()
- self.thread = None
-
-class Application(dbus.service.Object):
-
- def __init__(self, bus):
- self.path = '/example'
- self.elements = []
- dbus.service.Object.__init__(self, bus, self.path)
-
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
- def add_element(self, element):
- self.elements.append(element)
-
- def get_element(self, idx):
- for ele in self.elements:
- if ele.get_index() == idx:
- return ele
-
- def get_properties(self):
- return {
- MESH_APPLICATION_IFACE: {
- 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
- 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
- 'VersionID': dbus.UInt16(APP_VERSION_ID)
- }
- }
-
- @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
- def GetManagedObjects(self):
- response = {}
- print('GetManagedObjects')
- response[self.path] = self.get_properties()
- for element in self.elements:
- response[element.get_path()] = element.get_properties()
- return response
-
-class Element(dbus.service.Object):
- PATH_BASE = '/example/ele'
-
- def __init__(self, bus, index):
- self.path = self.PATH_BASE + format(index, '02x')
- print(self.path)
- self.models = []
- self.bus = bus
- self.index = index
- dbus.service.Object.__init__(self, bus, self.path)
-
- def _get_sig_models(self):
- ids = []
- for model in self.models:
- id = model.get_id()
- vendor = model.get_vendor()
- if vendor == VENDOR_ID_NONE:
- ids.append(id)
- return ids
-
- def get_properties(self):
- return {
- MESH_ELEMENT_IFACE: {
- 'Index': dbus.Byte(self.index),
- 'Models': dbus.Array(
- self._get_sig_models(), signature='q')
- }
- }
-
- def add_model(self, model):
- model.set_path(self.path)
- self.models.append(model)
-
- def get_index(self):
- return self.index
-
- def set_model_config(self, configs):
- print('Set element models config')
- for config in configs:
- mod_id = config[0]
- self.UpdateModelConfiguration(mod_id, config[1])
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qqbay", out_signature="")
- def MessageReceived(self, source, key, is_sub, data):
- print('Message Received on Element ', end='')
- print(self.index)
- for model in self.models:
- model.process_message(source, key, data)
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qa{sv}", out_signature="")
-
- def UpdateModelConfiguration(self, model_id, config):
- print('UpdateModelConfig ', end='')
- print(hex(model_id))
- for model in self.models:
- if model_id == model.get_id():
- model.set_config(config)
- return
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="", out_signature="")
-
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
-class Model():
- def __init__(self, model_id):
- self.cmd_ops = []
- self.model_id = model_id
- self.vendor = VENDOR_ID_NONE
- self.bindings = []
- self.pub_period = 0
- self.pub_id = 0
- self.path = None
-
- def set_path(self, path):
- self.path = path
-
- def get_id(self):
- return self.model_id
-
- def get_vendor(self):
- return self.vendor
-
- def process_message(self, source, key, data):
- print('Model process message')
-
- def set_publication(self, period):
- self.pub_period = period
-
- def set_config(self, config):
- if 'Bindings' in config:
- self.bindings = config.get('Bindings')
- print('Bindings: ', end='')
- print(self.bindings)
- if 'PublicationPeriod' in config:
- self.set_publication(config.get('PublicationPeriod'))
- print('Model publication period ', end='')
- print(self.pub_period, end='')
- print(' ms')
-
-class OnOffServer(Model):
- def __init__(self, model_id):
- Model.__init__(self, model_id)
- self.cmd_ops = { 0x8201, # get
- 0x8202, # set
- 0x8203 } # set unacknowledged
-
- print("OnOff Server ", end="")
- self.state = 0
- print('State ', end='')
- self.timer = PubTimer()
-
- def process_message(self, source, key, data):
- datalen = len(data)
- print('OnOff Server process message len ', datalen)
-
- if datalen!=2 and datalen!=3:
- return
-
- if datalen==2:
- op_tuple=struct.unpack('<H',bytes(data))
- opcode = op_tuple[0]
- if opcode != 0x8201:
- print(hex(opcode))
- return
- print('Get state')
- elif datalen==3:
- opcode,self.state=struct.unpack('<HB',bytes(data))
- if opcode != 0x8202 and opcode != 0x8203:
- print(hex(opcode))
- return
- print('Set state: ', end='')
- print(self.state)
-
- rsp_data = struct.pack('<HB', 0x8204, self.state)
- send_response(self.path, source, key, rsp_data)
-
- def publish(self):
- print('Publish')
- data = struct.pack('B', self.state)
- send_publication(self.path, self.model_id, data)
-
- def set_publication(self, period):
- if period == 0:
- self.pub_period = 0
- self.timer.cancel()
- return
-
- # We do not handle ms in this example
- if period < 1000:
- return
-
- self.pub_period = period
- self.timer.start(period/1000, self.publish)
-
-def attach_app_error_cb(error):
- print('Failed to register application: ' + str(error))
- mainloop.quit()
-
-def main():
-
- DBusGMainLoop(set_as_default=True)
-
- global bus
- bus = dbus.SystemBus()
- global mainloop
- global app
-
- mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
- "/org/bluez/mesh"),
- MESH_NETWORK_IFACE)
- mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
-
- app = Application(bus)
- first_ele = Element(bus, 0x00)
- first_ele.add_model(OnOffServer(0x1000))
- app.add_element(first_ele)
-
- mainloop = GObject.MainLoop()
-
- print('Attach')
- mesh_net.Attach(app.get_path(), token,
- reply_handler=attach_app_cb,
- error_handler=attach_app_error_cb)
-
- mainloop.run()
-
-if __name__ == '__main__':
- main()
diff --git a/test/test-mesh b/test/test-mesh
new file mode 100755
index 000000000..fd02207bc
--- /dev/null
+++ b/test/test-mesh
@@ -0,0 +1,842 @@
+#!/usr/bin/env python3
+
+###################################################################
+#
+# This is a unified test sample for BT Mesh
+#
+# To run the test:
+# test-mesh [token]
+#
+# 'token' is an optional argument. It must be a 16-digit
+# hexadecimal number. The token must be associated with
+# an existing node. The token is generated and assigned
+# to a node as a result of successful provisioning (see
+# explanation of "join" option).
+# When the token is set, the menu operations "attach"
+# and "remove" may be performed on a node specified
+# by this token.
+#
+# The test imitates a device with 2 elements:
+# element 0: OnOff Server model
+# element 1: OnOff Client model
+#
+# The main menu:
+# 1 - set node ID (token)
+# 2 - join mesh network
+# 3 - attach mesh node
+# 4 - remove node
+# 5 - client menu
+# 6 - exit
+#
+# The main menu options explained:
+# 1 - set token
+# Set the unique node token.
+# The token can be set from command line arguments as
+# well.
+#
+# 2 - join
+# Request provisioning of a device to become a node
+# on a mesh network. The test generates device UUID
+# which is displayed and will need to be provided to
+# an outside entity that acts as a Provisioner. Also,
+# during the provisioning process, an agent that is
+# part of the test, will request (or will be requested)
+# to perform a specified operation, e.g., a number will
+# be displayed and this number will need to be entered
+# on the Provisioner's side.
+# In case of successful provisioning, the application
+# automatically attaches as a node to the daemon. A node
+# 'token' is returned to the application and is used
+# for the runtime of the test.
+#
+# 3 - attach
+# Attach the application to bluetoothd-daemon as a node.
+# For the call to be successful, the valid node token must
+# be already set, either from command arguments or by
+# executing "set token" operation or automatically after
+# successfully executing "join" operation in the same test
+# run.
+#
+# 4 - remove
+# Permanently removes any node configuration from daemon
+# and persistent storage. After this operation, the node
+# is permanently forgotten by the daemon and the associated
+# node token is no longer valid.
+#
+# 5 - client menu
+# Enter On/Off client submenu.
+#
+# 6 - exit
+#
+###################################################################
+import sys
+import struct
+import fcntl
+import os
+import numpy
+import random
+import dbus
+import dbus.service
+import dbus.exceptions
+
+from threading import Timer
+import time
+
+try:
+ from gi.repository import GLib
+except ImportError:
+ import glib as GLib
+from dbus.mainloop.glib import DBusGMainLoop
+
+try:
+ from termcolor import colored, cprint
+ set_error = lambda x: colored('!' + x, 'red', attrs=['bold'])
+ set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
+ set_green = lambda x: colored(x, 'green', attrs=['bold'])
+ set_yellow = lambda x: colored(x, 'yellow', attrs=['bold'])
+except ImportError:
+ print('!!! Install termcolor module for better experience !!!')
+ set_error = lambda x: x
+ set_cyan = lambda x: x
+ set_green = lambda x: x
+ set_yellow = lambda x: x
+
+# Provisioning agent
+try:
+ import agent
+except ImportError:
+ agent = None
+
+MESH_SERVICE_NAME = 'org.bluez.mesh'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+
+MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
+MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
+MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
+MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
+
+APP_COMPANY_ID = 0x05f1
+APP_PRODUCT_ID = 0x0001
+APP_VERSION_ID = 0x0001
+
+VENDOR_ID_NONE = 0xffff
+
+app = None
+bus = None
+mainloop = None
+node = None
+mesh_net = None
+
+menu_level = 0
+dst_addr = 0x0000
+app_idx = 0
+
+# Node token housekeeping
+token = None
+have_token = False
+
+user_input = 0
+
+
+def app_exit():
+ global mainloop
+ global app
+
+ for el in app.elements:
+ for model in el.models:
+ if model.timer != None:
+ model.timer.cancel()
+ mainloop.quit()
+
+def array_to_string(b_array):
+ str = ""
+ for b in b_array:
+ str += "%02x" % b
+ return str
+
+def generic_error_cb(error):
+ print(set_error('D-Bus call failed: ') + str(error))
+
+def generic_reply_cb():
+ return
+
+def attach_app_error_cb(error):
+ print(set_error('Failed to register application: ') + str(error))
+
+def attach(token):
+ print('Attach mesh node to bluetooth-meshd daemon')
+
+ mesh_net.Attach(app.get_path(), token,
+ reply_handler=attach_app_cb,
+ error_handler=attach_app_error_cb)
+
+def join_cb():
+ print('Join procedure started')
+
+def join_error_cb(reason):
+ print('Join procedure failed: ', reason)
+
+def unwrap(item):
+ if isinstance(item, dbus.Boolean):
+ return bool(item)
+ if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
+ dbus.UInt64, dbus.Int64)):
+ return int(item)
+ if isinstance(item, dbus.Byte):
+ return bytes([int(item)])
+ if isinstance(item, dbus.String):
+ return item
+ if isinstance(item, (dbus.Array, list, tuple)):
+ return [unwrap(x) for x in item]
+ if isinstance(item, (dbus.Dictionary, dict)):
+ return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
+
+ print(set_error('Dictionary item not handled: ') + type(item))
+
+ return item
+
+def attach_app_cb(node_path, dict_array):
+ print('Mesh application registered ', node_path)
+
+ obj = bus.get_object(MESH_SERVICE_NAME, node_path)
+
+ global node
+ node = dbus.Interface(obj, MESH_NODE_IFACE)
+
+ els = unwrap(dict_array)
+
+ for el in els:
+ idx = struct.unpack('b', el[0])[0]
+
+ models = el[1]
+ element = app.get_element(idx)
+ element.set_model_config(models)
+
+def interfaces_removed_cb(object_path, interfaces):
+ print('Removed')
+ if not mesh_net:
+ return
+
+ print(object_path)
+ if object_path == mesh_net[2]:
+ print('Service was removed')
+ app_exit()
+
+def send_response(path, dest, key, data):
+ node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
+ error_handler=generic_error_cb)
+
+def send_publication(path, model_id, data):
+ print('Send publication ', end='')
+ print(data)
+ node.Publish(path, model_id, data,
+ reply_handler=generic_reply_cb,
+ error_handler=generic_error_cb)
+
+def print_state(state):
+ print('State is ', end='')
+ if state == 0:
+ print('OFF')
+ elif state == 1:
+ print('ON')
+ else:
+ print('UNKNOWN')
+class PubTimer():
+ def __init__(self):
+ self.seconds = None
+ self.func = None
+ self.thread = None
+ self.busy = False
+
+ def _timeout_cb(self):
+ self.func()
+ self.busy = True
+ self._schedule_timer()
+ self.busy =False
+
+ def _schedule_timer(self):
+ self.thread = Timer(self.seconds, self._timeout_cb)
+ self.thread.start()
+
+ def start(self, seconds, func):
+ self.func = func
+ self.seconds = seconds
+ if not self.busy:
+ self._schedule_timer()
+
+ def cancel(self):
+ if self.thread is not None:
+ self.thread.cancel()
+ self.thread = None
+
+class Application(dbus.service.Object):
+
+ def __init__(self, bus):
+ self.path = '/example'
+ self.agent = None
+ self.elements = []
+ dbus.service.Object.__init__(self, bus, self.path)
+
+ def set_agent(self, agent):
+ self.agent = agent
+
+ def get_path(self):
+ return dbus.ObjectPath(self.path)
+
+ def add_element(self, element):
+ self.elements.append(element)
+
+ def get_element(self, idx):
+ for ele in self.elements:
+ if ele.get_index() == idx:
+ return ele
+
+ def get_properties(self):
+ return {
+ MESH_APPLICATION_IFACE: {
+ 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
+ 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
+ 'VersionID': dbus.UInt16(APP_VERSION_ID)
+ }
+ }
+
+ @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+ def GetManagedObjects(self):
+ response = {}
+ response[self.path] = self.get_properties()
+ response[self.agent.get_path()] = self.agent.get_properties()
+ for element in self.elements:
+ response[element.get_path()] = element.get_properties()
+ return response
+
+ @dbus.service.method(MESH_APPLICATION_IFACE,
+ in_signature="t", out_signature="")
+ def JoinComplete(self, value):
+ global token
+ global have_token
+
+ print('JoinComplete with token ' + set_green(hex(value)))
+
+ token = value
+ have_token = True
+
+ attach(token)
+
+ @dbus.service.method(MESH_APPLICATION_IFACE,
+ in_signature="s", out_signature="")
+ def JoinFailed(self, value):
+ print(set_error('JoinFailed '), value)
+
+
+class Element(dbus.service.Object):
+ PATH_BASE = '/example/ele'
+
+ def __init__(self, bus, index):
+ self.path = self.PATH_BASE + format(index, '02x')
+ self.models = []
+ self.bus = bus
+ self.index = index
+ dbus.service.Object.__init__(self, bus, self.path)
+
+ def _get_sig_models(self):
+ ids = []
+ for model in self.models:
+ id = model.get_id()
+ vendor = model.get_vendor()
+ if vendor == VENDOR_ID_NONE:
+ ids.append(id)
+ return ids
+
+ def get_properties(self):
+ return {
+ MESH_ELEMENT_IFACE: {
+ 'Index': dbus.Byte(self.index),
+ 'Models': dbus.Array(
+ self._get_sig_models(), signature='q')
+ }
+ }
+
+ def add_model(self, model):
+ model.set_path(self.path)
+ self.models.append(model)
+
+ def get_index(self):
+ return self.index
+
+ def set_model_config(self, configs):
+ for config in configs:
+ mod_id = config[0]
+ self.UpdateModelConfiguration(mod_id, config[1])
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="qqbay", out_signature="")
+ def MessageReceived(self, source, key, is_sub, data):
+ print('Message Received on Element ', end='')
+ print(self.index)
+ for model in self.models:
+ model.process_message(source, key, data)
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="qa{sv}", out_signature="")
+
+ def UpdateModelConfiguration(self, model_id, config):
+ print('UpdateModelConfig ', end='')
+ print(hex(model_id))
+ for model in self.models:
+ if model_id == model.get_id():
+ model.set_config(config)
+ return
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="", out_signature="")
+
+ def get_path(self):
+ return dbus.ObjectPath(self.path)
+
+class Model():
+ def __init__(self, model_id):
+ self.cmd_ops = []
+ self.model_id = model_id
+ self.vendor = VENDOR_ID_NONE
+ self.bindings = []
+ self.pub_period = 0
+ self.pub_id = 0
+ self.path = None
+ self.timer = None
+
+ def set_path(self, path):
+ self.path = path
+
+ def get_id(self):
+ return self.model_id
+
+ def get_vendor(self):
+ return self.vendor
+
+ def process_message(self, source, key, data):
+ return
+
+ def set_publication(self, period):
+ self.pub_period = period
+
+ def set_config(self, config):
+ if 'Bindings' in config:
+ self.bindings = config.get('Bindings')
+ print('Bindings: ', end='')
+ print(self.bindings)
+ if 'PublicationPeriod' in config:
+ self.set_publication(config.get('PublicationPeriod'))
+ print('Model publication period ', end='')
+ print(self.pub_period, end='')
+ print(' ms')
+
+ def print_bindings(self):
+ print(set_cyan('Model'), set_cyan('%04x' % self.model_id),
+ set_cyan('is bound to application key(s): '), end = '')
+
+ if len(self.bindings) == 0:
+ print(set_cyan('** None **'))
+ for b in self.bindings:
+ print(set_cyan('%04x' % b), set_cyan(', '))
+
+########################
+# On Off Server Model
+########################
+class OnOffServer(Model):
+ def __init__(self, model_id):
+ Model.__init__(self, model_id)
+ self.cmd_ops = { 0x8201, # get
+ 0x8202, # set
+ 0x8203, # set unacknowledged
+ 0x8204 } # status
+
+ print("OnOff Server ")
+ self.state = 0
+ print_state(self.state)
+ self.timer = PubTimer()
+
+ def process_message(self, source, key, data):
+ datalen = len(data)
+ print('OnOff Server process message len: ', datalen)
+
+ if datalen != 2 and datalen != 3:
+ # The opcode is not recognized by this model
+ return
+
+ if datalen == 2:
+ op_tuple=struct.unpack('<H',bytes(data))
+ opcode = op_tuple[0]
+ if opcode != 0x8201:
+ # The opcode is not recognized by this model
+ return
+ print('Get state')
+ elif datalen == 3:
+ opcode,self.state=struct.unpack('<HB',bytes(data))
+ if opcode != 0x8202 and opcode != 0x8203:
+ # The opcode is not recognized by this model
+ return
+ print_state(self.state)
+
+ rsp_data = struct.pack('<HB', 0x8204, self.state)
+ send_response(self.path, source, key, rsp_data)
+
+ def set_publication(self, period):
+
+ # We do not handle ms in this example
+ if period < 1000:
+ return
+
+ self.pub_period = period
+ if period == 0:
+ self.timer.cancel()
+ return
+
+ self.timer.start(period/1000, self.publish)
+
+
+ def publish(self):
+ print('Publish')
+ data = struct.pack('<HB', 0x8204, self.state)
+ send_publication(self.path, self.model_id, data)
+
+########################
+# On Off Client Model
+########################
+class OnOffClient(Model):
+ def __init__(self, model_id):
+ Model.__init__(self, model_id)
+ self.cmd_ops = { 0x8201, # get
+ 0x8202, # set
+ 0x8203, # set unacknowledged
+ 0x8204 } # status
+ print('OnOff Client')
+
+ def _reply_cb(state):
+ print('State ', end='');
+ print(state)
+
+ def _send_message(self, dest, key, data, reply_cb):
+ print('OnOffClient send data')
+ node.Send(self.path, dest, key, data, reply_handler=reply_cb,
+ error_handler=generic_error_cb)
+
+ def get_state(self, dest, key):
+ opcode = 0x8201
+ data = struct.pack('<H', opcode)
+ self._send_message(dest, key, data, self._reply_cb)
+
+ def set_state(self, dest, key, state):
+ opcode = 0x8202
+ print('State:', state)
+ data = struct.pack('<HB', opcode, state)
+ self._send_message(dest, key, data, self._reply_cb)
+
+ def process_message(self, source, key, data):
+ print('OnOffClient process message len = ', end = '')
+ datalen = len(data)
+ print(datalen)
+
+ if datalen != 3:
+ # The opcode is not recognized by this model
+ return
+
+ opcode, state=struct.unpack('<HB',bytes(data))
+
+ if opcode != 0x8204 :
+ # The opcode is not recognized by this model
+ return
+
+ print(set_yellow('Got state '), end = '')
+
+ state_str = "ON"
+ if state == 0:
+ state_str = "OFF"
+
+ print(set_green(state_str), set_yellow('from'),
+ set_green('%04x' % source))
+
+########################
+# Menu functions
+########################
+class MenuHandler(object):
+ def __init__(self, callback):
+ self.cb = callback
+ flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
+ flags |= os.O_NONBLOCK
+ fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
+ sys.stdin.flush()
+ GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.input_callback)
+
+ def input_callback(self, fd, condition):
+ chunk = fd.read()
+ buffer = ''
+ for char in chunk:
+ buffer += char
+ if char == '\n':
+ self.cb(buffer)
+
+ return True
+
+def process_input(input_str):
+ if menu_level == 0:
+ process_main_menu(input_str)
+ elif menu_level == 1:
+ process_client_menu(input_str)
+ else:
+ print(set_error('BUG: bad menu level'))
+
+def switch_menu(level):
+ global menu_level
+
+ if level > 1:
+ return
+
+ if level == 0:
+ main_menu()
+ elif level == 1:
+ client_menu()
+
+ menu_level = level
+
+########################
+# Main menu functions
+########################
+def process_main_menu(input_str):
+ global token
+ global user_input
+ global have_token
+
+ str = input_str.strip()
+
+ if user_input == 1:
+ res = set_token(str)
+ user_input = 0
+
+ if res == False:
+ main_menu()
+
+ return
+
+ # Allow entering empty lines for better output visibility
+ if len(str) == 0:
+ return
+
+ if str.isdigit() == False:
+ main_menu()
+ return
+
+ opt = int(str)
+
+ if opt > 6:
+ print(set_error('Unknown menu option: '), opt)
+ main_menu()
+ elif opt == 1:
+ if have_token:
+ print('Token already set')
+ return
+
+ user_input = 1;
+ print(set_cyan('Enter 16-digit hex node ID:'))
+ elif opt == 2:
+ if agent == None:
+ print(set_error('Provisioning agent not found'))
+ return
+
+ join_mesh()
+ elif opt == 3:
+ if have_token == False:
+ print(set_error('Token is not set'))
+ main_menu()
+ return
+
+ attach(token)
+ elif opt == 4:
+ if have_token == False:
+ print(set_error('Token is not set'))
+ main_menu()
+ return
+
+ print('Remove mesh node')
+ mesh_net.Leave(token, reply_handler=generic_reply_cb,
+ error_handler=generic_error_cb)
+ have_token = False
+ elif opt == 5:
+ switch_menu(1)
+ elif opt == 6:
+ app_exit()
+
+
+def main_menu():
+ print(set_cyan('*** MAIN MENU ***'))
+ print(set_cyan('1 - set node ID (token)'))
+ print(set_cyan('2 - join mesh network'))
+ print(set_cyan('3 - attach mesh node'))
+ print(set_cyan('4 - remove node'))
+ print(set_cyan('5 - client menu'))
+ print(set_cyan('6 - exit'))
+
+def set_token(str):
+ global token
+ global have_token
+
+ if len(str) != 16:
+ print(set_error('Expected 16 digits'))
+ return False
+
+ try:
+ input_number = int(str, 16)
+ except ValueError:
+ print(set_error('Not a valid hexadecimal number'))
+ return False
+
+ token = numpy.uint64(input_number)
+ have_token = True
+
+ return True
+
+def join_mesh():
+ uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
+
+ caps = ["out-numeric"]
+ oob = ["other"]
+
+ random.shuffle(uuid)
+ uuid_str = array_to_string(uuid)
+ print('Joining with UUID ' + set_green(uuid_str))
+
+ mesh_net.Join(app.get_path(), uuid,
+ reply_handler=join_cb,
+ error_handler=join_error_cb)
+
+##############################
+# On/Off Client menu functions
+##############################
+def process_client_menu(input_str):
+ global user_input
+ global dst_addr
+ global app_idx
+
+ res = -1
+ str = input_str.strip()
+
+ if user_input == 1:
+ res = set_value(str)
+ if res != -1:
+ dst_addr = res
+ elif user_input == 2:
+ res = set_value(str)
+ if res != -1:
+ app_idx = res
+
+ if user_input != 0:
+ user_input = 0
+ if res == -1:
+ client_menu()
+ return
+
+ # Allow entering empty lines for better output visibility
+ if len(str) == 0:
+ return
+
+ if str.isdigit() == False:
+ client_menu()
+ return
+
+ opt = int(str)
+
+ if opt > 7:
+ print(set_error('Unknown menu option: '), opt)
+ client_menu()
+ return
+
+ if opt >= 3 and opt <= 5 and dst_addr == 0x0000:
+ print(set_error('Destination address not set!'))
+ return
+
+ if opt == 1:
+ user_input = 1;
+ print(set_cyan('Enter 4-digit hex destination address:'))
+ elif opt == 2:
+ user_input = 2;
+ app.elements[1].models[0].print_bindings()
+ print(set_cyan('Choose application key index:'))
+ elif opt == 3:
+ app.elements[1].models[0].get_state(dst_addr, app_idx)
+ elif opt == 4 or opt == 5:
+ app.elements[1].models[0].set_state(dst_addr, app_idx, opt - 4)
+ elif opt == 6:
+ switch_menu(0)
+ elif opt == 7:
+ app_exit()
+
+def client_menu():
+ print(set_cyan('*** ON/OFF CLIENT MENU ***'))
+ print(set_cyan('1 - set destination address'))
+ print(set_cyan('2 - set application key index'))
+ print(set_cyan('3 - get state'))
+ print(set_cyan('4 - set state OFF'))
+ print(set_cyan('5 - set state ON'))
+ print(set_cyan('6 - back to main menu'))
+ print(set_cyan('7 - exit'))
+
+def set_value(str):
+
+ if len(str) != 4:
+ print(set_error('Expected 4 digits'))
+ return -1
+
+ try:
+ value = int(str, 16)
+ except ValueError:
+ print(set_error('Not a valid hexadecimal number'))
+ return -1
+
+ return value
+
+########################
+# Main entry
+########################
+def main():
+
+ DBusGMainLoop(set_as_default=True)
+
+ global bus
+ bus = dbus.SystemBus()
+ global mainloop
+ global app
+ global mesh_net
+
+ if len(sys.argv) > 1 :
+ set_token(sys.argv[1])
+
+ mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
+ "/org/bluez/mesh"),
+ MESH_NETWORK_IFACE)
+ mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
+
+ app = Application(bus)
+
+ # Provisioning agent
+ if agent != None:
+ app.set_agent(agent.Agent(bus))
+
+ first_ele = Element(bus, 0x00)
+ second_ele = Element(bus, 0x01)
+
+ print(set_yellow('Register OnOff Server model on element 0'))
+ first_ele.add_model(OnOffServer(0x1000))
+
+ print(set_yellow('Register OnOff Client model on element 1'))
+ second_ele.add_model(OnOffClient(0x1001))
+ app.add_element(first_ele)
+ app.add_element(second_ele)
+
+ mainloop = GLib.MainLoop()
+
+ main_menu()
+ event_catcher = MenuHandler(process_input);
+ mainloop.run()
+
+if __name__ == '__main__':
+ main()
--
2.17.2



2019-03-11 22:20:54

by Gix, Brian

[permalink] [raw]
Subject: Re: [PATCH BlueZ v3] test: Add unified test for mesh node example app

Applied

On Sat, 2019-03-09 at 23:53 -0800, Inga Stotland wrote:
> This adds one script, test-mesh, to replace three test-join,
> example-onoff-server and example-onoff-client.
> This is menu driven test that allows provisioning (join) and/or
> connecting existing (attach) nodes.
> ---
> Makefile.tools | 2 +-
> test/agent.py | 14 +-
> test/example-onoff-client | 288 -------------
> test/example-onoff-server | 365 -----------------
> test/test-mesh | 842 ++++++++++++++++++++++++++++++++++++++
> 5 files changed, 854 insertions(+), 657 deletions(-)
> delete mode 100644 test/example-onoff-client
> delete mode 100644 test/example-onoff-server
> create mode 100755 test/test-mesh
>
> diff --git a/Makefile.tools b/Makefile.tools
> index 0f94bbbe7..379e127b6 100644
> --- a/Makefile.tools
> +++ b/Makefile.tools
> @@ -463,7 +463,7 @@ test_scripts += test/sap_client.py test/bluezutils.py \
> test/test-hfp test/opp-client test/ftp-client \
> test/pbap-client test/map-client test/example-advertisement \
> test/example-gatt-server test/example-gatt-client \
> - test/test-gatt-profile
> + test/test-gatt-profile test/test-mesh test/agent.py
>
> if BTPCLIENT
> noinst_PROGRAMS += tools/btpclient
> diff --git a/test/agent.py b/test/agent.py
> index 22c92f952..778dbe092 100755
> --- a/test/agent.py
> +++ b/test/agent.py
> @@ -1,9 +1,16 @@
> -#!/usr/bin/python
> +#!/usr/bin/python3
>
> import sys
> import dbus
> import dbus.service
> -import dbus.mainloop.glib
> +
> +try:
> + from termcolor import colored, cprint
> + set_green = lambda x: colored(x, 'green', attrs=['bold'])
> + set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
> +except ImportError:
> + set_green = lambda x: x
> + set_cyan = lambda x: x
>
> AGENT_IFACE = 'org.bluez.mesh.ProvisionAgent1'
> AGENT_PATH = "/mesh/test/agent"
> @@ -37,4 +44,5 @@ class Agent(dbus.service.Object):
>
> @dbus.service.method(AGENT_IFACE, in_signature="su", out_signature="")
> def DisplayNumeric(self, type, value):
> - print("DisplayNumeric type=", type, " number=", value)
> + print(set_cyan('DisplayNumeric ('), type,
> + set_cyan(') number ='), set_green(value))
> diff --git a/test/example-onoff-client b/test/example-onoff-client
> deleted file mode 100644
> index e4a87eb12..000000000
> --- a/test/example-onoff-client
> +++ /dev/null
> @@ -1,288 +0,0 @@
> -#!/usr/bin/env python3
> -
> -import sys
> -import struct
> -import numpy
> -import dbus
> -import dbus.service
> -import dbus.exceptions
> -
> -try:
> - from gi.repository import GObject
> -except ImportError:
> - import gobject as GObject
> -from dbus.mainloop.glib import DBusGMainLoop
> -
> -MESH_SERVICE_NAME = 'org.bluez.mesh'
> -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
> -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
> -
> -MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
> -MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
> -MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
> -
> -VENDOR_ID_NONE = 0xffff
> -
> -app = None
> -bus = None
> -mainloop = None
> -node = None
> -token = numpy.uint64(0x76bd4f2372477600)
> -
> -def unwrap(item):
> - if isinstance(item, dbus.Boolean):
> - return bool(item)
> - if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
> - dbus.UInt64, dbus.Int64)):
> - return int(item)
> - if isinstance(item, dbus.Byte):
> - return bytes([int(item)])
> - if isinstance(item, dbus.String):
> - return item
> - if isinstance(item, (dbus.Array, list, tuple)):
> - return [unwrap(x) for x in item]
> - if isinstance(item, (dbus.Dictionary, dict)):
> - return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
> -
> - print('Dictionary item not handled')
> - print(type(item))
> - return item
> -
> -def attach_app_cb(node_path, dict_array):
> - print('Mesh application registered ', node_path)
> - print(type(node_path))
> - print(type(dict_array))
> - print(dict_array)
> -
> - els = unwrap(dict_array)
> - print("Get Elements")
> - for el in els:
> - print(el)
> - idx = struct.unpack('b', el[0])[0]
> - print('Configuration for Element ', end='')
> - print(idx)
> - models = el[1]
> -
> - element = app.get_element(idx)
> - element.set_model_config(models)
> -
> - obj = bus.get_object(MESH_SERVICE_NAME, node_path)
> - global node
> - node = dbus.Interface(obj, MESH_NODE_IFACE)
> -
> -def error_cb(error):
> - print('D-Bus call failed: ' + str(error))
> -
> -def generic_reply_cb():
> - print('D-Bus call done')
> -
> -def interfaces_removed_cb(object_path, interfaces):
> - if not mesh_net:
> - return
> -
> - if object_path == mesh_net[2]:
> - print('Service was removed')
> - mainloop.quit()
> -
> -class Application(dbus.service.Object):
> -
> - def __init__(self, bus):
> - self.path = '/example'
> - self.elements = []
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> - def add_element(self, element):
> - self.elements.append(element)
> -
> - def get_element(self, idx):
> - for ele in self.elements:
> - if ele.get_index() == idx:
> - return ele
> -
> - @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
> - def GetManagedObjects(self):
> - response = {}
> - print('GetManagedObjects')
> - for element in self.elements:
> - response[element.get_path()] = element.get_properties()
> - return response
> -
> -class Element(dbus.service.Object):
> - PATH_BASE = '/example/ele'
> -
> - def __init__(self, bus, index):
> - self.path = self.PATH_BASE + format(index, '02x')
> - print(self.path)
> - self.models = []
> - self.bus = bus
> - self.index = index
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def _get_sig_models(self):
> - ids = []
> - for model in self.models:
> - id = model.get_id()
> - vendor = model.get_vendor()
> - if vendor == VENDOR_ID_NONE:
> - ids.append(id)
> - return ids
> -
> - def get_properties(self):
> - return {
> - MESH_ELEMENT_IFACE: {
> - 'Index': dbus.Byte(self.index),
> - 'Models': dbus.Array(
> - self._get_sig_models(), signature='q')
> - }
> - }
> -
> - def add_model(self, model):
> - model.set_path(self.path)
> - self.models.append(model)
> -
> - def get_index(self):
> - return self.index
> -
> - def set_model_config(self, config):
> - print('Set element models config')
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qqbay", out_signature="")
> - def MessageReceived(self, source, key, is_sub, data):
> - print('Message Received on Element ', end='')
> - print(self.index)
> - for model in self.models:
> - model.process_message(source, key, data)
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qa{sv}", out_signature="")
> -
> - def UpdateModelConfiguration(self, model_id, config):
> - print('UpdateModelConfig ', end='')
> - print(hex(model_id))
> - for model in self.models:
> - if model_id == model.get_id():
> - model.set_config(config)
> - return
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="", out_signature="")
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> -class Model():
> - def __init__(self, model_id):
> - self.cmd_ops = []
> - self.model_id = model_id
> - self.vendor = VENDOR_ID_NONE
> - self.path = None
> -
> - def set_path(self, path):
> - self.path = path
> -
> - def get_id(self):
> - return self.model_id
> -
> - def get_vendor(self):
> - return self.vendor
> -
> - def process_message(self, source, key, data):
> - print('Model process message')
> -
> - def set_publication(self, period):
> - self.period = period
> -
> - def set_bindings(self, bindings):
> - self.bindings = bindings
> -
> - def set_config(self, config):
> - if 'Bindings' in config:
> - self.bindings = config.get('Bindings')
> - print('Bindings: ', end='')
> - print(self.bindings)
> - if 'PublicationPeriod' in config:
> - self.set_publication(config.get('PublicationPeriod'))
> - print('Model publication period ', end='')
> - print(self.pub_period, end='')
> - print(' ms')
> -
> -class OnOffClient(Model):
> - def __init__(self, model_id):
> - Model.__init__(self, model_id)
> - self.cmd_ops = { 0x8201, # get
> - 0x8202, # set
> - 0x8203 } # set unacknowledged
> - print('OnOff Client')
> -
> - def _reply_cb(state):
> - print('State ', end='');
> - print(state)
> -
> - def _send_message(self, dest, key, data, reply_cb):
> - print('OnOffClient send data')
> - node.Send(self.path, dest, key, data, reply_handler=reply_cb,
> - error_handler=error_cb)
> -
> - def get_state(self, dest, key):
> - opcode = 0x8201
> - data = struct.pack('<H', opcode)
> - self._send_message(dest, key, data, self._reply_cb)
> -
> - def set_state(self, dest, key, state):
> - opcode = 0x8202
> - data = struct.pack('<HB', opcode, state)
> - self._send_message(dest, key, data, self._reply_cb)
> -
> - def process_message(self, source, key, data):
> - print('OnOffClient process message len ', end = '')
> - datalen = len(data)
> - print(datalen)
> -
> - if datalen!=3:
> - return
> -
> - opcode, state=struct.unpack('<HB',bytes(data))
> - if opcode != 0x8202 :
> - print('Bad opcode ', end='')
> - print(hex(opcode))
> - return
> -
> - print('Got state ', end = '')
> - print(hex(state))
> -
> -def attach_app_error_cb(error):
> - print('Failed to register application: ' + str(error))
> - mainloop.quit()
> -
> -def main():
> -
> - DBusGMainLoop(set_as_default=True)
> -
> - global bus
> - bus = dbus.SystemBus()
> - global mainloop
> - global app
> -
> - mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
> - "/org/bluez/mesh"),
> - MESH_NETWORK_IFACE)
> - mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> -
> - app = Application(bus)
> - first_ele = Element(bus, 0x00)
> - first_ele.add_model(OnOffClient(0x1001))
> - app.add_element(first_ele)
> -
> - mainloop = GObject.MainLoop()
> -
> - print('Attach')
> - mesh_net.Attach(app.get_path(), token,
> - reply_handler=attach_app_cb,
> - error_handler=attach_app_error_cb)
> - mainloop.run()
> -
> -if __name__ == '__main__':
> - main()
> diff --git a/test/example-onoff-server b/test/example-onoff-server
> deleted file mode 100644
> index 131b6415c..000000000
> --- a/test/example-onoff-server
> +++ /dev/null
> @@ -1,365 +0,0 @@
> -#!/usr/bin/env python3
> -
> -import sys
> -import struct
> -import numpy
> -import dbus
> -import dbus.service
> -import dbus.exceptions
> -
> -from threading import Timer
> -import time
> -
> -
> -try:
> - from gi.repository import GObject
> -except ImportError:
> - import gobject as GObject
> -from dbus.mainloop.glib import DBusGMainLoop
> -
> -MESH_SERVICE_NAME = 'org.bluez.mesh'
> -DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
> -DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
> -
> -MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
> -MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
> -MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
> -MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
> -
> -APP_COMPANY_ID = 0x05f1
> -APP_PRODUCT_ID = 0x0001
> -APP_VERSION_ID = 0x0001
> -
> -VENDOR_ID_NONE = 0xffff
> -
> -app = None
> -bus = None
> -mainloop = None
> -node = None
> -
> -token = numpy.uint64(0x76bd4f2372476578)
> -
> -def generic_error_cb(error):
> - print('D-Bus call failed: ' + str(error))
> -
> -def generic_reply_cb():
> - print('D-Bus call done')
> -
> -def unwrap(item):
> - if isinstance(item, dbus.Boolean):
> - return bool(item)
> - if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
> - dbus.UInt64, dbus.Int64)):
> - return int(item)
> - if isinstance(item, dbus.Byte):
> - return bytes([int(item)])
> - if isinstance(item, dbus.String):
> - return item
> - if isinstance(item, (dbus.Array, list, tuple)):
> - return [unwrap(x) for x in item]
> - if isinstance(item, (dbus.Dictionary, dict)):
> - return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
> -
> - print('Dictionary item not handled')
> - print(type(item))
> - return item
> -
> -def attach_app_cb(node_path, dict_array):
> - print('Mesh application registered ', node_path)
> -
> - obj = bus.get_object(MESH_SERVICE_NAME, node_path)
> -
> - global node
> - node = dbus.Interface(obj, MESH_NODE_IFACE)
> -
> - els = unwrap(dict_array)
> - print("Get Elements")
> -
> - for el in els:
> - idx = struct.unpack('b', el[0])[0]
> - print('Configuration for Element ', end='')
> - print(idx)
> -
> - models = el[1]
> - element = app.get_element(idx)
> - element.set_model_config(models)
> -
> -def interfaces_removed_cb(object_path, interfaces):
> - if not mesh_net:
> - return
> -
> - if object_path == mesh_net[2]:
> - print('Service was removed')
> - mainloop.quit()
> -
> -def send_response(path, dest, key, data):
> - print('send response ', end='')
> - print(data)
> - node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
> - error_handler=generic_error_cb)
> -
> -def send_publication(path, model_id, data):
> - print('send publication ', end='')
> - print(data)
> - node.Publish(path, model_id, data,
> - reply_handler=generic_reply_cb,
> - error_handler=generic_error_cb)
> -
> -class PubTimer():
> - def __init__(self):
> - self.seconds = None
> - self.func = None
> - self.thread = None
> - self.busy = False
> -
> - def _timeout_cb(self):
> - self.func()
> - self.busy = True
> - self._schedule_timer()
> - self.busy =False
> -
> - def _schedule_timer(self):
> - self.thread = Timer(self.seconds, self._timeout_cb)
> - self.thread.start()
> -
> - def start(self, seconds, func):
> - self.func = func
> - self.seconds = seconds
> - if not self.busy:
> - self._schedule_timer()
> -
> - def cancel(self):
> - print('Cancel timer')
> - if self.thread is not None:
> - print('Cancel thread')
> - self.thread.cancel()
> - self.thread = None
> -
> -class Application(dbus.service.Object):
> -
> - def __init__(self, bus):
> - self.path = '/example'
> - self.elements = []
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> - def add_element(self, element):
> - self.elements.append(element)
> -
> - def get_element(self, idx):
> - for ele in self.elements:
> - if ele.get_index() == idx:
> - return ele
> -
> - def get_properties(self):
> - return {
> - MESH_APPLICATION_IFACE: {
> - 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
> - 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
> - 'VersionID': dbus.UInt16(APP_VERSION_ID)
> - }
> - }
> -
> - @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
> - def GetManagedObjects(self):
> - response = {}
> - print('GetManagedObjects')
> - response[self.path] = self.get_properties()
> - for element in self.elements:
> - response[element.get_path()] = element.get_properties()
> - return response
> -
> -class Element(dbus.service.Object):
> - PATH_BASE = '/example/ele'
> -
> - def __init__(self, bus, index):
> - self.path = self.PATH_BASE + format(index, '02x')
> - print(self.path)
> - self.models = []
> - self.bus = bus
> - self.index = index
> - dbus.service.Object.__init__(self, bus, self.path)
> -
> - def _get_sig_models(self):
> - ids = []
> - for model in self.models:
> - id = model.get_id()
> - vendor = model.get_vendor()
> - if vendor == VENDOR_ID_NONE:
> - ids.append(id)
> - return ids
> -
> - def get_properties(self):
> - return {
> - MESH_ELEMENT_IFACE: {
> - 'Index': dbus.Byte(self.index),
> - 'Models': dbus.Array(
> - self._get_sig_models(), signature='q')
> - }
> - }
> -
> - def add_model(self, model):
> - model.set_path(self.path)
> - self.models.append(model)
> -
> - def get_index(self):
> - return self.index
> -
> - def set_model_config(self, configs):
> - print('Set element models config')
> - for config in configs:
> - mod_id = config[0]
> - self.UpdateModelConfiguration(mod_id, config[1])
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qqbay", out_signature="")
> - def MessageReceived(self, source, key, is_sub, data):
> - print('Message Received on Element ', end='')
> - print(self.index)
> - for model in self.models:
> - model.process_message(source, key, data)
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="qa{sv}", out_signature="")
> -
> - def UpdateModelConfiguration(self, model_id, config):
> - print('UpdateModelConfig ', end='')
> - print(hex(model_id))
> - for model in self.models:
> - if model_id == model.get_id():
> - model.set_config(config)
> - return
> -
> - @dbus.service.method(MESH_ELEMENT_IFACE,
> - in_signature="", out_signature="")
> -
> - def get_path(self):
> - return dbus.ObjectPath(self.path)
> -
> -class Model():
> - def __init__(self, model_id):
> - self.cmd_ops = []
> - self.model_id = model_id
> - self.vendor = VENDOR_ID_NONE
> - self.bindings = []
> - self.pub_period = 0
> - self.pub_id = 0
> - self.path = None
> -
> - def set_path(self, path):
> - self.path = path
> -
> - def get_id(self):
> - return self.model_id
> -
> - def get_vendor(self):
> - return self.vendor
> -
> - def process_message(self, source, key, data):
> - print('Model process message')
> -
> - def set_publication(self, period):
> - self.pub_period = period
> -
> - def set_config(self, config):
> - if 'Bindings' in config:
> - self.bindings = config.get('Bindings')
> - print('Bindings: ', end='')
> - print(self.bindings)
> - if 'PublicationPeriod' in config:
> - self.set_publication(config.get('PublicationPeriod'))
> - print('Model publication period ', end='')
> - print(self.pub_period, end='')
> - print(' ms')
> -
> -class OnOffServer(Model):
> - def __init__(self, model_id):
> - Model.__init__(self, model_id)
> - self.cmd_ops = { 0x8201, # get
> - 0x8202, # set
> - 0x8203 } # set unacknowledged
> -
> - print("OnOff Server ", end="")
> - self.state = 0
> - print('State ', end='')
> - self.timer = PubTimer()
> -
> - def process_message(self, source, key, data):
> - datalen = len(data)
> - print('OnOff Server process message len ', datalen)
> -
> - if datalen!=2 and datalen!=3:
> - return
> -
> - if datalen==2:
> - op_tuple=struct.unpack('<H',bytes(data))
> - opcode = op_tuple[0]
> - if opcode != 0x8201:
> - print(hex(opcode))
> - return
> - print('Get state')
> - elif datalen==3:
> - opcode,self.state=struct.unpack('<HB',bytes(data))
> - if opcode != 0x8202 and opcode != 0x8203:
> - print(hex(opcode))
> - return
> - print('Set state: ', end='')
> - print(self.state)
> -
> - rsp_data = struct.pack('<HB', 0x8204, self.state)
> - send_response(self.path, source, key, rsp_data)
> -
> - def publish(self):
> - print('Publish')
> - data = struct.pack('B', self.state)
> - send_publication(self.path, self.model_id, data)
> -
> - def set_publication(self, period):
> - if period == 0:
> - self.pub_period = 0
> - self.timer.cancel()
> - return
> -
> - # We do not handle ms in this example
> - if period < 1000:
> - return
> -
> - self.pub_period = period
> - self.timer.start(period/1000, self.publish)
> -
> -def attach_app_error_cb(error):
> - print('Failed to register application: ' + str(error))
> - mainloop.quit()
> -
> -def main():
> -
> - DBusGMainLoop(set_as_default=True)
> -
> - global bus
> - bus = dbus.SystemBus()
> - global mainloop
> - global app
> -
> - mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
> - "/org/bluez/mesh"),
> - MESH_NETWORK_IFACE)
> - mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> -
> - app = Application(bus)
> - first_ele = Element(bus, 0x00)
> - first_ele.add_model(OnOffServer(0x1000))
> - app.add_element(first_ele)
> -
> - mainloop = GObject.MainLoop()
> -
> - print('Attach')
> - mesh_net.Attach(app.get_path(), token,
> - reply_handler=attach_app_cb,
> - error_handler=attach_app_error_cb)
> -
> - mainloop.run()
> -
> -if __name__ == '__main__':
> - main()
> diff --git a/test/test-mesh b/test/test-mesh
> new file mode 100755
> index 000000000..fd02207bc
> --- /dev/null
> +++ b/test/test-mesh
> @@ -0,0 +1,842 @@
> +#!/usr/bin/env python3
> +
> +###################################################################
> +#
> +# This is a unified test sample for BT Mesh
> +#
> +# To run the test:
> +# test-mesh [token]
> +#
> +# 'token' is an optional argument. It must be a 16-digit
> +# hexadecimal number. The token must be associated with
> +# an existing node. The token is generated and assigned
> +# to a node as a result of successful provisioning (see
> +# explanation of "join" option).
> +# When the token is set, the menu operations "attach"
> +# and "remove" may be performed on a node specified
> +# by this token.
> +#
> +# The test imitates a device with 2 elements:
> +# element 0: OnOff Server model
> +# element 1: OnOff Client model
> +#
> +# The main menu:
> +# 1 - set node ID (token)
> +# 2 - join mesh network
> +# 3 - attach mesh node
> +# 4 - remove node
> +# 5 - client menu
> +# 6 - exit
> +#
> +# The main menu options explained:
> +# 1 - set token
> +# Set the unique node token.
> +# The token can be set from command line arguments as
> +# well.
> +#
> +# 2 - join
> +# Request provisioning of a device to become a node
> +# on a mesh network. The test generates device UUID
> +# which is displayed and will need to be provided to
> +# an outside entity that acts as a Provisioner. Also,
> +# during the provisioning process, an agent that is
> +# part of the test, will request (or will be requested)
> +# to perform a specified operation, e.g., a number will
> +# be displayed and this number will need to be entered
> +# on the Provisioner's side.
> +# In case of successful provisioning, the application
> +# automatically attaches as a node to the daemon. A node
> +# 'token' is returned to the application and is used
> +# for the runtime of the test.
> +#
> +# 3 - attach
> +# Attach the application to bluetoothd-daemon as a node.
> +# For the call to be successful, the valid node token must
> +# be already set, either from command arguments or by
> +# executing "set token" operation or automatically after
> +# successfully executing "join" operation in the same test
> +# run.
> +#
> +# 4 - remove
> +# Permanently removes any node configuration from daemon
> +# and persistent storage. After this operation, the node
> +# is permanently forgotten by the daemon and the associated
> +# node token is no longer valid.
> +#
> +# 5 - client menu
> +# Enter On/Off client submenu.
> +#
> +# 6 - exit
> +#
> +###################################################################
> +import sys
> +import struct
> +import fcntl
> +import os
> +import numpy
> +import random
> +import dbus
> +import dbus.service
> +import dbus.exceptions
> +
> +from threading import Timer
> +import time
> +
> +try:
> + from gi.repository import GLib
> +except ImportError:
> + import glib as GLib
> +from dbus.mainloop.glib import DBusGMainLoop
> +
> +try:
> + from termcolor import colored, cprint
> + set_error = lambda x: colored('!' + x, 'red', attrs=['bold'])
> + set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
> + set_green = lambda x: colored(x, 'green', attrs=['bold'])
> + set_yellow = lambda x: colored(x, 'yellow', attrs=['bold'])
> +except ImportError:
> + print('!!! Install termcolor module for better experience !!!')
> + set_error = lambda x: x
> + set_cyan = lambda x: x
> + set_green = lambda x: x
> + set_yellow = lambda x: x
> +
> +# Provisioning agent
> +try:
> + import agent
> +except ImportError:
> + agent = None
> +
> +MESH_SERVICE_NAME = 'org.bluez.mesh'
> +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
> +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
> +
> +MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
> +MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
> +MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
> +MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
> +
> +APP_COMPANY_ID = 0x05f1
> +APP_PRODUCT_ID = 0x0001
> +APP_VERSION_ID = 0x0001
> +
> +VENDOR_ID_NONE = 0xffff
> +
> +app = None
> +bus = None
> +mainloop = None
> +node = None
> +mesh_net = None
> +
> +menu_level = 0
> +dst_addr = 0x0000
> +app_idx = 0
> +
> +# Node token housekeeping
> +token = None
> +have_token = False
> +
> +user_input = 0
> +
> +
> +def app_exit():
> + global mainloop
> + global app
> +
> + for el in app.elements:
> + for model in el.models:
> + if model.timer != None:
> + model.timer.cancel()
> + mainloop.quit()
> +
> +def array_to_string(b_array):
> + str = ""
> + for b in b_array:
> + str += "%02x" % b
> + return str
> +
> +def generic_error_cb(error):
> + print(set_error('D-Bus call failed: ') + str(error))
> +
> +def generic_reply_cb():
> + return
> +
> +def attach_app_error_cb(error):
> + print(set_error('Failed to register application: ') + str(error))
> +
> +def attach(token):
> + print('Attach mesh node to bluetooth-meshd daemon')
> +
> + mesh_net.Attach(app.get_path(), token,
> + reply_handler=attach_app_cb,
> + error_handler=attach_app_error_cb)
> +
> +def join_cb():
> + print('Join procedure started')
> +
> +def join_error_cb(reason):
> + print('Join procedure failed: ', reason)
> +
> +def unwrap(item):
> + if isinstance(item, dbus.Boolean):
> + return bool(item)
> + if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
> + dbus.UInt64, dbus.Int64)):
> + return int(item)
> + if isinstance(item, dbus.Byte):
> + return bytes([int(item)])
> + if isinstance(item, dbus.String):
> + return item
> + if isinstance(item, (dbus.Array, list, tuple)):
> + return [unwrap(x) for x in item]
> + if isinstance(item, (dbus.Dictionary, dict)):
> + return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
> +
> + print(set_error('Dictionary item not handled: ') + type(item))
> +
> + return item
> +
> +def attach_app_cb(node_path, dict_array):
> + print('Mesh application registered ', node_path)
> +
> + obj = bus.get_object(MESH_SERVICE_NAME, node_path)
> +
> + global node
> + node = dbus.Interface(obj, MESH_NODE_IFACE)
> +
> + els = unwrap(dict_array)
> +
> + for el in els:
> + idx = struct.unpack('b', el[0])[0]
> +
> + models = el[1]
> + element = app.get_element(idx)
> + element.set_model_config(models)
> +
> +def interfaces_removed_cb(object_path, interfaces):
> + print('Removed')
> + if not mesh_net:
> + return
> +
> + print(object_path)
> + if object_path == mesh_net[2]:
> + print('Service was removed')
> + app_exit()
> +
> +def send_response(path, dest, key, data):
> + node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
> + error_handler=generic_error_cb)
> +
> +def send_publication(path, model_id, data):
> + print('Send publication ', end='')
> + print(data)
> + node.Publish(path, model_id, data,
> + reply_handler=generic_reply_cb,
> + error_handler=generic_error_cb)
> +
> +def print_state(state):
> + print('State is ', end='')
> + if state == 0:
> + print('OFF')
> + elif state == 1:
> + print('ON')
> + else:
> + print('UNKNOWN')
> +class PubTimer():
> + def __init__(self):
> + self.seconds = None
> + self.func = None
> + self.thread = None
> + self.busy = False
> +
> + def _timeout_cb(self):
> + self.func()
> + self.busy = True
> + self._schedule_timer()
> + self.busy =False
> +
> + def _schedule_timer(self):
> + self.thread = Timer(self.seconds, self._timeout_cb)
> + self.thread.start()
> +
> + def start(self, seconds, func):
> + self.func = func
> + self.seconds = seconds
> + if not self.busy:
> + self._schedule_timer()
> +
> + def cancel(self):
> + if self.thread is not None:
> + self.thread.cancel()
> + self.thread = None
> +
> +class Application(dbus.service.Object):
> +
> + def __init__(self, bus):
> + self.path = '/example'
> + self.agent = None
> + self.elements = []
> + dbus.service.Object.__init__(self, bus, self.path)
> +
> + def set_agent(self, agent):
> + self.agent = agent
> +
> + def get_path(self):
> + return dbus.ObjectPath(self.path)
> +
> + def add_element(self, element):
> + self.elements.append(element)
> +
> + def get_element(self, idx):
> + for ele in self.elements:
> + if ele.get_index() == idx:
> + return ele
> +
> + def get_properties(self):
> + return {
> + MESH_APPLICATION_IFACE: {
> + 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
> + 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
> + 'VersionID': dbus.UInt16(APP_VERSION_ID)
> + }
> + }
> +
> + @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
> + def GetManagedObjects(self):
> + response = {}
> + response[self.path] = self.get_properties()
> + response[self.agent.get_path()] = self.agent.get_properties()
> + for element in self.elements:
> + response[element.get_path()] = element.get_properties()
> + return response
> +
> + @dbus.service.method(MESH_APPLICATION_IFACE,
> + in_signature="t", out_signature="")
> + def JoinComplete(self, value):
> + global token
> + global have_token
> +
> + print('JoinComplete with token ' + set_green(hex(value)))
> +
> + token = value
> + have_token = True
> +
> + attach(token)
> +
> + @dbus.service.method(MESH_APPLICATION_IFACE,
> + in_signature="s", out_signature="")
> + def JoinFailed(self, value):
> + print(set_error('JoinFailed '), value)
> +
> +
> +class Element(dbus.service.Object):
> + PATH_BASE = '/example/ele'
> +
> + def __init__(self, bus, index):
> + self.path = self.PATH_BASE + format(index, '02x')
> + self.models = []
> + self.bus = bus
> + self.index = index
> + dbus.service.Object.__init__(self, bus, self.path)
> +
> + def _get_sig_models(self):
> + ids = []
> + for model in self.models:
> + id = model.get_id()
> + vendor = model.get_vendor()
> + if vendor == VENDOR_ID_NONE:
> + ids.append(id)
> + return ids
> +
> + def get_properties(self):
> + return {
> + MESH_ELEMENT_IFACE: {
> + 'Index': dbus.Byte(self.index),
> + 'Models': dbus.Array(
> + self._get_sig_models(), signature='q')
> + }
> + }
> +
> + def add_model(self, model):
> + model.set_path(self.path)
> + self.models.append(model)
> +
> + def get_index(self):
> + return self.index
> +
> + def set_model_config(self, configs):
> + for config in configs:
> + mod_id = config[0]
> + self.UpdateModelConfiguration(mod_id, config[1])
> +
> + @dbus.service.method(MESH_ELEMENT_IFACE,
> + in_signature="qqbay", out_signature="")
> + def MessageReceived(self, source, key, is_sub, data):
> + print('Message Received on Element ', end='')
> + print(self.index)
> + for model in self.models:
> + model.process_message(source, key, data)
> +
> + @dbus.service.method(MESH_ELEMENT_IFACE,
> + in_signature="qa{sv}", out_signature="")
> +
> + def UpdateModelConfiguration(self, model_id, config):
> + print('UpdateModelConfig ', end='')
> + print(hex(model_id))
> + for model in self.models:
> + if model_id == model.get_id():
> + model.set_config(config)
> + return
> +
> + @dbus.service.method(MESH_ELEMENT_IFACE,
> + in_signature="", out_signature="")
> +
> + def get_path(self):
> + return dbus.ObjectPath(self.path)
> +
> +class Model():
> + def __init__(self, model_id):
> + self.cmd_ops = []
> + self.model_id = model_id
> + self.vendor = VENDOR_ID_NONE
> + self.bindings = []
> + self.pub_period = 0
> + self.pub_id = 0
> + self.path = None
> + self.timer = None
> +
> + def set_path(self, path):
> + self.path = path
> +
> + def get_id(self):
> + return self.model_id
> +
> + def get_vendor(self):
> + return self.vendor
> +
> + def process_message(self, source, key, data):
> + return
> +
> + def set_publication(self, period):
> + self.pub_period = period
> +
> + def set_config(self, config):
> + if 'Bindings' in config:
> + self.bindings = config.get('Bindings')
> + print('Bindings: ', end='')
> + print(self.bindings)
> + if 'PublicationPeriod' in config:
> + self.set_publication(config.get('PublicationPeriod'))
> + print('Model publication period ', end='')
> + print(self.pub_period, end='')
> + print(' ms')
> +
> + def print_bindings(self):
> + print(set_cyan('Model'), set_cyan('%04x' % self.model_id),
> + set_cyan('is bound to application key(s): '), end = '')
> +
> + if len(self.bindings) == 0:
> + print(set_cyan('** None **'))
> + for b in self.bindings:
> + print(set_cyan('%04x' % b), set_cyan(', '))
> +
> +########################
> +# On Off Server Model
> +########################
> +class OnOffServer(Model):
> + def __init__(self, model_id):
> + Model.__init__(self, model_id)
> + self.cmd_ops = { 0x8201, # get
> + 0x8202, # set
> + 0x8203, # set unacknowledged
> + 0x8204 } # status
> +
> + print("OnOff Server ")
> + self.state = 0
> + print_state(self.state)
> + self.timer = PubTimer()
> +
> + def process_message(self, source, key, data):
> + datalen = len(data)
> + print('OnOff Server process message len: ', datalen)
> +
> + if datalen != 2 and datalen != 3:
> + # The opcode is not recognized by this model
> + return
> +
> + if datalen == 2:
> + op_tuple=struct.unpack('<H',bytes(data))
> + opcode = op_tuple[0]
> + if opcode != 0x8201:
> + # The opcode is not recognized by this model
> + return
> + print('Get state')
> + elif datalen == 3:
> + opcode,self.state=struct.unpack('<HB',bytes(data))
> + if opcode != 0x8202 and opcode != 0x8203:
> + # The opcode is not recognized by this model
> + return
> + print_state(self.state)
> +
> + rsp_data = struct.pack('<HB', 0x8204, self.state)
> + send_response(self.path, source, key, rsp_data)
> +
> + def set_publication(self, period):
> +
> + # We do not handle ms in this example
> + if period < 1000:
> + return
> +
> + self.pub_period = period
> + if period == 0:
> + self.timer.cancel()
> + return
> +
> + self.timer.start(period/1000, self.publish)
> +
> +
> + def publish(self):
> + print('Publish')
> + data = struct.pack('<HB', 0x8204, self.state)
> + send_publication(self.path, self.model_id, data)
> +
> +########################
> +# On Off Client Model
> +########################
> +class OnOffClient(Model):
> + def __init__(self, model_id):
> + Model.__init__(self, model_id)
> + self.cmd_ops = { 0x8201, # get
> + 0x8202, # set
> + 0x8203, # set unacknowledged
> + 0x8204 } # status
> + print('OnOff Client')
> +
> + def _reply_cb(state):
> + print('State ', end='');
> + print(state)
> +
> + def _send_message(self, dest, key, data, reply_cb):
> + print('OnOffClient send data')
> + node.Send(self.path, dest, key, data, reply_handler=reply_cb,
> + error_handler=generic_error_cb)
> +
> + def get_state(self, dest, key):
> + opcode = 0x8201
> + data = struct.pack('<H', opcode)
> + self._send_message(dest, key, data, self._reply_cb)
> +
> + def set_state(self, dest, key, state):
> + opcode = 0x8202
> + print('State:', state)
> + data = struct.pack('<HB', opcode, state)
> + self._send_message(dest, key, data, self._reply_cb)
> +
> + def process_message(self, source, key, data):
> + print('OnOffClient process message len = ', end = '')
> + datalen = len(data)
> + print(datalen)
> +
> + if datalen != 3:
> + # The opcode is not recognized by this model
> + return
> +
> + opcode, state=struct.unpack('<HB',bytes(data))
> +
> + if opcode != 0x8204 :
> + # The opcode is not recognized by this model
> + return
> +
> + print(set_yellow('Got state '), end = '')
> +
> + state_str = "ON"
> + if state == 0:
> + state_str = "OFF"
> +
> + print(set_green(state_str), set_yellow('from'),
> + set_green('%04x' % source))
> +
> +########################
> +# Menu functions
> +########################
> +class MenuHandler(object):
> + def __init__(self, callback):
> + self.cb = callback
> + flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
> + flags |= os.O_NONBLOCK
> + fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
> + sys.stdin.flush()
> + GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.input_callback)
> +
> + def input_callback(self, fd, condition):
> + chunk = fd.read()
> + buffer = ''
> + for char in chunk:
> + buffer += char
> + if char == '\n':
> + self.cb(buffer)
> +
> + return True
> +
> +def process_input(input_str):
> + if menu_level == 0:
> + process_main_menu(input_str)
> + elif menu_level == 1:
> + process_client_menu(input_str)
> + else:
> + print(set_error('BUG: bad menu level'))
> +
> +def switch_menu(level):
> + global menu_level
> +
> + if level > 1:
> + return
> +
> + if level == 0:
> + main_menu()
> + elif level == 1:
> + client_menu()
> +
> + menu_level = level
> +
> +########################
> +# Main menu functions
> +########################
> +def process_main_menu(input_str):
> + global token
> + global user_input
> + global have_token
> +
> + str = input_str.strip()
> +
> + if user_input == 1:
> + res = set_token(str)
> + user_input = 0
> +
> + if res == False:
> + main_menu()
> +
> + return
> +
> + # Allow entering empty lines for better output visibility
> + if len(str) == 0:
> + return
> +
> + if str.isdigit() == False:
> + main_menu()
> + return
> +
> + opt = int(str)
> +
> + if opt > 6:
> + print(set_error('Unknown menu option: '), opt)
> + main_menu()
> + elif opt == 1:
> + if have_token:
> + print('Token already set')
> + return
> +
> + user_input = 1;
> + print(set_cyan('Enter 16-digit hex node ID:'))
> + elif opt == 2:
> + if agent == None:
> + print(set_error('Provisioning agent not found'))
> + return
> +
> + join_mesh()
> + elif opt == 3:
> + if have_token == False:
> + print(set_error('Token is not set'))
> + main_menu()
> + return
> +
> + attach(token)
> + elif opt == 4:
> + if have_token == False:
> + print(set_error('Token is not set'))
> + main_menu()
> + return
> +
> + print('Remove mesh node')
> + mesh_net.Leave(token, reply_handler=generic_reply_cb,
> + error_handler=generic_error_cb)
> + have_token = False
> + elif opt == 5:
> + switch_menu(1)
> + elif opt == 6:
> + app_exit()
> +
> +
> +def main_menu():
> + print(set_cyan('*** MAIN MENU ***'))
> + print(set_cyan('1 - set node ID (token)'))
> + print(set_cyan('2 - join mesh network'))
> + print(set_cyan('3 - attach mesh node'))
> + print(set_cyan('4 - remove node'))
> + print(set_cyan('5 - client menu'))
> + print(set_cyan('6 - exit'))
> +
> +def set_token(str):
> + global token
> + global have_token
> +
> + if len(str) != 16:
> + print(set_error('Expected 16 digits'))
> + return False
> +
> + try:
> + input_number = int(str, 16)
> + except ValueError:
> + print(set_error('Not a valid hexadecimal number'))
> + return False
> +
> + token = numpy.uint64(input_number)
> + have_token = True
> +
> + return True
> +
> +def join_mesh():
> + uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
> +
> + caps = ["out-numeric"]
> + oob = ["other"]
> +
> + random.shuffle(uuid)
> + uuid_str = array_to_string(uuid)
> + print('Joining with UUID ' + set_green(uuid_str))
> +
> + mesh_net.Join(app.get_path(), uuid,
> + reply_handler=join_cb,
> + error_handler=join_error_cb)
> +
> +##############################
> +# On/Off Client menu functions
> +##############################
> +def process_client_menu(input_str):
> + global user_input
> + global dst_addr
> + global app_idx
> +
> + res = -1
> + str = input_str.strip()
> +
> + if user_input == 1:
> + res = set_value(str)
> + if res != -1:
> + dst_addr = res
> + elif user_input == 2:
> + res = set_value(str)
> + if res != -1:
> + app_idx = res
> +
> + if user_input != 0:
> + user_input = 0
> + if res == -1:
> + client_menu()
> + return
> +
> + # Allow entering empty lines for better output visibility
> + if len(str) == 0:
> + return
> +
> + if str.isdigit() == False:
> + client_menu()
> + return
> +
> + opt = int(str)
> +
> + if opt > 7:
> + print(set_error('Unknown menu option: '), opt)
> + client_menu()
> + return
> +
> + if opt >= 3 and opt <= 5 and dst_addr == 0x0000:
> + print(set_error('Destination address not set!'))
> + return
> +
> + if opt == 1:
> + user_input = 1;
> + print(set_cyan('Enter 4-digit hex destination address:'))
> + elif opt == 2:
> + user_input = 2;
> + app.elements[1].models[0].print_bindings()
> + print(set_cyan('Choose application key index:'))
> + elif opt == 3:
> + app.elements[1].models[0].get_state(dst_addr, app_idx)
> + elif opt == 4 or opt == 5:
> + app.elements[1].models[0].set_state(dst_addr, app_idx, opt - 4)
> + elif opt == 6:
> + switch_menu(0)
> + elif opt == 7:
> + app_exit()
> +
> +def client_menu():
> + print(set_cyan('*** ON/OFF CLIENT MENU ***'))
> + print(set_cyan('1 - set destination address'))
> + print(set_cyan('2 - set application key index'))
> + print(set_cyan('3 - get state'))
> + print(set_cyan('4 - set state OFF'))
> + print(set_cyan('5 - set state ON'))
> + print(set_cyan('6 - back to main menu'))
> + print(set_cyan('7 - exit'))
> +
> +def set_value(str):
> +
> + if len(str) != 4:
> + print(set_error('Expected 4 digits'))
> + return -1
> +
> + try:
> + value = int(str, 16)
> + except ValueError:
> + print(set_error('Not a valid hexadecimal number'))
> + return -1
> +
> + return value
> +
> +########################
> +# Main entry
> +########################
> +def main():
> +
> + DBusGMainLoop(set_as_default=True)
> +
> + global bus
> + bus = dbus.SystemBus()
> + global mainloop
> + global app
> + global mesh_net
> +
> + if len(sys.argv) > 1 :
> + set_token(sys.argv[1])
> +
> + mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
> + "/org/bluez/mesh"),
> + MESH_NETWORK_IFACE)
> + mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
> +
> + app = Application(bus)
> +
> + # Provisioning agent
> + if agent != None:
> + app.set_agent(agent.Agent(bus))
> +
> + first_ele = Element(bus, 0x00)
> + second_ele = Element(bus, 0x01)
> +
> + print(set_yellow('Register OnOff Server model on element 0'))
> + first_ele.add_model(OnOffServer(0x1000))
> +
> + print(set_yellow('Register OnOff Client model on element 1'))
> + second_ele.add_model(OnOffClient(0x1001))
> + app.add_element(first_ele)
> + app.add_element(second_ele)
> +
> + mainloop = GLib.MainLoop()
> +
> + main_menu()
> + event_catcher = MenuHandler(process_input);
> + mainloop.run()
> +
> +if __name__ == '__main__':
> + main()