Return-Path:
From: Mikel Astiz
To: linux-bluetooth@vger.kernel.org
Cc: Mikel Astiz
Subject: [PATCH obexd v1 13/15] client-test: pbap-client uses new API
Date: Mon, 4 Jun 2012 11:37:56 +0200
Message-Id: <1338802678-7009-14-git-send-email-mikel.astiz.oss@gmail.com>
In-Reply-To: <1338802678-7009-1-git-send-email-mikel.astiz.oss@gmail.com>
References: <1338802678-7009-1-git-send-email-mikel.astiz.oss@gmail.com>
Sender: linux-bluetooth-owner@vger.kernel.org
List-ID:

From: Mikel Astiz

---
 test/pbap-client |  221 +++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 185 insertions(+), 36 deletions(-)

diff --git a/test/pbap-client b/test/pbap-client
index ecca14f..2432acf 100755
--- a/test/pbap-client
+++ b/test/pbap-client
@@ -1,41 +1,190 @@
 #!/usr/bin/python
 
+import gobject
+
 import sys
+import os
 import dbus
+import dbus.service
+import dbus.mainloop.glib
+
+class Transfer:
+	"""State of one asynchronous phonebook download."""
+
+	def __init__(self, callback_func):
+		# Invoked by PbapClient.transfer_complete() with the file's lines.
+		self.callback_func = callback_func
+		# Both are set by PbapClient.register() from the method reply.
+		self.path = None
+		self.filename = None
+
+class PbapClient:
+	"""Issue PBAP pulls on one session and track them until they finish."""
+
+	def __init__(self, session_path):
+		# Pulls issued through pull()/pull_all() that have not completed.
+		self.pending_transfers = 0
+		# Transfer object path -> Transfer, filled in by register().
+		self.transfer_dict = dict()
+		# Callback armed by flush_transfers(); run once nothing is left.
+		self.flush_func = None
+		bus = dbus.SessionBus()
+		self.session = dbus.Interface(bus.get_object("org.openobex.client",
+								session_path),
+						"org.openobex.Session")
+		self.pbap = dbus.Interface(bus.get_object("org.openobex.client",
+								session_path),
+						"org.openobex.PhonebookAccess")
+		# No path filter is given, so signals from every Transfer object
+		# arrive here; path_keyword passes the sender path to the handler.
+		bus.add_signal_receiver(
+			self.transfer_complete,
+			dbus_interface="org.openobex.Transfer",
+			signal_name="Complete",
+			path_keyword="path")
+		bus.add_signal_receiver(
+			self.transfer_error,
+			dbus_interface="org.openobex.Transfer",
+			signal_name="Error",
+			path_keyword="path")
+
+	def register(self, reply, transfer):
+		"""Store the object path and filename returned for a new pull."""
+		(path, properties) = reply
+		transfer.path = path
+		transfer.filename = properties["Filename"]
+		self.transfer_dict[path] = transfer
+		print "Transfer created: %s (file %s)" % (path, transfer.filename)
+
+	def error(self, err):
+		"""Print the error of a failed pull request and stop the main loop."""
+		print err
+		mainloop.quit()
+
+	def transfer_complete(self, path):
+		"""Hand the downloaded file's lines to the transfer's callback."""
+		req = self.transfer_dict.get(path)
+		if req is None:
+			# Not a transfer registered with this client.
+			return
+		self.pending_transfers -= 1
+		print "Transfer %s finished" % path
+		# Read and close the file explicitly before deleting it instead of
+		# relying on garbage collection to release the handle.
+		f = open(req.filename, "r")
+		try:
+			lines = f.readlines()
+		finally:
+			f.close()
+		os.remove(req.filename)
+		del self.transfer_dict[path]
+		req.callback_func(lines)
+
+		if not self.transfer_dict and self.pending_transfers == 0:
+			if self.flush_func is not None:
+				# Disarm first: the callback may call flush_transfers().
+				func = self.flush_func
+				self.flush_func = None
+				func()
+
+	def transfer_error(self, code, message, path):
+		"""Report a failed transfer of this client and stop the main loop."""
+		req = self.transfer_dict.get(path)
+		if req is None:
+			return
+		print "Transfer finished with error %s: %s" % (code, message)
+		mainloop.quit()
+
+	def pull(self, vcard, func):
+		"""Issue Pull for one vCard; func gets the file's lines when done."""
+		req = Transfer(func)
+		self.pbap.Pull(vcard, "",
+				reply_handler=lambda r: self.register(r, req),
+				error_handler=self.error)
+		self.pending_transfers += 1
+
+	def pull_all(self, func):
+		"""Issue PullAll; func gets the resulting file's lines when done."""
+		req = Transfer(func)
+		self.pbap.PullAll("",
+				reply_handler=lambda r: self.register(r, req),
+				error_handler=self.error)
+		self.pending_transfers += 1
+
+	def flush_transfers(self, func):
+		"""Arm func to run once all outstanding transfers have completed.
+
+		If nothing is outstanding at call time, func is discarded and
+		never called.
+		"""
+		if not self.transfer_dict and self.pending_transfers == 0:
+			return
+		self.flush_func = func
+
+	def interface(self):
+		"""Return the org.openobex.PhonebookAccess proxy of the session."""
+		return self.pbap
+
+if __name__ == '__main__':
+
+	# Validate the command line before any D-Bus work is done.
+	if len(sys.argv) < 2:
+		print "Usage: %s " % (sys.argv[0])
+		sys.exit(1)
+
+	dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+	bus = dbus.SessionBus()
+	mainloop = gobject.MainLoop()
+
+	client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
+							"org.openobex.Client")
+
+	print "Creating Session"
+	session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
+
+	pbap_client = PbapClient(session_path)
+
+	def process_result(lines, header):
+		# Print an optional header followed by the file contents.
+		if header is not None:
+			print header
+		for line in lines:
+			print line,
+		print
+
+	def test_paths(paths):
+		# Exercise the first phonebook, then continue with the rest once
+		# its transfers are done.
+		if not paths:
+			print
+			print "FINISHED"
+			mainloop.quit()
+			return
+
+		path = paths[0]
+
+		print "\n--- Select Phonebook %s ---\n" % (path)
+		pbap_client.interface().Select("int", path)
+
+		print "\n--- GetSize ---\n"
+		ret = pbap_client.interface().GetSize()
+		print "Size = %d\n" % (ret)
+
+		print "\n--- List vCard ---\n"
+		ret = pbap_client.interface().List()
+		for item in ret:
+			print "%s : %s" % (item[0], item[1])
+			pbap_client.interface().SetFormat("vcard30")
+			pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
+			pbap_client.pull(item[0], lambda x: process_result(x, None))
+
+		pbap_client.interface().SetFormat("vcard30")
+		pbap_client.interface().SetFilter(["VERSION", "FN", "TEL"]);
+		pbap_client.pull_all(lambda x: process_result(x, "\n--- PullAll ---\n"))
+
+		pbap_client.flush_transfers(lambda: test_paths(paths[1:]))
+
+	test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])
 
