=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-31 04:47:26 +0000 +++ mandos-ctl 2019-03-31 09:13:31 +0000 @@ -46,7 +46,19 @@ import tempfile import contextlib -import dbus as dbus_python +try: + import pydbus + import gi + dbus_python = None +except ImportError: + import dbus as dbus_python + pydbus = None + class gi(object): + """Dummy gi module, for the tests""" + class repository(object): + class GLib(object): + class Error(Exception): + pass # Show warnings by default if not sys.warnoptions: @@ -81,7 +93,10 @@ if options.debug: log.setLevel(logging.DEBUG) - bus = dbus_python_adapter.CachingBus(dbus_python) + if pydbus is not None: + bus = pydbus_adapter.CachingBus(pydbus) + else: + bus = dbus_python_adapter.CachingBus(dbus_python) try: all_clients = bus.get_clients_and_properties() @@ -592,6 +607,69 @@ return new_object +class pydbus_adapter(object): + class SystemBus(dbus.MandosBus): + def __init__(self, module=pydbus): + self.pydbus = module + self.bus = self.pydbus.SystemBus() + + @contextlib.contextmanager + def convert_exception(self, exception_class=dbus.Error): + try: + yield + except gi.repository.GLib.Error as e: + # This does what "raise from" would do + exc = exception_class(*e.args) + exc.__cause__ = e + raise exc + + def call_method(self, methodname, busname, objectpath, + interface, *args): + proxy_object = self.get(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath, + interface, methodname, + ", ".join(repr(a) for a in args)) + method = getattr(proxy_object[interface], methodname) + with self.convert_exception(): + return method(*args) + + def get(self, busname, objectpath): + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + busname, objectpath) + with self.convert_exception(dbus.ConnectFailed): + if sys.version_info.major <= 2: + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", "", DeprecationWarning, + r"^xml\.etree\.ElementTree$") + return self.bus.get(busname, objectpath) + else: + return self.bus.get(busname, objectpath) + + def set_property(self, busname, objectpath, interface, key, + value): + proxy_object = self.get(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname, + objectpath, self.properties_iface, interface, + key, value) + setattr(proxy_object[interface], key, value) + + class CachingBus(SystemBus): + """A caching layer for pydbus_adapter.SystemBus""" + def __init__(self, *args, **kwargs): + self.object_cache = {} + super(pydbus_adapter.CachingBus, + self).__init__(*args, **kwargs) + def get(self, busname, objectpath): + try: + return self.object_cache[(busname, objectpath)] + except KeyError: + new_object = (super(pydbus_adapter.CachingBus, self) + .get(busname, objectpath)) + self.object_cache[(busname, objectpath)] = new_object + return new_object + + def commands_from_options(options): commands = list(options.commands) @@ -1508,6 +1586,155 @@ self.assertIs(obj1, obj1b) +class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs): + + def Stub_pydbus_func(self, func): + class stub_pydbus(object): + """stub pydbus module""" + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + DBusObject = collections.namedtuple( + "DBusObject", ("methodname",)) + return {"interface": + DBusObject(methodname=func)} + return stub_pydbus + + def call_method(self, bus, methodname, busname, objectpath, + interface, *args): + with self.assertLogs(log, logging.DEBUG): + return bus.call_method(methodname, busname, objectpath, + interface, *args) + + def test_call_method_returns(self): + expected_method_return = Unique() + method_args = (Unique(), Unique()) + def func(*args): + self.assertEqual(len(method_args), len(args)) + for marg, arg in zip(method_args, args): + self.assertIs(marg, arg) + return expected_method_return + stub_pydbus = self.Stub_pydbus_func(func) + bus = pydbus_adapter.SystemBus(stub_pydbus) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface", + *method_args) + self.assertIs(ret, expected_method_return) + + def test_call_method_handles_exception(self): + dbus_logger = logging.getLogger("dbus.proxies") + + def func(): + raise gi.repository.GLib.Error() + + stub_pydbus = self.Stub_pydbus_func(func) + bus = pydbus_adapter.SystemBus(stub_pydbus) + + with self.assertRaises(dbus.Error) as e: + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + self.assertNotIsInstance(e, dbus.ConnectFailed) + + def test_get_converts_to_correct_exception(self): + bus = pydbus_adapter.SystemBus( + self.fake_pydbus_raises_exception_on_connect) + with self.assertRaises(dbus.ConnectFailed): + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + class fake_pydbus_raises_exception_on_connect(object): + """fake dbus-python module""" + @classmethod + def SystemBus(cls): + def get(busname, objectpath): + raise gi.repository.GLib.Error() + Bus = collections.namedtuple("Bus", ["get"]) + return Bus(get=get) + + def test_set_property_uses_setattr(self): + class Object(object): + pass + obj = Object() + class pydbus_spy(object): + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + return {"interface": obj} + bus = pydbus_adapter.SystemBus(pydbus_spy) + value = Unique() + bus.set_property("busname", "objectpath", "interface", "key", + value) + self.assertIs(value, obj.key) + + def test_get_suppresses_xml_deprecation_warning(self): + if sys.version_info.major >= 3: + return + class stub_pydbus_get(object): + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + warnings.warn_explicit( + "deprecated", DeprecationWarning, + "xml.etree.ElementTree", 0) + bus = pydbus_adapter.SystemBus(stub_pydbus_get) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + bus.get("busname", "objectpath") + self.assertEqual(0, len(w)) + + +class Test_pydbus_adapter_CachingBus(unittest.TestCase): + class stub_pydbus(object): + """stub pydbus module""" + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + return Unique() + + def setUp(self): + self.bus = pydbus_adapter.CachingBus(self.stub_pydbus) + + def test_returns_distinct_objectpaths(self): + obj1 = self.bus.get("busname", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname", "objectpath2") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_busnames(self): + obj1 = self.bus.get("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_both(self): + obj1 = self.bus.get("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_same(self): + obj1 = self.bus.get("busname", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIs(obj1, obj2) + + def test_returns_same_old(self): + obj1 = self.bus.get("busname1", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname2", "objectpath2") + self.assertIsInstance(obj2, Unique) + obj1b = self.bus.get("busname1", "objectpath1") + self.assertIsInstance(obj1b, Unique) + self.assertIsNot(obj1, obj2) + self.assertIsNot(obj2, obj1b) + self.assertIs(obj1, obj1b) + + class Test_commands_from_options(unittest.TestCase): def setUp(self):