691
679
return new_object
694
class dbussy_adapter:
695
class SystemBus(dbus.SystemBus):
698
def __init__(self, dbussy, ravel):
701
self.bus = ravel.system_bus()
703
@contextlib.contextmanager
704
def convert_exception(self, exception_class=dbus.Error):
707
except self.dbussy.DBusError as e:
708
# This does what "raise from" would do
709
exc = exception_class(*e.args)
713
def call_method(self, methodname, busname, objectpath,
715
proxy_object = self.get_object(busname, objectpath)
716
log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
717
interface, methodname,
718
", ".join(repr(a) for a in args))
719
iface = proxy_object.get_interface(interface)
720
method = getattr(iface, methodname)
721
with self.convert_exception(dbus.Error):
722
value = method(*args)
723
# DBussy returns values either as an empty list or as a
724
# tuple: (signature, value)
726
return self.type_filter(value[0])
728
def get_object(self, busname, objectpath):
729
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
731
with self.convert_exception(dbus.ConnectFailed):
732
return self.bus[busname][objectpath]
734
def type_filter(self, value):
735
"""Convert the most bothersome types to Python types"""
736
if isinstance(value, tuple):
738
and isinstance(value[0],
739
self.dbussy.DBUS.Signature)):
740
return self.type_filter(value[1])
741
elif isinstance(value, self.dbussy.DBUS.ObjectPath):
743
# Also recurse into dictionaries
744
elif isinstance(value, dict):
745
return {self.type_filter(key):
746
self.type_filter(subval)
747
for key, subval in value.items()}
750
def set_property(self, busname, objectpath, interface, key,
752
proxy_object = self.get_object(busname, objectpath)
753
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
754
objectpath, self.properties_iface, interface,
757
# DBussy wants a Byte Array to be a sequence of
758
# values, not a byte string
760
setattr(proxy_object.get_interface(interface), key, value)
762
class MandosBus(SystemBus, dbus.MandosBus):
765
class CachingBus(MandosBus):
766
"""A caching layer for dbussy_adapter.MandosBus"""
767
def __init__(self, *args, **kwargs):
768
self.object_cache = {}
769
super(dbussy_adapter.CachingBus, self).__init__(*args,
771
def get_object(self, busname, objectpath):
773
return self.object_cache[(busname, objectpath)]
776
dbussy_adapter.CachingBus,
777
self).get_object(busname, objectpath)
778
self.object_cache[(busname, objectpath)] = new_object
782
682
def commands_from_options(options):
784
684
commands = list(options.commands)
1871
1771
self.assertIs(obj1, obj1b)
1874
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
1878
class ObjectPath(str):
1880
class DBusError(Exception):
1883
def fake_ravel_func(self, func):
1887
class DBusInterfaceProxy:
1889
def methodname(*args):
1890
return [func(*args)]
1893
def get_interface(interface):
1894
if interface == "interface":
1895
return DBusInterfaceProxy()
1896
return {"busname": {"objectpath": DBusObject()}}
1899
def call_method(self, bus, methodname, busname, objectpath,
1901
with self.assertLogs(log, logging.DEBUG):
1902
return bus.call_method(methodname, busname, objectpath,
1905
def test_call_method_returns(self):
1906
expected_method_return = Unique()
1907
method_args = (Unique(), Unique())
1909
self.assertEqual(len(method_args), len(args))
1910
for marg, arg in zip(method_args, args):
1911
self.assertIs(marg, arg)
1912
return expected_method_return
1913
fake_ravel = self.fake_ravel_func(func)
1914
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1915
ret = self.call_method(bus, "methodname", "busname",
1916
"objectpath", "interface",
1918
self.assertIs(ret, expected_method_return)
1920
def test_call_method_filters_objectpath(self):
1922
return method_return
1923
fake_ravel = self.fake_ravel_func(func)
1924
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1925
method_return = (self.dummy_dbussy.DBUS
1926
.ObjectPath("objectpath"))
1927
ret = self.call_method(bus, "methodname", "busname",
1928
"objectpath", "interface")
1929
self.assertEqual("objectpath", ret)
1930
self.assertNotIsInstance(ret,
1931
self.dummy_dbussy.DBUS.ObjectPath)
1933
def test_call_method_filters_objectpaths_in_dict(self):
1934
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1936
return method_return
1937
fake_ravel = self.fake_ravel_func(func)
1938
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1940
ObjectPath("objectpath_key_1"):
1941
ObjectPath("objectpath_value_1"),
1942
ObjectPath("objectpath_key_2"):
1943
ObjectPath("objectpath_value_2"),
1945
ret = self.call_method(bus, "methodname", "busname",
1946
"objectpath", "interface")
1947
expected_method_return = {str(key): str(value)
1949
method_return.items()}
1950
for key, value in ret.items():
1951
self.assertNotIsInstance(key, ObjectPath)
1952
self.assertNotIsInstance(value, ObjectPath)
1953
self.assertEqual(expected_method_return, ret)
1954
self.assertIsInstance(ret, dict)
1956
def test_call_method_filters_objectpaths_in_dict_in_dict(self):
1957
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1959
return method_return
1960
fake_ravel = self.fake_ravel_func(func)
1961
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1963
ObjectPath("key1"): {
1964
ObjectPath("key11"): ObjectPath("value11"),
1965
ObjectPath("key12"): ObjectPath("value12"),
1967
ObjectPath("key2"): {
1968
ObjectPath("key21"): ObjectPath("value21"),
1969
ObjectPath("key22"): ObjectPath("value22"),
1972
ret = self.call_method(bus, "methodname", "busname",
1973
"objectpath", "interface")
1974
expected_method_return = {
1975
"key1": {"key11": "value11",
1976
"key12": "value12"},
1977
"key2": {"key21": "value21",
1978
"key22": "value22"},
1980
self.assertEqual(expected_method_return, ret)
1981
for key, value in ret.items():
1982
self.assertIsInstance(value, dict)
1983
self.assertEqual(expected_method_return[key], value)
1984
self.assertNotIsInstance(key, ObjectPath)
1985
for inner_key, inner_value in value.items():
1986
self.assertIsInstance(value, dict)
1988
expected_method_return[key][inner_key],
1990
self.assertNotIsInstance(key, ObjectPath)
1992
def test_call_method_filters_objectpaths_in_dict_three_deep(self):
1993
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1995
return method_return
1996
fake_ravel = self.fake_ravel_func(func)
1997
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1999
ObjectPath("key1"): {
2000
ObjectPath("key2"): {
2001
ObjectPath("key3"): ObjectPath("value"),
2005
ret = self.call_method(bus, "methodname", "busname",
2006
"objectpath", "interface")
2007
expected_method_return = {"key1": {"key2": {"key3": "value"}}}
2008
self.assertEqual(expected_method_return, ret)
2009
self.assertIsInstance(ret, dict)
2010
self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
2011
self.assertIsInstance(ret["key1"], dict)
2012
self.assertNotIsInstance(next(iter(ret["key1"].keys())),
2014
self.assertIsInstance(ret["key1"]["key2"], dict)
2015
self.assertNotIsInstance(
2016
next(iter(ret["key1"]["key2"].keys())),
2018
self.assertEqual("value", ret["key1"]["key2"]["key3"])
2019
self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
2020
self.dummy_dbussy.DBUS.ObjectPath)
2022
def test_call_method_handles_exception(self):
2024
raise self.dummy_dbussy.DBusError()
2026
fake_ravel = self.fake_ravel_func(func)
2027
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2029
with self.assertRaises(dbus.Error) as e:
2030
self.call_method(bus, "methodname", "busname",
2031
"objectpath", "interface")
2033
self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
2035
def test_get_object_converts_to_correct_exception(self):
2036
class fake_ravel_raises_exception_on_connect:
2041
def __getitem__(key):
2042
if key == "objectpath":
2043
raise self.dummy_dbussy.DBusError()
2044
raise Exception(key)
2045
return {"busname": Bus()}
2047
raise self.dummy_dbussy.DBusError()
2048
bus = dbussy_adapter.SystemBus(
2050
fake_ravel_raises_exception_on_connect)
2051
with self.assertRaises(dbus.ConnectFailed):
2052
self.call_method(bus, "methodname", "busname",
2053
"objectpath", "interface")
2056
1774
class Test_commands_from_options(unittest.TestCase):
2058
1776
def setUp(self):