-bus = dbus.SessionBus()
-
-client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
-							"org.openobex.Client")
-
-print "Creating Session"
-session_path = client.CreateSession(sys.argv[1], { "Target": "PBAP" })
-pbap = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-						"org.openobex.PhonebookAccess")
-session = dbus.Interface(bus.get_object("org.openobex.client", session_path),
-						"org.openobex.Session")
-
-paths = ["PB", "ICH", "OCH", "MCH", "CCH"]
-
-for path in paths:
-	print "\n--- Select Phonebook %s ---\n" % (path)
-	pbap.Select("int", path)
-
-	print "\n--- GetSize ---\n"
-	ret = pbap.GetSize()
-	print "Size = %d\n" % (ret)
-
-	print "\n--- List vCard ---\n"
-	ret = pbap.List()
-	for item in ret:
-		print "%s : %s" % (item[0], item[1])
-		pbap.SetFormat("vcard30")
-		pbap.SetFilter(["VERSION", "FN", "TEL"]);
-		ret = pbap.Pull(item[0])
-		print "%s" % (ret)
-
-	print "\n--- PullAll ---\n"
-	pbap.SetFormat("vcard30")
-	pbap.SetFilter(["VERSION", "FN", "TEL"]);
-	ret = pbap.PullAll()
-	print "%s" % (ret)
+	mainloop.run()
-- 
1.7.7.6