679
696
return new_object
699
class dbussy_adapter:
700
class SystemBus(dbus.SystemBus):
703
def __init__(self, dbussy, ravel):
706
self.bus = ravel.system_bus()
708
@contextlib.contextmanager
709
def convert_exception(self, exception_class=dbus.Error):
712
except self.dbussy.DBusError as e:
713
# This does what "raise from" would do
714
exc = exception_class(*e.args)
718
def call_method(self, methodname, busname, objectpath,
720
proxy_object = self.get_object(busname, objectpath)
721
log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
722
interface, methodname,
723
", ".join(repr(a) for a in args))
724
iface = proxy_object.get_interface(interface)
725
method = getattr(iface, methodname)
726
with self.convert_exception(dbus.Error):
727
value = method(*args)
728
# DBussy returns values either as an empty list or as a
729
# list of one element with the return value
731
return self.type_filter(value[0])
733
def get_object(self, busname, objectpath):
734
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
736
with self.convert_exception(dbus.ConnectFailed):
737
return self.bus[busname][objectpath]
739
def type_filter(self, value):
740
"""Convert the most bothersome types to Python types"""
741
# A D-Bus Variant value is represented as the Python type
742
# Tuple[dbussy.DBUS.Signature, Any]
743
if isinstance(value, tuple):
745
and isinstance(value[0],
746
self.dbussy.DBUS.Signature)):
747
return self.type_filter(value[1])
748
elif isinstance(value, self.dbussy.DBUS.ObjectPath):
750
# Also recurse into dictionaries
751
elif isinstance(value, dict):
752
return {self.type_filter(key):
753
self.type_filter(subval)
754
for key, subval in value.items()}
757
def set_property(self, busname, objectpath, interface, key,
759
proxy_object = self.get_object(busname, objectpath)
760
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
761
objectpath, self.properties_iface, interface,
764
# DBussy wants a Byte Array to be a sequence of
765
# values, not a byte string
767
setattr(proxy_object.get_interface(interface), key, value)
769
class MandosBus(SystemBus, dbus.MandosBus):
772
class CachingBus(MandosBus):
773
"""A caching layer for dbussy_adapter.MandosBus"""
774
def __init__(self, *args, **kwargs):
775
self.object_cache = {}
776
super(dbussy_adapter.CachingBus, self).__init__(*args,
778
def get_object(self, busname, objectpath):
780
return self.object_cache[(busname, objectpath)]
783
dbussy_adapter.CachingBus,
784
self).get_object(busname, objectpath)
785
self.object_cache[(busname, objectpath)] = new_object
682
789
def commands_from_options(options):
684
791
commands = list(options.commands)
1771
1878
self.assertIs(obj1, obj1b)
1881
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
1885
class ObjectPath(str):
1887
class DBusError(Exception):
1890
def fake_ravel_func(self, func):
1894
class DBusInterfaceProxy:
1896
def methodname(*args):
1897
return [func(*args)]
1900
def get_interface(interface):
1901
if interface == "interface":
1902
return DBusInterfaceProxy()
1903
return {"busname": {"objectpath": DBusObject()}}
1906
def call_method(self, bus, methodname, busname, objectpath,
1908
with self.assertLogs(log, logging.DEBUG):
1909
return bus.call_method(methodname, busname, objectpath,
1912
def test_call_method_returns(self):
1913
expected_method_return = Unique()
1914
method_args = (Unique(), Unique())
1916
self.assertEqual(len(method_args), len(args))
1917
for marg, arg in zip(method_args, args):
1918
self.assertIs(marg, arg)
1919
return expected_method_return
1920
fake_ravel = self.fake_ravel_func(func)
1921
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1922
ret = self.call_method(bus, "methodname", "busname",
1923
"objectpath", "interface",
1925
self.assertIs(ret, expected_method_return)
1927
def test_call_method_filters_objectpath(self):
1929
return method_return
1930
fake_ravel = self.fake_ravel_func(func)
1931
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1932
method_return = (self.dummy_dbussy.DBUS
1933
.ObjectPath("objectpath"))
1934
ret = self.call_method(bus, "methodname", "busname",
1935
"objectpath", "interface")
1936
self.assertEqual("objectpath", ret)
1937
self.assertNotIsInstance(ret,
1938
self.dummy_dbussy.DBUS.ObjectPath)
1940
def test_call_method_filters_objectpaths_in_dict(self):
1941
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1943
return method_return
1944
fake_ravel = self.fake_ravel_func(func)
1945
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1947
ObjectPath("objectpath_key_1"):
1948
ObjectPath("objectpath_value_1"),
1949
ObjectPath("objectpath_key_2"):
1950
ObjectPath("objectpath_value_2"),
1952
ret = self.call_method(bus, "methodname", "busname",
1953
"objectpath", "interface")
1954
expected_method_return = {str(key): str(value)
1956
method_return.items()}
1957
for key, value in ret.items():
1958
self.assertNotIsInstance(key, ObjectPath)
1959
self.assertNotIsInstance(value, ObjectPath)
1960
self.assertEqual(expected_method_return, ret)
1961
self.assertIsInstance(ret, dict)
1963
def test_call_method_filters_objectpaths_in_dict_in_dict(self):
1964
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1966
return method_return
1967
fake_ravel = self.fake_ravel_func(func)
1968
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1970
ObjectPath("key1"): {
1971
ObjectPath("key11"): ObjectPath("value11"),
1972
ObjectPath("key12"): ObjectPath("value12"),
1974
ObjectPath("key2"): {
1975
ObjectPath("key21"): ObjectPath("value21"),
1976
ObjectPath("key22"): ObjectPath("value22"),
1979
ret = self.call_method(bus, "methodname", "busname",
1980
"objectpath", "interface")
1981
expected_method_return = {
1982
"key1": {"key11": "value11",
1983
"key12": "value12"},
1984
"key2": {"key21": "value21",
1985
"key22": "value22"},
1987
self.assertEqual(expected_method_return, ret)
1988
for key, value in ret.items():
1989
self.assertIsInstance(value, dict)
1990
self.assertEqual(expected_method_return[key], value)
1991
self.assertNotIsInstance(key, ObjectPath)
1992
for inner_key, inner_value in value.items():
1993
self.assertIsInstance(value, dict)
1995
expected_method_return[key][inner_key],
1997
self.assertNotIsInstance(key, ObjectPath)
1999
def test_call_method_filters_objectpaths_in_dict_three_deep(self):
2000
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
2002
return method_return
2003
fake_ravel = self.fake_ravel_func(func)
2004
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2006
ObjectPath("key1"): {
2007
ObjectPath("key2"): {
2008
ObjectPath("key3"): ObjectPath("value"),
2012
ret = self.call_method(bus, "methodname", "busname",
2013
"objectpath", "interface")
2014
expected_method_return = {"key1": {"key2": {"key3": "value"}}}
2015
self.assertEqual(expected_method_return, ret)
2016
self.assertIsInstance(ret, dict)
2017
self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
2018
self.assertIsInstance(ret["key1"], dict)
2019
self.assertNotIsInstance(next(iter(ret["key1"].keys())),
2021
self.assertIsInstance(ret["key1"]["key2"], dict)
2022
self.assertNotIsInstance(
2023
next(iter(ret["key1"]["key2"].keys())),
2025
self.assertEqual("value", ret["key1"]["key2"]["key3"])
2026
self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
2027
self.dummy_dbussy.DBUS.ObjectPath)
2029
def test_call_method_handles_exception(self):
2031
raise self.dummy_dbussy.DBusError()
2033
fake_ravel = self.fake_ravel_func(func)
2034
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2036
with self.assertRaises(dbus.Error) as e:
2037
self.call_method(bus, "methodname", "busname",
2038
"objectpath", "interface")
2040
self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
2042
def test_get_object_converts_to_correct_exception(self):
2043
class fake_ravel_raises_exception_on_connect:
2048
def __getitem__(key):
2049
if key == "objectpath":
2050
raise self.dummy_dbussy.DBusError()
2051
raise Exception(key)
2052
return {"busname": Bus()}
2054
raise self.dummy_dbussy.DBusError()
2055
bus = dbussy_adapter.SystemBus(
2057
fake_ravel_raises_exception_on_connect)
2058
with self.assertRaises(dbus.ConnectFailed):
2059
self.call_method(bus, "methodname", "busname",
2060
"objectpath", "interface")
1774
2063
class Test_commands_from_options(unittest.TestCase):
1776
2065
def setUp(self):
2158
2448
busname = "se.recompile.Mandos"
2159
2449
client_interface = "se.recompile.Mandos.Client"
2160
2450
command.Deny().run(self.bus.clients, self.bus)
2451
self.assertTrue(self.bus.clients)
2161
2452
for clientpath in self.bus.clients:
2162
2453
self.assertIn(("Approve", busname, clientpath,
2163
2454
client_interface, (False,)),
2164
2455
self.bus.calls)
2166
2457
def test_Remove(self):
2458
busname = "se.recompile.Mandos"
2460
server_interface = "se.recompile.Mandos"
2461
orig_clients = self.bus.clients.copy()
2167
2462
command.Remove().run(self.bus.clients, self.bus)
2168
for clientpath in self.bus.clients:
2169
self.assertIn(("RemoveClient", dbus_busname,
2170
dbus_server_path, dbus_server_interface,
2463
self.assertFalse(self.bus.clients)
2464
for clientpath in orig_clients:
2465
self.assertIn(("RemoveClient", busname,
2466
server_path, server_interface,
2171
2467
(clientpath,)), self.bus.calls)
2173
2469
expected_json = {