=== modified file 'mandos-ctl' --- mandos-ctl 2019-12-04 23:35:08 +0000 +++ mandos-ctl 2020-01-12 01:42:09 +0000 @@ -50,19 +50,27 @@ __metaclass__ = type str = unicode +class gi: + """Dummy gi module, for the tests""" + class repository: + class GLib: + class Error(Exception): + pass +dbussy = None +ravel = None +dbus_python = None +pydbus = None + try: - import pydbus - import gi - dbus_python = None + import dbussy + import ravel except ImportError: - import dbus as dbus_python - pydbus = None - class gi: - """Dummy gi module, for the tests""" - class repository: - class GLib: - class Error(Exception): - pass + try: + import pydbus + import gi + except ImportError: + import dbus as dbus_python + # Show warnings by default if not sys.warnoptions: @@ -96,7 +104,9 @@ if options.debug: log.setLevel(logging.DEBUG) - if pydbus is not None: + if dbussy is not None and ravel is not None: + bus = dbussy_adapter.CachingBus(dbussy, ravel) + elif pydbus is not None: bus = pydbus_adapter.CachingBus(pydbus) else: bus = dbus_python_adapter.CachingBus(dbus_python) @@ -487,7 +497,6 @@ self.properties_iface, interface, key, value) - class MandosBus(SystemBus): busname_domain = "se.recompile" busname = busname_domain + ".Mandos" @@ -682,6 +691,94 @@ return new_object +class dbussy_adapter: + class SystemBus(dbus.SystemBus): + """Use DBussy""" + + def __init__(self, dbussy, ravel): + self.dbussy = dbussy + self.ravel = ravel + self.bus = ravel.system_bus() + + @contextlib.contextmanager + def convert_exception(self, exception_class=dbus.Error): + try: + yield + except self.dbussy.DBusError as e: + # This does what "raise from" would do + exc = exception_class(*e.args) + exc.__cause__ = e + raise exc + + def call_method(self, methodname, busname, objectpath, + interface, *args): + proxy_object = self.get_object(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath, + interface, methodname, + ", ".join(repr(a) for a in args)) + iface = proxy_object.get_interface(interface) + method = getattr(iface, methodname) + with self.convert_exception(dbus.Error): + value = method(*args) + # DBussy returns values either as an empty list or as a + # tuple: (signature, value) + if value: + return self.type_filter(value[0]) + + def get_object(self, busname, objectpath): + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + busname, objectpath) + with self.convert_exception(dbus.ConnectFailed): + return self.bus[busname][objectpath] + + def type_filter(self, value): + """Convert the most bothersome types to Python types""" + if isinstance(value, tuple): + if (len(value) == 2 + and isinstance(value[0], + self.dbussy.DBUS.Signature)): + return self.type_filter(value[1]) + elif isinstance(value, self.dbussy.DBUS.ObjectPath): + return str(value) + # Also recurse into dictionaries + elif isinstance(value, dict): + return {self.type_filter(key): + self.type_filter(subval) + for key, subval in value.items()} + return value + + def set_property(self, busname, objectpath, interface, key, + value): + proxy_object = self.get_object(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname, + objectpath, self.properties_iface, interface, + key, value) + if key == "Secret": + # DBussy wants a Byte Array to be a sequence of + # values, not a byte string + value = tuple(value) + setattr(proxy_object.get_interface(interface), key, value) + + class MandosBus(SystemBus, dbus.MandosBus): + pass + + class CachingBus(MandosBus): + """A caching layer for dbussy_adapter.MandosBus""" + def __init__(self, *args, **kwargs): + self.object_cache = {} + super(dbussy_adapter.CachingBus, self).__init__(*args, + **kwargs) + def get_object(self, busname, objectpath): + try: + return self.object_cache[(busname, objectpath)] + except KeyError: + new_object = super( + dbussy_adapter.CachingBus, + self).get_object(busname, objectpath) + self.object_cache[(busname, objectpath)] = new_object + return new_object + + def commands_from_options(options): commands = list(options.commands) @@ -1774,6 +1871,188 @@ self.assertIs(obj1, obj1b) +class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs): + + class dummy_dbussy: + class DBUS: + class ObjectPath(str): + pass + class DBusError(Exception): + pass + + def fake_ravel_func(self, func): + class fake_ravel: + @staticmethod + def system_bus(): + class DBusInterfaceProxy: + @staticmethod + def methodname(*args): + return [func(*args)] + class DBusObject: + @staticmethod + def get_interface(interface): + if interface == "interface": + return DBusInterfaceProxy() + return {"busname": {"objectpath": DBusObject()}} + return fake_ravel + + def call_method(self, bus, methodname, busname, objectpath, + interface, *args): + with self.assertLogs(log, logging.DEBUG): + return bus.call_method(methodname, busname, objectpath, + interface, *args) + + def test_call_method_returns(self): + expected_method_return = Unique() + method_args = (Unique(), Unique()) + def func(*args): + self.assertEqual(len(method_args), len(args)) + for marg, arg in zip(method_args, args): + self.assertIs(marg, arg) + return expected_method_return + fake_ravel = self.fake_ravel_func(func) + bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface", + *method_args) + self.assertIs(ret, expected_method_return) + + def test_call_method_filters_objectpath(self): + def func(): + return method_return + fake_ravel = self.fake_ravel_func(func) + bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel) + method_return = (self.dummy_dbussy.DBUS + .ObjectPath("objectpath")) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + self.assertEqual("objectpath", ret) + self.assertNotIsInstance(ret, + self.dummy_dbussy.DBUS.ObjectPath) + + def test_call_method_filters_objectpaths_in_dict(self): + ObjectPath = self.dummy_dbussy.DBUS.ObjectPath + def func(): + return method_return + fake_ravel = self.fake_ravel_func(func) + bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel) + method_return = { + ObjectPath("objectpath_key_1"): + ObjectPath("objectpath_value_1"), + ObjectPath("objectpath_key_2"): + ObjectPath("objectpath_value_2"), + } + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {str(key): str(value) + for key, value in + method_return.items()} + for key, value in ret.items(): + self.assertNotIsInstance(key, ObjectPath) + self.assertNotIsInstance(value, ObjectPath) + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + + def test_call_method_filters_objectpaths_in_dict_in_dict(self): + ObjectPath = self.dummy_dbussy.DBUS.ObjectPath + def func(): + return method_return + fake_ravel = self.fake_ravel_func(func) + bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel) + method_return = { + ObjectPath("key1"): { + ObjectPath("key11"): ObjectPath("value11"), + ObjectPath("key12"): ObjectPath("value12"), + }, + ObjectPath("key2"): { + ObjectPath("key21"): ObjectPath("value21"), + ObjectPath("key22"): ObjectPath("value22"), + }, + } + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = { + "key1": {"key11": "value11", + "key12": "value12"}, + "key2": {"key21": "value21", + "key22": "value22"}, + } + self.assertEqual(expected_method_return, ret) + for key, value in ret.items(): + self.assertIsInstance(value, dict) + self.assertEqual(expected_method_return[key], value) + self.assertNotIsInstance(key, ObjectPath) + for inner_key, inner_value in value.items(): + self.assertIsInstance(value, dict) + self.assertEqual( + expected_method_return[key][inner_key], + inner_value) + self.assertNotIsInstance(key, ObjectPath) + + def test_call_method_filters_objectpaths_in_dict_three_deep(self): + ObjectPath = self.dummy_dbussy.DBUS.ObjectPath + def func(): + return method_return + fake_ravel = self.fake_ravel_func(func) + bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel) + method_return = { + ObjectPath("key1"): { + ObjectPath("key2"): { + ObjectPath("key3"): ObjectPath("value"), + }, + }, + } + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {"key1": {"key2": {"key3": "value"}}} + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath) + self.assertIsInstance(ret["key1"], dict) + self.assertNotIsInstance(next(iter(ret["key1"].keys())), + ObjectPath) + self.assertIsInstance(ret["key1"]["key2"], dict) + self.assertNotIsInstance( + next(iter(ret["key1"]["key2"].keys())), + ObjectPath) + self.assertEqual("value", ret["key1"]["key2"]["key3"]) + self.assertNotIsInstance(ret["key1"]["key2"]["key3"], + self.dummy_dbussy.DBUS.ObjectPath) + + def test_call_method_handles_exception(self): + def func(): + raise self.dummy_dbussy.DBusError() + + fake_ravel = self.fake_ravel_func(func) + bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel) + + with self.assertRaises(dbus.Error) as e: + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + self.assertNotIsInstance(e.exception, dbus.ConnectFailed) + + def test_get_object_converts_to_correct_exception(self): + class fake_ravel_raises_exception_on_connect: + @staticmethod + def system_bus(): + class Bus: + @staticmethod + def __getitem__(key): + if key == "objectpath": + raise self.dummy_dbussy.DBusError() + raise Exception(key) + return {"busname": Bus()} + def func(): + raise self.dummy_dbussy.DBusError() + bus = dbussy_adapter.SystemBus( + self.dummy_dbussy, + fake_ravel_raises_exception_on_connect) + with self.assertRaises(dbus.ConnectFailed): + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + class Test_commands_from_options(unittest.TestCase): def setUp(self):