This adds one script, test-mesh, to replace three test-join,
example-onoff-server and example-onoff-client.
This is menu driven test that allows provisioning (join) and/or
connecting existing (attach) nodes.
---
test/agent.py | 14 +-
test/example-onoff-client | 288 ----------------
test/example-onoff-server | 365 --------------------
test/test-mesh | 706 ++++++++++++++++++++++++++++++++++++++
4 files changed, 717 insertions(+), 656 deletions(-)
delete mode 100644 test/example-onoff-client
delete mode 100644 test/example-onoff-server
create mode 100755 test/test-mesh
diff --git a/test/agent.py b/test/agent.py
index 22c92f952..778dbe092 100755
--- a/test/agent.py
+++ b/test/agent.py
@@ -1,9 +1,16 @@
-#!/usr/bin/python
+#!/usr/bin/python3
import sys
import dbus
import dbus.service
-import dbus.mainloop.glib
+
+try:
+ from termcolor import colored, cprint
+ set_green = lambda x: colored(x, 'green', attrs=['bold'])
+ set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
+except ImportError:
+ set_green = lambda x: x
+ set_cyan = lambda x: x
AGENT_IFACE = 'org.bluez.mesh.ProvisionAgent1'
AGENT_PATH = "/mesh/test/agent"
@@ -37,4 +44,5 @@ class Agent(dbus.service.Object):
@dbus.service.method(AGENT_IFACE, in_signature="su", out_signature="")
def DisplayNumeric(self, type, value):
- print("DisplayNumeric type=", type, " number=", value)
+ print(set_cyan('DisplayNumeric ('), type,
+ set_cyan(') number ='), set_green(value))
diff --git a/test/example-onoff-client b/test/example-onoff-client
deleted file mode 100644
index e4a87eb12..000000000
--- a/test/example-onoff-client
+++ /dev/null
@@ -1,288 +0,0 @@
-#!/usr/bin/env python3
-
-import sys
-import struct
-import numpy
-import dbus
-import dbus.service
-import dbus.exceptions
-
-try:
- from gi.repository import GObject
-except ImportError:
- import gobject as GObject
-from dbus.mainloop.glib import DBusGMainLoop
-
-MESH_SERVICE_NAME = 'org.bluez.mesh'
-DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
-DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
-
-MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
-MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
-MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
-
-VENDOR_ID_NONE = 0xffff
-
-app = None
-bus = None
-mainloop = None
-node = None
-token = numpy.uint64(0x76bd4f2372477600)
-
-def unwrap(item):
- if isinstance(item, dbus.Boolean):
- return bool(item)
- if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
- dbus.UInt64, dbus.Int64)):
- return int(item)
- if isinstance(item, dbus.Byte):
- return bytes([int(item)])
- if isinstance(item, dbus.String):
- return item
- if isinstance(item, (dbus.Array, list, tuple)):
- return [unwrap(x) for x in item]
- if isinstance(item, (dbus.Dictionary, dict)):
- return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
-
- print('Dictionary item not handled')
- print(type(item))
- return item
-
-def attach_app_cb(node_path, dict_array):
- print('Mesh application registered ', node_path)
- print(type(node_path))
- print(type(dict_array))
- print(dict_array)
-
- els = unwrap(dict_array)
- print("Get Elements")
- for el in els:
- print(el)
- idx = struct.unpack('b', el[0])[0]
- print('Configuration for Element ', end='')
- print(idx)
- models = el[1]
-
- element = app.get_element(idx)
- element.set_model_config(models)
-
- obj = bus.get_object(MESH_SERVICE_NAME, node_path)
- global node
- node = dbus.Interface(obj, MESH_NODE_IFACE)
-
-def error_cb(error):
- print('D-Bus call failed: ' + str(error))
-
-def generic_reply_cb():
- print('D-Bus call done')
-
-def interfaces_removed_cb(object_path, interfaces):
- if not mesh_net:
- return
-
- if object_path == mesh_net[2]:
- print('Service was removed')
- mainloop.quit()
-
-class Application(dbus.service.Object):
-
- def __init__(self, bus):
- self.path = '/example'
- self.elements = []
- dbus.service.Object.__init__(self, bus, self.path)
-
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
- def add_element(self, element):
- self.elements.append(element)
-
- def get_element(self, idx):
- for ele in self.elements:
- if ele.get_index() == idx:
- return ele
-
- @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
- def GetManagedObjects(self):
- response = {}
- print('GetManagedObjects')
- for element in self.elements:
- response[element.get_path()] = element.get_properties()
- return response
-
-class Element(dbus.service.Object):
- PATH_BASE = '/example/ele'
-
- def __init__(self, bus, index):
- self.path = self.PATH_BASE + format(index, '02x')
- print(self.path)
- self.models = []
- self.bus = bus
- self.index = index
- dbus.service.Object.__init__(self, bus, self.path)
-
- def _get_sig_models(self):
- ids = []
- for model in self.models:
- id = model.get_id()
- vendor = model.get_vendor()
- if vendor == VENDOR_ID_NONE:
- ids.append(id)
- return ids
-
- def get_properties(self):
- return {
- MESH_ELEMENT_IFACE: {
- 'Index': dbus.Byte(self.index),
- 'Models': dbus.Array(
- self._get_sig_models(), signature='q')
- }
- }
-
- def add_model(self, model):
- model.set_path(self.path)
- self.models.append(model)
-
- def get_index(self):
- return self.index
-
- def set_model_config(self, config):
- print('Set element models config')
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qqbay", out_signature="")
- def MessageReceived(self, source, key, is_sub, data):
- print('Message Received on Element ', end='')
- print(self.index)
- for model in self.models:
- model.process_message(source, key, data)
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qa{sv}", out_signature="")
-
- def UpdateModelConfiguration(self, model_id, config):
- print('UpdateModelConfig ', end='')
- print(hex(model_id))
- for model in self.models:
- if model_id == model.get_id():
- model.set_config(config)
- return
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="", out_signature="")
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
-class Model():
- def __init__(self, model_id):
- self.cmd_ops = []
- self.model_id = model_id
- self.vendor = VENDOR_ID_NONE
- self.path = None
-
- def set_path(self, path):
- self.path = path
-
- def get_id(self):
- return self.model_id
-
- def get_vendor(self):
- return self.vendor
-
- def process_message(self, source, key, data):
- print('Model process message')
-
- def set_publication(self, period):
- self.period = period
-
- def set_bindings(self, bindings):
- self.bindings = bindings
-
- def set_config(self, config):
- if 'Bindings' in config:
- self.bindings = config.get('Bindings')
- print('Bindings: ', end='')
- print(self.bindings)
- if 'PublicationPeriod' in config:
- self.set_publication(config.get('PublicationPeriod'))
- print('Model publication period ', end='')
- print(self.pub_period, end='')
- print(' ms')
-
-class OnOffClient(Model):
- def __init__(self, model_id):
- Model.__init__(self, model_id)
- self.cmd_ops = { 0x8201, # get
- 0x8202, # set
- 0x8203 } # set unacknowledged
- print('OnOff Client')
-
- def _reply_cb(state):
- print('State ', end='');
- print(state)
-
- def _send_message(self, dest, key, data, reply_cb):
- print('OnOffClient send data')
- node.Send(self.path, dest, key, data, reply_handler=reply_cb,
- error_handler=error_cb)
-
- def get_state(self, dest, key):
- opcode = 0x8201
- data = struct.pack('<H', opcode)
- self._send_message(dest, key, data, self._reply_cb)
-
- def set_state(self, dest, key, state):
- opcode = 0x8202
- data = struct.pack('<HB', opcode, state)
- self._send_message(dest, key, data, self._reply_cb)
-
- def process_message(self, source, key, data):
- print('OnOffClient process message len ', end = '')
- datalen = len(data)
- print(datalen)
-
- if datalen!=3:
- return
-
- opcode, state=struct.unpack('<HB',bytes(data))
- if opcode != 0x8202 :
- print('Bad opcode ', end='')
- print(hex(opcode))
- return
-
- print('Got state ', end = '')
- print(hex(state))
-
-def attach_app_error_cb(error):
- print('Failed to register application: ' + str(error))
- mainloop.quit()
-
-def main():
-
- DBusGMainLoop(set_as_default=True)
-
- global bus
- bus = dbus.SystemBus()
- global mainloop
- global app
-
- mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
- "/org/bluez/mesh"),
- MESH_NETWORK_IFACE)
- mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
-
- app = Application(bus)
- first_ele = Element(bus, 0x00)
- first_ele.add_model(OnOffClient(0x1001))
- app.add_element(first_ele)
-
- mainloop = GObject.MainLoop()
-
- print('Attach')
- mesh_net.Attach(app.get_path(), token,
- reply_handler=attach_app_cb,
- error_handler=attach_app_error_cb)
- mainloop.run()
-
-if __name__ == '__main__':
- main()
diff --git a/test/example-onoff-server b/test/example-onoff-server
deleted file mode 100644
index 131b6415c..000000000
--- a/test/example-onoff-server
+++ /dev/null
@@ -1,365 +0,0 @@
-#!/usr/bin/env python3
-
-import sys
-import struct
-import numpy
-import dbus
-import dbus.service
-import dbus.exceptions
-
-from threading import Timer
-import time
-
-
-try:
- from gi.repository import GObject
-except ImportError:
- import gobject as GObject
-from dbus.mainloop.glib import DBusGMainLoop
-
-MESH_SERVICE_NAME = 'org.bluez.mesh'
-DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
-DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
-
-MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
-MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
-MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
-MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
-
-APP_COMPANY_ID = 0x05f1
-APP_PRODUCT_ID = 0x0001
-APP_VERSION_ID = 0x0001
-
-VENDOR_ID_NONE = 0xffff
-
-app = None
-bus = None
-mainloop = None
-node = None
-
-token = numpy.uint64(0x76bd4f2372476578)
-
-def generic_error_cb(error):
- print('D-Bus call failed: ' + str(error))
-
-def generic_reply_cb():
- print('D-Bus call done')
-
-def unwrap(item):
- if isinstance(item, dbus.Boolean):
- return bool(item)
- if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
- dbus.UInt64, dbus.Int64)):
- return int(item)
- if isinstance(item, dbus.Byte):
- return bytes([int(item)])
- if isinstance(item, dbus.String):
- return item
- if isinstance(item, (dbus.Array, list, tuple)):
- return [unwrap(x) for x in item]
- if isinstance(item, (dbus.Dictionary, dict)):
- return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
-
- print('Dictionary item not handled')
- print(type(item))
- return item
-
-def attach_app_cb(node_path, dict_array):
- print('Mesh application registered ', node_path)
-
- obj = bus.get_object(MESH_SERVICE_NAME, node_path)
-
- global node
- node = dbus.Interface(obj, MESH_NODE_IFACE)
-
- els = unwrap(dict_array)
- print("Get Elements")
-
- for el in els:
- idx = struct.unpack('b', el[0])[0]
- print('Configuration for Element ', end='')
- print(idx)
-
- models = el[1]
- element = app.get_element(idx)
- element.set_model_config(models)
-
-def interfaces_removed_cb(object_path, interfaces):
- if not mesh_net:
- return
-
- if object_path == mesh_net[2]:
- print('Service was removed')
- mainloop.quit()
-
-def send_response(path, dest, key, data):
- print('send response ', end='')
- print(data)
- node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
- error_handler=generic_error_cb)
-
-def send_publication(path, model_id, data):
- print('send publication ', end='')
- print(data)
- node.Publish(path, model_id, data,
- reply_handler=generic_reply_cb,
- error_handler=generic_error_cb)
-
-class PubTimer():
- def __init__(self):
- self.seconds = None
- self.func = None
- self.thread = None
- self.busy = False
-
- def _timeout_cb(self):
- self.func()
- self.busy = True
- self._schedule_timer()
- self.busy =False
-
- def _schedule_timer(self):
- self.thread = Timer(self.seconds, self._timeout_cb)
- self.thread.start()
-
- def start(self, seconds, func):
- self.func = func
- self.seconds = seconds
- if not self.busy:
- self._schedule_timer()
-
- def cancel(self):
- print('Cancel timer')
- if self.thread is not None:
- print('Cancel thread')
- self.thread.cancel()
- self.thread = None
-
-class Application(dbus.service.Object):
-
- def __init__(self, bus):
- self.path = '/example'
- self.elements = []
- dbus.service.Object.__init__(self, bus, self.path)
-
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
- def add_element(self, element):
- self.elements.append(element)
-
- def get_element(self, idx):
- for ele in self.elements:
- if ele.get_index() == idx:
- return ele
-
- def get_properties(self):
- return {
- MESH_APPLICATION_IFACE: {
- 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
- 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
- 'VersionID': dbus.UInt16(APP_VERSION_ID)
- }
- }
-
- @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
- def GetManagedObjects(self):
- response = {}
- print('GetManagedObjects')
- response[self.path] = self.get_properties()
- for element in self.elements:
- response[element.get_path()] = element.get_properties()
- return response
-
-class Element(dbus.service.Object):
- PATH_BASE = '/example/ele'
-
- def __init__(self, bus, index):
- self.path = self.PATH_BASE + format(index, '02x')
- print(self.path)
- self.models = []
- self.bus = bus
- self.index = index
- dbus.service.Object.__init__(self, bus, self.path)
-
- def _get_sig_models(self):
- ids = []
- for model in self.models:
- id = model.get_id()
- vendor = model.get_vendor()
- if vendor == VENDOR_ID_NONE:
- ids.append(id)
- return ids
-
- def get_properties(self):
- return {
- MESH_ELEMENT_IFACE: {
- 'Index': dbus.Byte(self.index),
- 'Models': dbus.Array(
- self._get_sig_models(), signature='q')
- }
- }
-
- def add_model(self, model):
- model.set_path(self.path)
- self.models.append(model)
-
- def get_index(self):
- return self.index
-
- def set_model_config(self, configs):
- print('Set element models config')
- for config in configs:
- mod_id = config[0]
- self.UpdateModelConfiguration(mod_id, config[1])
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qqbay", out_signature="")
- def MessageReceived(self, source, key, is_sub, data):
- print('Message Received on Element ', end='')
- print(self.index)
- for model in self.models:
- model.process_message(source, key, data)
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="qa{sv}", out_signature="")
-
- def UpdateModelConfiguration(self, model_id, config):
- print('UpdateModelConfig ', end='')
- print(hex(model_id))
- for model in self.models:
- if model_id == model.get_id():
- model.set_config(config)
- return
-
- @dbus.service.method(MESH_ELEMENT_IFACE,
- in_signature="", out_signature="")
-
- def get_path(self):
- return dbus.ObjectPath(self.path)
-
-class Model():
- def __init__(self, model_id):
- self.cmd_ops = []
- self.model_id = model_id
- self.vendor = VENDOR_ID_NONE
- self.bindings = []
- self.pub_period = 0
- self.pub_id = 0
- self.path = None
-
- def set_path(self, path):
- self.path = path
-
- def get_id(self):
- return self.model_id
-
- def get_vendor(self):
- return self.vendor
-
- def process_message(self, source, key, data):
- print('Model process message')
-
- def set_publication(self, period):
- self.pub_period = period
-
- def set_config(self, config):
- if 'Bindings' in config:
- self.bindings = config.get('Bindings')
- print('Bindings: ', end='')
- print(self.bindings)
- if 'PublicationPeriod' in config:
- self.set_publication(config.get('PublicationPeriod'))
- print('Model publication period ', end='')
- print(self.pub_period, end='')
- print(' ms')
-
-class OnOffServer(Model):
- def __init__(self, model_id):
- Model.__init__(self, model_id)
- self.cmd_ops = { 0x8201, # get
- 0x8202, # set
- 0x8203 } # set unacknowledged
-
- print("OnOff Server ", end="")
- self.state = 0
- print('State ', end='')
- self.timer = PubTimer()
-
- def process_message(self, source, key, data):
- datalen = len(data)
- print('OnOff Server process message len ', datalen)
-
- if datalen!=2 and datalen!=3:
- return
-
- if datalen==2:
- op_tuple=struct.unpack('<H',bytes(data))
- opcode = op_tuple[0]
- if opcode != 0x8201:
- print(hex(opcode))
- return
- print('Get state')
- elif datalen==3:
- opcode,self.state=struct.unpack('<HB',bytes(data))
- if opcode != 0x8202 and opcode != 0x8203:
- print(hex(opcode))
- return
- print('Set state: ', end='')
- print(self.state)
-
- rsp_data = struct.pack('<HB', 0x8204, self.state)
- send_response(self.path, source, key, rsp_data)
-
- def publish(self):
- print('Publish')
- data = struct.pack('B', self.state)
- send_publication(self.path, self.model_id, data)
-
- def set_publication(self, period):
- if period == 0:
- self.pub_period = 0
- self.timer.cancel()
- return
-
- # We do not handle ms in this example
- if period < 1000:
- return
-
- self.pub_period = period
- self.timer.start(period/1000, self.publish)
-
-def attach_app_error_cb(error):
- print('Failed to register application: ' + str(error))
- mainloop.quit()
-
-def main():
-
- DBusGMainLoop(set_as_default=True)
-
- global bus
- bus = dbus.SystemBus()
- global mainloop
- global app
-
- mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
- "/org/bluez/mesh"),
- MESH_NETWORK_IFACE)
- mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
-
- app = Application(bus)
- first_ele = Element(bus, 0x00)
- first_ele.add_model(OnOffServer(0x1000))
- app.add_element(first_ele)
-
- mainloop = GObject.MainLoop()
-
- print('Attach')
- mesh_net.Attach(app.get_path(), token,
- reply_handler=attach_app_cb,
- error_handler=attach_app_error_cb)
-
- mainloop.run()
-
-if __name__ == '__main__':
- main()
diff --git a/test/test-mesh b/test/test-mesh
new file mode 100755
index 000000000..965959c84
--- /dev/null
+++ b/test/test-mesh
@@ -0,0 +1,706 @@
+#!/usr/bin/env python3
+
+###################################################################
+#
+# This is a unified test for BT Mesh
+#
+# To run the test:
+# test-mesh [token]
+#
+# 'token' is an optional argument. It must be a 16-digit
+# hexadecimal number. The token must be associated with
+# an existing node. The token is generated and assigned
+# to a node as a result of successful provisioning (see
+# explanation of "join" option).
+# When the token is set, the menu operations "attach"
+# and "remove" may be performed on a node specified
+# by this token.
+#
+# The test imitates a 2-element device:
+# element 0: OnOff Server model
+# element 1: OnOff Client model
+#
+# The main menu:
+# 1 - set node ID (token)
+# 2 - join mesh network
+# 3 - attach mesh node
+# 4 - remove node
+# 5 - exit
+#
+# The main menu options explained:
+# 1 - set token
+# Set the unique node token.
+# The token can be set from command line arguments as
+# well.
+#
+# 2 - join
+# Request provisioning of a device to become a node
+# on a mesh network. The test generates device UUID
+# which is displayed and will need to be provided to
+# an outside entity that acts as a Provisioner. Also,
+# during the provisioning process, an agent that is
+# part of the test, will request (or will be requested)
+# to perform a specified operation, e.g., a number will
+# be displayed and this number will need to be entered
+# on the Provisioner's side.
+# In case of successful provisioning, the application
+# automatically attaches as a node to the daemon. A node
+# 'token' is returned to the application and is used
+# for the runtime of the test.
+#
+# 3 - attach
+# Attach the application to bluetoothd-daemon as a node.
+# For the call to be successful, the valid node token must
+# be already set, either from command arguments or by
+# executing "set token" operation or automatically after
+# successfully executing "join" operation in the same test
+# run.
+#
+# 4 - remove
+# Permanently removes any node configuration from daemon
+# and persistent storage. After this operation, the node
+# is permanently forgotten by the daemon and the associated
+# node token is no longer valid.
+#
+# 5 - exit
+#
+###################################################################
+import sys
+import struct
+import fcntl
+import os
+import numpy
+import random
+import dbus
+import dbus.service
+import dbus.exceptions
+
+from threading import Timer
+import time
+
+try:
+ from gi.repository import GLib
+except ImportError:
+ import glib as GLib
+from dbus.mainloop.glib import DBusGMainLoop
+
+try:
+ from termcolor import colored, cprint
+ set_error = lambda x: colored('!' + x, 'red', attrs=['bold'])
+ set_cyan = lambda x: colored(x, 'cyan', attrs=['bold'])
+ set_green = lambda x: colored(x, 'green', attrs=['bold'])
+ set_yellow = lambda x: colored(x, 'yellow', attrs=['bold'])
+except ImportError:
+ print('** Install termcolor module for better experience) **')
+ set_error = lambda x: x
+ set_cyan = lambda x: x
+ set_green = lambda x: x
+ set_yellow = lambda x: x
+
+# Provisioning agent
+try:
+ import agent
+except ImportError:
+ agent = None
+
+MESH_SERVICE_NAME = 'org.bluez.mesh'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+
+MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
+MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
+MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
+MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
+
+APP_COMPANY_ID = 0x05f1
+APP_PRODUCT_ID = 0x0001
+APP_VERSION_ID = 0x0001
+
+VENDOR_ID_NONE = 0xffff
+
+app = None
+bus = None
+mainloop = None
+node = None
+mesh_net = None
+
+# Node token housekeeping
+token = None
+token_input = False
+have_token = False
+
+def array_to_string(b_array):
+ str = ""
+ for b in b_array:
+ str += "%02x" % b
+ return str
+
+def generic_error_cb(error):
+ print(set_error('D-Bus call failed: ') + str(error))
+
+def generic_reply_cb():
+ return
+
+def attach_app_error_cb(error):
+ print(set_error('Failed to register application: ') + str(error))
+
+def attach(token):
+ print('Attach mesh node to bluetooth-meshd daemon')
+
+ mesh_net.Attach(app.get_path(), token,
+ reply_handler=attach_app_cb,
+ error_handler=attach_app_error_cb)
+
+def join_cb():
+ print('Join procedure started')
+
+def join_error_cb(reason):
+ print('Join procedure failed: ', reason)
+
+def unwrap(item):
+ if isinstance(item, dbus.Boolean):
+ return bool(item)
+ if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32,
+ dbus.UInt64, dbus.Int64)):
+ return int(item)
+ if isinstance(item, dbus.Byte):
+ return bytes([int(item)])
+ if isinstance(item, dbus.String):
+ return item
+ if isinstance(item, (dbus.Array, list, tuple)):
+ return [unwrap(x) for x in item]
+ if isinstance(item, (dbus.Dictionary, dict)):
+ return dict([(unwrap(x), unwrap(y)) for x, y in item.items()])
+
+ print(set_error('Dictionary item not handled: ') + type(item))
+
+ return item
+
+def attach_app_cb(node_path, dict_array):
+ print('Mesh application registered ', node_path)
+
+ obj = bus.get_object(MESH_SERVICE_NAME, node_path)
+
+ global node
+ node = dbus.Interface(obj, MESH_NODE_IFACE)
+
+ els = unwrap(dict_array)
+
+ for el in els:
+ idx = struct.unpack('b', el[0])[0]
+
+ models = el[1]
+ element = app.get_element(idx)
+ element.set_model_config(models)
+
+def interfaces_removed_cb(object_path, interfaces):
+ print('Removed')
+ if not mesh_net:
+ return
+
+ print(object_path)
+ if object_path == mesh_net[2]:
+ print('Service was removed')
+ app_exit()
+
+def send_response(path, dest, key, data):
+ node.Send(path, dest, key, data, reply_handler=generic_reply_cb,
+ error_handler=generic_error_cb)
+
+def send_publication(path, model_id, data):
+ print('Send publication ', end='')
+ print(data)
+ node.Publish(path, model_id, data,
+ reply_handler=generic_reply_cb,
+ error_handler=generic_error_cb)
+
+def print_state(state):
+ print('State is ', end='')
+ if state == 0:
+ print('OFF')
+ elif state == 1:
+ print('ON')
+ else:
+ print('UNKNOWN')
+class PubTimer():
+ def __init__(self):
+ self.seconds = None
+ self.func = None
+ self.thread = None
+ self.busy = False
+
+ def _timeout_cb(self):
+ self.func()
+ self.busy = True
+ self._schedule_timer()
+ self.busy =False
+
+ def _schedule_timer(self):
+ self.thread = Timer(self.seconds, self._timeout_cb)
+ self.thread.start()
+
+ def start(self, seconds, func):
+ self.func = func
+ self.seconds = seconds
+ if not self.busy:
+ self._schedule_timer()
+
+ def cancel(self):
+ if self.thread is not None:
+ self.thread.cancel()
+ self.thread = None
+
+class Application(dbus.service.Object):
+
+ def __init__(self, bus):
+ self.path = '/example'
+ self.agent = None
+ self.elements = []
+ dbus.service.Object.__init__(self, bus, self.path)
+
+ def set_agent(self, agent):
+ self.agent = agent
+
+ def get_path(self):
+ return dbus.ObjectPath(self.path)
+
+ def add_element(self, element):
+ self.elements.append(element)
+
+ def get_element(self, idx):
+ for ele in self.elements:
+ if ele.get_index() == idx:
+ return ele
+
+ def get_properties(self):
+ return {
+ MESH_APPLICATION_IFACE: {
+ 'CompanyID': dbus.UInt16(APP_COMPANY_ID),
+ 'ProductID': dbus.UInt16(APP_PRODUCT_ID),
+ 'VersionID': dbus.UInt16(APP_VERSION_ID)
+ }
+ }
+
+ @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+ def GetManagedObjects(self):
+ response = {}
+ response[self.path] = self.get_properties()
+ response[self.agent.get_path()] = self.agent.get_properties()
+ for element in self.elements:
+ response[element.get_path()] = element.get_properties()
+ return response
+
+ @dbus.service.method(MESH_APPLICATION_IFACE,
+ in_signature="t", out_signature="")
+ def JoinComplete(self, value):
+ global token
+ global have_token
+
+ print('JoinComplete with token ' + set_green(hex(value)))
+
+ token = value
+ have_token = True
+
+ attach(token)
+
+ @dbus.service.method(MESH_APPLICATION_IFACE,
+ in_signature="s", out_signature="")
+ def JoinFailed(self, value):
+ print(set_error('JoinFailed '), value)
+
+
+class Element(dbus.service.Object):
+ PATH_BASE = '/example/ele'
+
+ def __init__(self, bus, index):
+ self.path = self.PATH_BASE + format(index, '02x')
+ self.models = []
+ self.bus = bus
+ self.index = index
+ dbus.service.Object.__init__(self, bus, self.path)
+
+ def _get_sig_models(self):
+ ids = []
+ for model in self.models:
+ id = model.get_id()
+ vendor = model.get_vendor()
+ if vendor == VENDOR_ID_NONE:
+ ids.append(id)
+ return ids
+
+ def get_properties(self):
+ return {
+ MESH_ELEMENT_IFACE: {
+ 'Index': dbus.Byte(self.index),
+ 'Models': dbus.Array(
+ self._get_sig_models(), signature='q')
+ }
+ }
+
+ def add_model(self, model):
+ model.set_path(self.path)
+ self.models.append(model)
+
+ def get_index(self):
+ return self.index
+
+ def set_model_config(self, configs):
+ for config in configs:
+ mod_id = config[0]
+ self.UpdateModelConfiguration(mod_id, config[1])
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="qqbay", out_signature="")
+ def MessageReceived(self, source, key, is_sub, data):
+ print('Message Received on Element ', end='')
+ print(self.index)
+ for model in self.models:
+ model.process_message(source, key, data)
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="qa{sv}", out_signature="")
+
+ def UpdateModelConfiguration(self, model_id, config):
+ print('UpdateModelConfig ', end='')
+ print(hex(model_id))
+ for model in self.models:
+ if model_id == model.get_id():
+ model.set_config(config)
+ return
+
+ @dbus.service.method(MESH_ELEMENT_IFACE,
+ in_signature="", out_signature="")
+
+ def get_path(self):
+ return dbus.ObjectPath(self.path)
+
+class Model():
+ def __init__(self, model_id):
+ self.cmd_ops = []
+ self.model_id = model_id
+ self.vendor = VENDOR_ID_NONE
+ self.bindings = []
+ self.pub_period = 0
+ self.pub_id = 0
+ self.path = None
+ self.timer = None
+
+ def set_path(self, path):
+ self.path = path
+
+ def get_id(self):
+ return self.model_id
+
+ def get_vendor(self):
+ return self.vendor
+
+ def process_message(self, source, key, data):
+ return
+
+ def set_publication(self, period):
+ self.pub_period = period
+
+ def set_config(self, config):
+ if 'Bindings' in config:
+ self.bindings = config.get('Bindings')
+ print('Bindings: ', end='')
+ print(self.bindings)
+ if 'PublicationPeriod' in config:
+ self.set_publication(config.get('PublicationPeriod'))
+ print('Model publication period ', end='')
+ print(self.pub_period, end='')
+ print(' ms')
+
+########################
+# On Off Server Model
+########################
+class OnOffServer(Model):
+ def __init__(self, model_id):
+ Model.__init__(self, model_id)
+ self.cmd_ops = { 0x8201, # get
+ 0x8202, # set
+ 0x8203, # set unacknowledged
+ 0x8204 } # status
+
+ print("OnOff Server ")
+ self.state = 0
+ print_state(self.state)
+ self.timer = PubTimer()
+
+ def process_message(self, source, key, data):
+ datalen = len(data)
+ print('OnOff Server process message len: ', datalen)
+
+ if datalen != 2 and datalen != 3:
+ # The opcode is not recognized by this model
+ return
+
+ if datalen == 2:
+ op_tuple=struct.unpack('<H',bytes(data))
+ opcode = op_tuple[0]
+ if opcode != 0x8201:
+ # The opcode is not recognized by this model
+ return
+ print('Get state')
+ elif datalen == 3:
+ opcode,self.state=struct.unpack('<HB',bytes(data))
+ if opcode != 0x8202 and opcode != 0x8203:
+ # The opcode is not recognized by this model
+ return
+ print_state(self.state)
+
+ rsp_data = struct.pack('<HB', 0x8204, self.state)
+ send_response(self.path, source, key, rsp_data)
+
+ def publish(self):
+ print('Publish')
+ data = struct.pack('<HB', 0x8204, self.state)
+ send_publication(self.path, self.model_id, data)
+
+ def set_publication(self, period):
+ if period == 0:
+ self.pub_period = 0
+ self.timer.cancel()
+ return
+
+ # We do not handle ms in this example
+ if period < 1000:
+ return
+
+ self.pub_period = period
+ self.timer.start(period/1000, self.publish)
+
+########################
+# On Off Client Model
+########################
+class OnOffClient(Model):
+ def __init__(self, model_id):
+ Model.__init__(self, model_id)
+ self.cmd_ops = { 0x8201, # get
+ 0x8202, # set
+ 0x8203, # set unacknowledged
+ 0x8204 } # status
+ print('OnOff Client')
+
+ def _reply_cb(state):
+ print('State ', end='');
+ print(state)
+
+ def _send_message(self, dest, key, data, reply_cb):
+ print('OnOffClient send data')
+ node.Send(self.path, dest, key, data, reply_handler=reply_cb,
+ error_handler=generic_error_cb)
+
+ def get_state(self, dest, key):
+ opcode = 0x8201
+ data = struct.pack('<H', opcode)
+ self._send_message(dest, key, data, self._reply_cb)
+
+ def set_state(self, dest, key, state):
+ opcode = 0x8202
+ data = struct.pack('<HB', opcode, state)
+ self._send_message(dest, key, data, self._reply_cb)
+
+ def process_message(self, source, key, data):
+ print('OnOffClient process message len = ', end = '')
+ datalen = len(data)
+ print(datalen)
+
+ if datalen != 3:
+ # The opcode is not recognized by this model
+ return
+
+ opcode, state=struct.unpack('<HB',bytes(data))
+ print(opcode)
+ if opcode != 0x8204 :
+ # The opcode is not recognized by this model
+ return
+
+ print(set_yellow('Got state '), end = '')
+
+ state_str = "ON"
+ if hex(state) == 0:
+ state_str = "OFF"
+
+ print(set_yellow(state_str))
+
+########################
+# Menu functions
+########################
+class MenuDriver(object):
+ def __init__(self, callback):
+ self.cb = callback
+ flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
+ flags |= os.O_NONBLOCK
+ fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
+ sys.stdin.flush()
+ GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.input_callback)
+
+ def input_callback(self, fd, condition):
+ chunk = fd.read()
+ buffer = ''
+ for char in chunk:
+ buffer += char
+ if char == '\n':
+ self.cb(buffer)
+
+ return True
+
+def process_input(input_str):
+ global token
+ global token_input
+ global have_token
+
+ str = input_str.strip()
+
+ if token_input == True:
+ res = set_token(str)
+ token_input = False
+
+ if res == False:
+ main_menu()
+
+ return
+
+ # Allow entering empty lines for better output visibility
+ if len(str) == 0:
+ return
+
+ if str.isdigit() == False:
+ main_menu()
+ return
+
+ opt = int(str)
+
+ if opt > 5:
+ print(set_error('Unknown menu option: '), opt)
+ main_menu()
+ elif opt == 1:
+ if have_token:
+ print('Token already set')
+ return
+
+ token_input = True;
+ print(set_cyan('Enter 16-digit hex node ID:'))
+ elif opt == 2:
+ if agent == None:
+ print(set_error('Provisioning agent not found'))
+ return
+
+ join_mesh()
+ elif opt == 3:
+ if have_token == False:
+ print(set_error('Token is not set'))
+ main_menu()
+ return
+
+ attach(token)
+ elif opt == 4:
+ if have_token == False:
+ print(set_error('Token is not set'))
+ main_menu()
+ return
+
+ print('Remove mesh node')
+ mesh_net.Leave(token, reply_handler=generic_reply_cb,
+ error_handler=generic_error_cb)
+ have_token = False
+ elif opt == 5:
+ app_exit()
+
+def main_menu():
+ print(set_cyan('1 - set node ID (token)'))
+ print(set_cyan('2 - join mesh network'))
+ print(set_cyan('3 - attach mesh node'))
+ print(set_cyan('4 - remove node'))
+ print(set_cyan('5 - exit'))
+
+def set_token(str):
+ global token
+ global have_token
+
+ if len(str) != 16:
+ print(set_error('Expected 16 digits'))
+ return False
+
+ try:
+ input_number = int(str, 16)
+ except ValueError:
+ print(set_error('Not a valid hexadecimal number'))
+ return False
+
+ token = numpy.uint64(input_number)
+ have_token = True
+
+ return True
+
+def join_mesh():
+ uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
+
+ caps = ["out-numeric"]
+ oob = ["other"]
+
+ random.shuffle(uuid)
+ uuid_str = array_to_string(uuid)
+ print('Joining with UUID ' + set_green(uuid_str))
+
+ mesh_net.Join(app.get_path(), uuid,
+ reply_handler=join_cb,
+ error_handler=join_error_cb)
+
+def app_exit():
+ global mainloop
+ global app
+
+ for el in app.elements:
+ for model in el.models:
+ if model.timer != None:
+ model.timer.cancel()
+ mainloop.quit()
+
+########################
+# Main entry
+########################
+def main():
+
+ DBusGMainLoop(set_as_default=True)
+
+ global bus
+ bus = dbus.SystemBus()
+ global mainloop
+ global app
+ global mesh_net
+
+ if len(sys.argv) > 1 :
+ set_token(sys.argv[1])
+
+ mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
+ "/org/bluez/mesh"),
+ MESH_NETWORK_IFACE)
+ mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
+
+ app = Application(bus)
+
+ # Provisioning agent
+ if agent != None:
+ app.set_agent(agent.Agent(bus))
+
+ first_ele = Element(bus, 0x00)
+ second_ele = Element(bus, 0x01)
+
+ print(set_yellow('Register OnOff Server model on element 0'))
+ first_ele.add_model(OnOffServer(0x1000))
+
+ print(set_yellow('Register OnOff Client model on element 1'))
+ second_ele.add_model(OnOffClient(0x1001))
+ app.add_element(first_ele)
+ app.add_element(second_ele)
+
+ mainloop = GLib.MainLoop()
+
+ main_menu()
+ event_catcher = MenuDriver(process_input);
+ mainloop.run()
+
+if __name__ == '__main__':
+ main()
--
2.17.2