=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-23 16:00:26 +0000 +++ mandos-ctl 2019-03-30 07:03:04 +0000 @@ -45,8 +45,9 @@ import io import tempfile import contextlib +import abc -import dbus +import dbus as dbus_python # Show warnings by default if not sys.warnoptions: @@ -66,21 +67,9 @@ locale.setlocale(locale.LC_ALL, "") -dbus_busname_domain = "se.recompile" -dbus_busname = dbus_busname_domain + ".Mandos" -server_dbus_path = "/" -server_dbus_interface = dbus_busname_domain + ".Mandos" -client_dbus_interface = dbus_busname_domain + ".Mandos.Client" -del dbus_busname_domain version = "1.8.3" -try: - dbus.OBJECT_MANAGER_IFACE -except AttributeError: - dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager" - - def main(): parser = argparse.ArgumentParser() add_command_line_options(parser) @@ -93,23 +82,17 @@ if options.debug: log.setLevel(logging.DEBUG) - bus = dbus.SystemBus() - - mandos_dbus_object = get_mandos_dbus_object(bus) - - mandos_serv = dbus.Interface( - mandos_dbus_object, dbus_interface=server_dbus_interface) - mandos_serv_object_manager = dbus.Interface( - mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE) - - managed_objects = get_managed_objects(mandos_serv_object_manager) - - all_clients = {} - for path, ifs_and_props in managed_objects.items(): - try: - all_clients[path] = ifs_and_props[client_dbus_interface] - except KeyError: - pass + bus = dbus_python_adapter.CachingBus(dbus_python) + + try: + all_clients = bus.get_clients_and_properties() + except dbus.ConnectFailed as e: + log.critical("Could not connect to Mandos server: %s", e) + sys.exit(1) + except dbus.Error as e: + log.critical( + "Failed to access Mandos server through D-Bus:\n%s", e) + sys.exit(1) # Compile dict of (clientpath: properties) to process if not clientnames: @@ -128,7 +111,7 @@ commands = commands_from_options(options) for command in commands: - command.run(clients, bus, mandos_serv) + command.run(clients, bus) def add_command_line_options(parser): @@ -424,51 +407,146 @@ options.remove = True -def get_mandos_dbus_object(bus): - log.debug("D-Bus: Connect to: (busname=%r, path=%r)", - dbus_busname, server_dbus_path) - with if_dbus_exception_log_with_exception_and_exit( - "Could not connect to Mandos server: %s"): - mandos_dbus_object = bus.get_object(dbus_busname, - server_dbus_path) - return mandos_dbus_object - - -@contextlib.contextmanager -def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs): - try: - yield - except dbus.exceptions.DBusException as e: - log.critical(*(args + (e,)), **kwargs) - sys.exit(1) - - -def get_managed_objects(object_manager): - log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname, - server_dbus_path, dbus.OBJECT_MANAGER_IFACE) - with if_dbus_exception_log_with_exception_and_exit( - "Failed to access Mandos server through D-Bus:\n%s"): - with SilenceLogger("dbus.proxies"): - managed_objects = object_manager.GetManagedObjects() - return managed_objects - - -class SilenceLogger(object): - "Simple context manager to silence a particular logger" - def __init__(self, loggername): - self.logger = logging.getLogger(loggername) - - def __enter__(self): - self.logger.addFilter(self.nullfilter) - - class NullFilter(logging.Filter): - def filter(self, record): - return False - - nullfilter = NullFilter() - - def __exit__(self, exc_type, exc_val, exc_tb): - self.logger.removeFilter(self.nullfilter) + +class dbus(object): + + class SystemBus(object): + + object_manager_iface = "org.freedesktop.DBus.ObjectManager" + def get_managed_objects(self, busname, objectpath): + return self.call_method("GetManagedObjects", busname, + objectpath, + self.object_manager_iface) + + properties_iface = "org.freedesktop.DBus.Properties" + def set_property(self, busname, objectpath, interface, key, + value): + self.call_method("Set", busname, objectpath, + self.properties_iface, interface, key, + value) + + + class MandosBus(SystemBus): + busname_domain = "se.recompile" + busname = busname_domain + ".Mandos" + server_path = "/" + server_interface = busname_domain + ".Mandos" + client_interface = busname_domain + ".Mandos.Client" + del busname_domain + + def get_clients_and_properties(self): + managed_objects = self.get_managed_objects( + self.busname, self.server_path) + return {objpath: properties[self.client_interface] + for objpath, properties in managed_objects.items() + if self.client_interface in properties} + + def set_client_property(self, objectpath, key, value): + return self.set_property(self.busname, objectpath, + self.client_interface, key, + value) + + def call_client_method(self, objectpath, method, *args): + return self.call_method(method, self.busname, objectpath, + self.client_interface, *args) + + def call_server_method(self, method, *args): + return self.call_method(method, self.busname, + self.server_path, + self.server_interface, *args) + + class Error(Exception): + pass + + class ConnectFailed(Error): + pass + + +class dbus_python_adapter(object): + + class SystemBus(dbus.MandosBus): + """Use dbus-python""" + + def __init__(self, module=dbus_python): + self.dbus_python = module + self.bus = self.dbus_python.SystemBus() + + @contextlib.contextmanager + def convert_exception(self, exception_class=dbus.Error): + try: + yield + except self.dbus_python.exceptions.DBusException as e: + # This does what "raise from" would do + exc = exception_class(*e.args) + exc.__cause__ = e + raise exc + + def call_method(self, methodname, busname, objectpath, + interface, *args): + proxy_object = self.get_object(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath, + interface, methodname, + ", ".join(repr(a) for a in args)) + method = getattr(proxy_object, methodname) + with self.convert_exception(): + with dbus_python_adapter.SilenceLogger( + "dbus.proxies"): + value = method(*args, dbus_interface=interface) + return self.type_filter(value) + + def get_object(self, busname, objectpath): + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + busname, objectpath) + with self.convert_exception(dbus.ConnectFailed): + return self.bus.get_object(busname, objectpath) + + def type_filter(self, value): + """Convert the most bothersome types to Python types""" + if isinstance(value, self.dbus_python.Boolean): + return bool(value) + if isinstance(value, self.dbus_python.ObjectPath): + return str(value) + # Also recurse into dictionaries + if isinstance(value, self.dbus_python.Dictionary): + return {self.type_filter(key): + self.type_filter(subval) + for key, subval in value.items()} + return value + + + class SilenceLogger(object): + "Simple context manager to silence a particular logger" + def __init__(self, loggername): + self.logger = logging.getLogger(loggername) + + def __enter__(self): + self.logger.addFilter(self.nullfilter) + + class NullFilter(logging.Filter): + def filter(self, record): + return False + + nullfilter = NullFilter() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.logger.removeFilter(self.nullfilter) + + + class CachingBus(SystemBus): + """A caching layer for dbus_python_adapter.SystemBus""" + def __init__(self, *args, **kwargs): + self.object_cache = {} + super(dbus_python_adapter.CachingBus, + self).__init__(*args, **kwargs) + def get_object(self, busname, objectpath): + try: + return self.object_cache[(busname, objectpath)] + except KeyError: + new_object = super( + dbus_python_adapter.CachingBus, + self).get_object(busname, objectpath) + self.object_cache[(busname, objectpath)] = new_object + return new_object def commands_from_options(options): @@ -551,53 +629,38 @@ class Base(object): """Abstract base class for commands""" - def run(self, clients, bus=None, mandos=None): + def run(self, clients, bus=None): """Normal commands should implement run_on_one_client(), but commands which want to operate on all clients at the same time can override this run() method instead. """ - for clientpath, properties in clients.items(): - log.debug("D-Bus: Connect to: (busname=%r, path=%r)", - dbus_busname, str(clientpath)) - client = bus.get_object(dbus_busname, clientpath) + self.bus = bus + for client, properties in clients.items(): self.run_on_one_client(client, properties) class IsEnabled(Base): - def run(self, clients, bus=None, mandos=None): - client, properties = next(iter(clients.items())) - if self.is_enabled(client, properties): + def run(self, clients, bus=None): + properties = next(iter(clients.values())) + if properties["Enabled"]: sys.exit(0) sys.exit(1) - def is_enabled(self, client, properties): - return properties["Enabled"] class Approve(Base): def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname, - client.__dbus_object_path__, - client_dbus_interface) - client.Approve(dbus.Boolean(True), - dbus_interface=client_dbus_interface) + self.bus.call_client_method(client, "Approve", True) class Deny(Base): def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname, - client.__dbus_object_path__, - client_dbus_interface) - client.Approve(dbus.Boolean(False), - dbus_interface=client_dbus_interface) + self.bus.call_client_method(client, "Approve", False) class Remove(Base): - def run(self, clients, bus, mandos): - for clientpath in clients.keys(): - log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", - dbus_busname, server_dbus_path, - server_dbus_interface, clientpath) - mandos.RemoveClient(clientpath) + def run(self, clients, bus): + for clientpath in frozenset(clients.keys()): + bus.call_server_method("RemoveClient", clientpath) class Output(Base): @@ -613,25 +676,19 @@ class DumpJSON(Output): - def run(self, clients, bus=None, mandos=None): - data = {client["Name"]: - {key: self.dbus_boolean_to_bool(client[key]) + def run(self, clients, bus=None): + data = {properties["Name"]: + {key: properties[key] for key in self.all_keywords} - for client in clients.values()} + for properties in clients.values()} print(json.dumps(data, indent=4, separators=(',', ': '))) - @staticmethod - def dbus_boolean_to_bool(value): - if isinstance(value, dbus.Boolean): - value = bool(value) - return value - class PrintTable(Output): def __init__(self, verbose=False): self.verbose = verbose - def run(self, clients, bus=None, mandos=None): + def run(self, clients, bus=None): default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK") keywords = default_keywords @@ -698,7 +755,7 @@ @classmethod def valuetostring(cls, value, keyword): - if isinstance(value, dbus.Boolean): + if isinstance(value, bool): return "Yes" if value else "No" if keyword in ("Timeout", "Interval", "ApprovalDelay", "ApprovalDuration", "ExtendedTimeout"): @@ -727,18 +784,10 @@ class PropertySetter(Base): "Abstract class for Actions for setting one client property" - def run_on_one_client(self, client, properties): + def run_on_one_client(self, client, properties=None): """Set the Client's D-Bus property""" - log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname, - client.__dbus_object_path__, - dbus.PROPERTIES_IFACE, client_dbus_interface, - self.propname, self.value_to_set - if not isinstance(self.value_to_set, - dbus.Boolean) - else bool(self.value_to_set)) - client.Set(client_dbus_interface, self.propname, - self.value_to_set, - dbus_interface=dbus.PROPERTIES_IFACE) + self.bus.set_client_property(client, self.propname, + self.value_to_set) @property def propname(self): @@ -747,12 +796,12 @@ class Enable(PropertySetter): propname = "Enabled" - value_to_set = dbus.Boolean(True) + value_to_set = True class Disable(PropertySetter): propname = "Enabled" - value_to_set = dbus.Boolean(False) + value_to_set = False class BumpTimeout(PropertySetter): @@ -762,22 +811,22 @@ class StartChecker(PropertySetter): propname = "CheckerRunning" - value_to_set = dbus.Boolean(True) + value_to_set = True class StopChecker(PropertySetter): propname = "CheckerRunning" - value_to_set = dbus.Boolean(False) + value_to_set = False class ApproveByDefault(PropertySetter): propname = "ApprovedByDefault" - value_to_set = dbus.Boolean(True) + value_to_set = True class DenyByDefault(PropertySetter): propname = "ApprovedByDefault" - value_to_set = dbus.Boolean(False) + value_to_set = False class PropertySetterValue(PropertySetter): @@ -879,6 +928,11 @@ "output")) +class Unique(object): + """Class for objects which exist only to be unique objects, since +unittest.mock.sentinel only exists in Python 3.3""" + + class Test_string_to_delta(TestCaseWithAssertLogs): # Just test basic RFC 3339 functionality here, the doc string for # rfc3339_duration_to_delta() already has more comprehensive @@ -1072,53 +1126,300 @@ self.check_option_syntax(options) -class Test_get_mandos_dbus_object(TestCaseWithAssertLogs): - def test_calls_and_returns_get_object_on_bus(self): - class MockBus(object): - called = False - def get_object(mockbus_self, busname, dbus_path): - # Note that "self" is still the testcase instance, - # this MockBus instance is in "mockbus_self". - self.assertEqual(dbus_busname, busname) - self.assertEqual(server_dbus_path, dbus_path) - mockbus_self.called = True - return mockbus_self - - mockbus = get_mandos_dbus_object(bus=MockBus()) - self.assertIsInstance(mockbus, MockBus) - self.assertTrue(mockbus.called) - - def test_logs_and_exits_on_dbus_error(self): - class FailingBusStub(object): - def get_object(self, busname, dbus_path): - raise dbus.exceptions.DBusException("Test") - - with self.assertLogs(log, logging.CRITICAL): - with self.assertRaises(SystemExit) as e: - bus = get_mandos_dbus_object(bus=FailingBusStub()) - - if isinstance(e.exception.code, int): - self.assertNotEqual(0, e.exception.code) - else: - self.assertIsNotNone(e.exception.code) - - -class Test_get_managed_objects(TestCaseWithAssertLogs): - def test_calls_and_returns_GetManagedObjects(self): - managed_objects = {"/clients/client": { "Name": "client"}} - class ObjectManagerStub(object): - def GetManagedObjects(self): - return managed_objects - retval = get_managed_objects(ObjectManagerStub()) - self.assertDictEqual(managed_objects, retval) - - def test_logs_and_exits_on_dbus_error(self): +class Test_dbus_exceptions(unittest.TestCase): + + def test_dbus_ConnectFailed_is_Error(self): + with self.assertRaises(dbus.Error): + raise dbus.ConnectFailed() + + +class Test_dbus_MandosBus(unittest.TestCase): + + class MockMandosBus(dbus.MandosBus): + def __init__(self): + self._name = "se.recompile.Mandos" + self._server_path = "/" + self._server_interface = "se.recompile.Mandos" + self._client_interface = "se.recompile.Mandos.Client" + self.calls = [] + self.call_method_return = Unique() + + def call_method(self, methodname, busname, objectpath, + interface, *args): + self.calls.append((methodname, busname, objectpath, + interface, args)) + return self.call_method_return + + def setUp(self): + self.bus = self.MockMandosBus() + + def test_set_client_property(self): + self.bus.set_client_property("objectpath", "key", "value") + expected_call = ("Set", self.bus._name, "objectpath", + "org.freedesktop.DBus.Properties", + (self.bus._client_interface, "key", "value")) + self.assertIn(expected_call, self.bus.calls) + + def test_call_client_method(self): + ret = self.bus.call_client_method("objectpath", "methodname") + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, "objectpath", + self.bus._client_interface, ()) + self.assertIn(expected_call, self.bus.calls) + + def test_call_client_method_with_args(self): + args = (Unique(), Unique()) + ret = self.bus.call_client_method("objectpath", "methodname", + *args) + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, "objectpath", + self.bus._client_interface, + (args[0], args[1])) + self.assertIn(expected_call, self.bus.calls) + + def test_get_clients_and_properties(self): + managed_objects = { + "objectpath": { + self.bus._client_interface: { + "key": "value", + "bool": True, + }, + "irrelevant_interface": { + "key": "othervalue", + "bool": False, + }, + }, + "other_objectpath": { + "other_irrelevant_interface": { + "key": "value 3", + "bool": None, + }, + }, + } + expected_clients_and_properties = { + "objectpath": { + "key": "value", + "bool": True, + } + } + self.bus.call_method_return = managed_objects + ret = self.bus.get_clients_and_properties() + self.assertDictEqual(expected_clients_and_properties, ret) + expected_call = ("GetManagedObjects", self.bus._name, + self.bus._server_path, + "org.freedesktop.DBus.ObjectManager", ()) + self.assertIn(expected_call, self.bus.calls) + + def test_call_server_method(self): + ret = self.bus.call_server_method("methodname") + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, + self.bus._server_path, + self.bus._server_interface, ()) + self.assertIn(expected_call, self.bus.calls) + + def test_call_server_method_with_args(self): + args = (Unique(), Unique()) + ret = self.bus.call_server_method("methodname", *args) + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, + self.bus._server_path, + self.bus._server_interface, + (args[0], args[1])) + self.assertIn(expected_call, self.bus.calls) + + +class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs): + + def MockDBusPython_func(self, func): + class mock_dbus_python(object): + """mock dbus-python module""" + class exceptions(object): + """Pseudo-namespace""" + class DBusException(Exception): + pass + class SystemBus(object): + @staticmethod + def get_object(busname, objectpath): + DBusObject = collections.namedtuple( + "DBusObject", ("methodname",)) + def method(*args, **kwargs): + self.assertEqual({"dbus_interface": + "interface"}, + kwargs) + return func(*args) + return DBusObject(methodname=method) + class Boolean(object): + def __init__(self, value): + self.value = bool(value) + def __bool__(self): + return self.value + if sys.version_info.major == 2: + __nonzero__ = __bool__ + class ObjectPath(str): + pass + class Dictionary(dict): + pass + return mock_dbus_python + + def call_method(self, bus, methodname, busname, objectpath, + interface, *args): + with self.assertLogs(log, logging.DEBUG): + return bus.call_method(methodname, busname, objectpath, + interface, *args) + + def test_call_method_returns(self): + expected_method_return = Unique() + method_args = (Unique(), Unique()) + def func(*args): + self.assertEqual(len(method_args), len(args)) + for marg, arg in zip(method_args, args): + self.assertIs(marg, arg) + return expected_method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface", + *method_args) + self.assertIs(ret, expected_method_return) + + def test_call_method_filters_bool_true(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Boolean(True) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + self.assertTrue(ret) + self.assertNotIsInstance(ret, mock_dbus_python.Boolean) + + def test_call_method_filters_bool_false(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Boolean(False) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + self.assertFalse(ret) + self.assertNotIsInstance(ret, mock_dbus_python.Boolean) + + def test_call_method_filters_objectpath(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.ObjectPath("objectpath") + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + self.assertEqual("objectpath", ret) + self.assertIsNot("objectpath", ret) + self.assertNotIsInstance(ret, mock_dbus_python.ObjectPath) + + def test_call_method_filters_booleans_in_dict(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {mock_dbus_python.Boolean(True): + mock_dbus_python.Boolean(False), + mock_dbus_python.Boolean(False): + mock_dbus_python.Boolean(True)}) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {True: False, + False: True} + self.assertEqual(expected_method_return, ret) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + + def test_call_method_filters_objectpaths_in_dict(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {mock_dbus_python.ObjectPath("objectpath_key_1"): + mock_dbus_python.ObjectPath("objectpath_value_1"), + mock_dbus_python.ObjectPath("objectpath_key_2"): + mock_dbus_python.ObjectPath("objectpath_value_2")}) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {str(key): str(value) + for key, value in + method_return.items()} + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + + def test_call_method_filters_dict_in_dict(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {"key1": mock_dbus_python.Dictionary({"key11": "value11", + "key12": "value12"}), + "key2": mock_dbus_python.Dictionary({"key21": "value21", + "key22": "value22"})}) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = { + "key1": {"key11": "value11", + "key12": "value12"}, + "key2": {"key21": "value21", + "key22": "value22"}, + } + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + for key, value in ret.items(): + self.assertIsInstance(value, dict) + self.assertEqual(expected_method_return[key], value) + self.assertNotIsInstance(value, + mock_dbus_python.Dictionary) + + def test_call_method_filters_dict_three_deep(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {"key1": + mock_dbus_python.Dictionary( + {"key2": + mock_dbus_python.Dictionary( + {"key3": + mock_dbus_python.Boolean(True), + }), + }), + }) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {"key1": {"key2": {"key3": True}}} + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + self.assertIsInstance(ret["key1"], dict) + self.assertNotIsInstance(ret["key1"], + mock_dbus_python.Dictionary) + self.assertIsInstance(ret["key1"]["key2"], dict) + self.assertNotIsInstance(ret["key1"]["key2"], + mock_dbus_python.Dictionary) + self.assertTrue(ret["key1"]["key2"]["key3"]) + self.assertNotIsInstance(ret["key1"]["key2"]["key3"], + mock_dbus_python.Boolean) + + def test_call_method_handles_exception(self): dbus_logger = logging.getLogger("dbus.proxies") - class ObjectManagerFailingStub(object): - def GetManagedObjects(self): - dbus_logger.error("Test") - raise dbus.exceptions.DBusException("Test") + def func(): + dbus_logger.error("Test") + raise mock_dbus_python.exceptions.DBusException() + + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) class CountingHandler(logging.Handler): count = 0 @@ -1130,26 +1431,93 @@ dbus_logger.addHandler(counting_handler) try: - with self.assertLogs(log, logging.CRITICAL) as watcher: - with self.assertRaises(SystemExit) as e: - get_managed_objects(ObjectManagerFailingStub()) + with self.assertRaises(dbus.Error) as e: + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") finally: dbus_logger.removeFilter(counting_handler) + self.assertNotIsInstance(e, dbus.ConnectFailed) + # Make sure the dbus logger was suppressed self.assertEqual(0, counting_handler.count) - # Test that the dbus_logger still works - with self.assertLogs(dbus_logger, logging.ERROR): - dbus_logger.error("Test") - - if isinstance(e.exception.code, int): - self.assertNotEqual(0, e.exception.code) - else: - self.assertIsNotNone(e.exception.code) + def test_get_object_converts_to_correct_exception(self): + bus = dbus_python_adapter.SystemBus( + self.fake_dbus_python_raises_exception_on_connect) + with self.assertRaises(dbus.ConnectFailed): + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + class fake_dbus_python_raises_exception_on_connect(object): + """fake dbus-python module""" + class exceptions(object): + """Pseudo-namespace""" + class DBusException(Exception): + pass + + @classmethod + def SystemBus(cls): + def get_object(busname, objectpath): + raise cls.exceptions.DBusException() + Bus = collections.namedtuple("Bus", ["get_object"]) + return Bus(get_object=get_object) + + +class Test_dbus_python_adapter_CachingBus(unittest.TestCase): + class mock_dbus_python(object): + """mock dbus-python modules""" + class SystemBus(object): + @staticmethod + def get_object(busname, objectpath): + return Unique() + + def setUp(self): + self.bus = dbus_python_adapter.CachingBus( + self.mock_dbus_python) + + def test_returns_distinct_objectpaths(self): + obj1 = self.bus.get_object("busname", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname", "objectpath2") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_busnames(self): + obj1 = self.bus.get_object("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_both(self): + obj1 = self.bus.get_object("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_same(self): + obj1 = self.bus.get_object("busname", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIs(obj1, obj2) + + def test_returns_same_old(self): + obj1 = self.bus.get_object("busname1", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname2", "objectpath2") + self.assertIsInstance(obj2, Unique) + obj1b = self.bus.get_object("busname1", "objectpath1") + self.assertIsInstance(obj1b, Unique) + self.assertIsNot(obj1, obj2) + self.assertIsNot(obj2, obj1b) + self.assertIs(obj1, obj1b) class Test_commands_from_options(unittest.TestCase): + def setUp(self): self.parser = argparse.ArgumentParser() add_command_line_options(self.parser) @@ -1370,141 +1738,140 @@ class TestCommand(unittest.TestCase): """Abstract class for tests of command classes""" + class FakeMandosBus(dbus.MandosBus): + def __init__(self, testcase): + self.client_properties = { + "Name": "foo", + "KeyID": ("92ed150794387c03ce684574b1139a65" + "94a34f895daaaf09fd8ea90a27cddb12"), + "Secret": b"secret", + "Host": "foo.example.org", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-03T00:00:00", + "Created": "2019-01-02T00:00:00", + "Interval": 120000, + "Fingerprint": ("778827225BA7DE539C5A" + "7CFA59CFF7CDBD9A5920"), + "CheckerRunning": False, + "LastEnabled": "2019-01-03T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": True, + "LastApprovalRequest": "", + "ApprovalDelay": 0, + "ApprovalDuration": 1000, + "Checker": "fping -q -- %(host)s", + "ExtendedTimeout": 900000, + "Expires": "2019-02-04T00:00:00", + "LastCheckerStatus": 0, + } + self.other_client_properties = { + "Name": "barbar", + "KeyID": ("0558568eedd67d622f5c83b35a115f79" + "6ab612cff5ad227247e46c2b020f441c"), + "Secret": b"secretbar", + "Host": "192.0.2.3", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-04T00:00:00", + "Created": "2019-01-03T00:00:00", + "Interval": 120000, + "Fingerprint": ("3E393AEAEFB84C7E89E2" + "F547B3A107558FCA3A27"), + "CheckerRunning": True, + "LastEnabled": "2019-01-04T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": False, + "LastApprovalRequest": "2019-01-03T00:00:00", + "ApprovalDelay": 30000, + "ApprovalDuration": 93785000, + "Checker": ":", + "ExtendedTimeout": 900000, + "Expires": "2019-02-05T00:00:00", + "LastCheckerStatus": -2, + } + self.clients = collections.OrderedDict( + [ + ("client_objectpath", self.client_properties), + ("other_client_objectpath", + self.other_client_properties), + ]) + self.one_client = {"client_objectpath": + self.client_properties} + self.testcase = testcase + self.calls = [] + + def call_method(self, methodname, busname, objectpath, + interface, *args): + self.testcase.assertEqual("se.recompile.Mandos", busname) + self.calls.append((methodname, busname, objectpath, + interface, args)) + if interface == "org.freedesktop.DBus.Properties": + if methodname == "Set": + self.testcase.assertEqual(3, len(args)) + interface, key, value = args + self.testcase.assertEqual( + "se.recompile.Mandos.Client", interface) + self.clients[objectpath][key] = value + return + elif interface == "se.recompile.Mandos": + self.testcase.assertEqual("RemoveClient", methodname) + self.testcase.assertEqual(1, len(args)) + clientpath = args[0] + del self.clients[clientpath] + return + elif interface == "se.recompile.Mandos.Client": + if methodname == "Approve": + self.testcase.assertEqual(1, len(args)) + return + raise ValueError() + def setUp(self): - testcase = self - class MockClient(object): - def __init__(self, name, **attributes): - self.__dbus_object_path__ = "/clients/{}".format(name) - self.attributes = attributes - self.attributes["Name"] = name - self.calls = [] - def Set(self, interface, propname, value, dbus_interface): - testcase.assertEqual(client_dbus_interface, interface) - testcase.assertEqual(dbus.PROPERTIES_IFACE, - dbus_interface) - self.attributes[propname] = value - def Approve(self, approve, dbus_interface): - testcase.assertEqual(client_dbus_interface, - dbus_interface) - self.calls.append(("Approve", (approve, - dbus_interface))) - self.client = MockClient( - "foo", - KeyID=("92ed150794387c03ce684574b1139a65" - "94a34f895daaaf09fd8ea90a27cddb12"), - Secret=b"secret", - Host="foo.example.org", - Enabled=dbus.Boolean(True), - Timeout=300000, - LastCheckedOK="2019-02-03T00:00:00", - Created="2019-01-02T00:00:00", - Interval=120000, - Fingerprint=("778827225BA7DE539C5A" - "7CFA59CFF7CDBD9A5920"), - CheckerRunning=dbus.Boolean(False), - LastEnabled="2019-01-03T00:00:00", - ApprovalPending=dbus.Boolean(False), - ApprovedByDefault=dbus.Boolean(True), - LastApprovalRequest="", - ApprovalDelay=0, - ApprovalDuration=1000, - Checker="fping -q -- %(host)s", - ExtendedTimeout=900000, - Expires="2019-02-04T00:00:00", - LastCheckerStatus=0) - self.other_client = MockClient( - "barbar", - KeyID=("0558568eedd67d622f5c83b35a115f79" - "6ab612cff5ad227247e46c2b020f441c"), - Secret=b"secretbar", - Host="192.0.2.3", - Enabled=dbus.Boolean(True), - Timeout=300000, - LastCheckedOK="2019-02-04T00:00:00", - Created="2019-01-03T00:00:00", - Interval=120000, - Fingerprint=("3E393AEAEFB84C7E89E2" - "F547B3A107558FCA3A27"), - CheckerRunning=dbus.Boolean(True), - LastEnabled="2019-01-04T00:00:00", - ApprovalPending=dbus.Boolean(False), - ApprovedByDefault=dbus.Boolean(False), - LastApprovalRequest="2019-01-03T00:00:00", - ApprovalDelay=30000, - ApprovalDuration=93785000, - Checker=":", - ExtendedTimeout=900000, - Expires="2019-02-05T00:00:00", - LastCheckerStatus=-2) - self.clients = collections.OrderedDict( - [ - (self.client.__dbus_object_path__, - self.client.attributes), - (self.other_client.__dbus_object_path__, - self.other_client.attributes), - ]) - self.one_client = {self.client.__dbus_object_path__: - self.client.attributes} - - @property - def bus(self): - class MockBus(object): - @staticmethod - def get_object(client_bus_name, path): - self.assertEqual(dbus_busname, client_bus_name) - # Note: "self" here is the TestCmd instance, not the - # MockBus instance, since this is a static method! - if path == self.client.__dbus_object_path__: - return self.client - elif path == self.other_client.__dbus_object_path__: - return self.other_client - return MockBus() + self.bus = self.FakeMandosBus(self) class TestBaseCommands(TestCommand): def test_IsEnabled_exits_successfully(self): with self.assertRaises(SystemExit) as e: - command.IsEnabled().run(self.one_client) + command.IsEnabled().run(self.bus.one_client) if e.exception.code is not None: self.assertEqual(0, e.exception.code) else: self.assertIsNone(e.exception.code) def test_IsEnabled_exits_with_failure(self): - self.client.attributes["Enabled"] = dbus.Boolean(False) + self.bus.client_properties["Enabled"] = False with self.assertRaises(SystemExit) as e: - command.IsEnabled().run(self.one_client) + command.IsEnabled().run(self.bus.one_client) if isinstance(e.exception.code, int): self.assertNotEqual(0, e.exception.code) else: self.assertIsNotNone(e.exception.code) def test_Approve(self): - command.Approve().run(self.clients, self.bus) - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - self.assertIn(("Approve", (True, client_dbus_interface)), - client.calls) + busname = "se.recompile.Mandos" + client_interface = "se.recompile.Mandos.Client" + command.Approve().run(self.bus.clients, self.bus) + for clientpath in self.bus.clients: + self.assertIn(("Approve", busname, clientpath, + client_interface, (True,)), self.bus.calls) def test_Deny(self): - command.Deny().run(self.clients, self.bus) - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - self.assertIn(("Approve", (False, client_dbus_interface)), - client.calls) + busname = "se.recompile.Mandos" + client_interface = "se.recompile.Mandos.Client" + command.Deny().run(self.bus.clients, self.bus) + for clientpath in self.bus.clients: + self.assertIn(("Approve", busname, clientpath, + client_interface, (False,)), + self.bus.calls) def test_Remove(self): - class MandosSpy(object): - def __init__(self): - self.calls = [] - def RemoveClient(self, dbus_path): - self.calls.append(("RemoveClient", (dbus_path,))) - mandos = MandosSpy() - command.Remove().run(self.clients, self.bus, mandos) - for clientpath in self.clients: - self.assertIn(("RemoveClient", (clientpath,)), - mandos.calls) + command.Remove().run(self.bus.clients, self.bus) + for clientpath in self.bus.clients: + self.assertIn(("RemoveClient", dbus_busname, + dbus_server_path, dbus_server_interface, + (clientpath,)), self.bus.calls) expected_json = { "foo": { @@ -1559,7 +1926,7 @@ def test_DumpJSON_normal(self): with self.capture_stdout_to_buffer() as buffer: - command.DumpJSON().run(self.clients) + command.DumpJSON().run(self.bus.clients) json_data = json.loads(buffer.getvalue()) self.assertDictEqual(self.expected_json, json_data) @@ -1576,14 +1943,14 @@ def test_DumpJSON_one_client(self): with self.capture_stdout_to_buffer() as buffer: - command.DumpJSON().run(self.one_client) + command.DumpJSON().run(self.bus.one_client) json_data = json.loads(buffer.getvalue()) expected_json = {"foo": self.expected_json["foo"]} self.assertDictEqual(expected_json, json_data) def test_PrintTable_normal(self): with self.capture_stdout_to_buffer() as buffer: - command.PrintTable().run(self.clients) + command.PrintTable().run(self.bus.clients) expected_output = "\n".join(( "Name Enabled Timeout Last Successful Check", "foo Yes 00:05:00 2019-02-03T00:00:00 ", @@ -1593,7 +1960,7 @@ def test_PrintTable_verbose(self): with self.capture_stdout_to_buffer() as buffer: - command.PrintTable(verbose=True).run(self.clients) + command.PrintTable(verbose=True).run(self.bus.clients) columns = ( ( "Name ", @@ -1689,7 +2056,7 @@ def test_PrintTable_one_client(self): with self.capture_stdout_to_buffer() as buffer: - command.PrintTable().run(self.one_client) + command.PrintTable().run(self.bus.one_client) expected_output = "\n".join(( "Name Enabled Timeout Last Successful Check", "foo Yes 00:05:00 2019-02-03T00:00:00 ", @@ -1699,28 +2066,22 @@ class TestPropertySetterCmd(TestCommand): """Abstract class for tests of command.PropertySetter classes""" + def runTest(self): if not hasattr(self, "command"): - return + return # Abstract TestCase class values_to_get = getattr(self, "values_to_get", self.values_to_set) for value_to_set, value_to_get in zip(self.values_to_set, values_to_get): - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - old_value = client.attributes[self.propname] - client.attributes[self.propname] = self.Unique() - self.run_command(value_to_set, self.clients) - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - value = client.attributes[self.propname] - self.assertNotIsInstance(value, self.Unique) + for clientpath in self.bus.clients: + self.bus.clients[clientpath][self.propname] = Unique() + self.run_command(value_to_set, self.bus.clients) + for clientpath in self.bus.clients: + value = self.bus.clients[clientpath][self.propname] + self.assertNotIsInstance(value, Unique) self.assertEqual(value_to_get, value) - class Unique(object): - """Class for objects which exist only to be unique objects, -since unittest.mock.sentinel only exists in Python 3.3""" - def run_command(self, value, clients): self.command().run(clients, self.bus) @@ -1728,13 +2089,13 @@ class TestEnableCmd(TestPropertySetterCmd): command = command.Enable propname = "Enabled" - values_to_set = [dbus.Boolean(True)] + values_to_set = [True] class TestDisableCmd(TestPropertySetterCmd): command = command.Disable propname = "Enabled" - values_to_set = [dbus.Boolean(False)] + values_to_set = [False] class TestBumpTimeoutCmd(TestPropertySetterCmd): @@ -1746,35 +2107,30 @@ class TestStartCheckerCmd(TestPropertySetterCmd): command = command.StartChecker propname = "CheckerRunning" - values_to_set = [dbus.Boolean(True)] + values_to_set = [True] class TestStopCheckerCmd(TestPropertySetterCmd): command = command.StopChecker propname = "CheckerRunning" - values_to_set = [dbus.Boolean(False)] + values_to_set = [False] class TestApproveByDefaultCmd(TestPropertySetterCmd): command = command.ApproveByDefault propname = "ApprovedByDefault" - values_to_set = [dbus.Boolean(True)] + values_to_set = [True] class TestDenyByDefaultCmd(TestPropertySetterCmd): command = command.DenyByDefault propname = "ApprovedByDefault" - values_to_set = [dbus.Boolean(False)] + values_to_set = [False] class TestPropertySetterValueCmd(TestPropertySetterCmd): """Abstract class for tests of PropertySetterValueCmd classes""" - def runTest(self): - if type(self) is TestPropertySetterValueCmd: - return - return super(TestPropertySetterValueCmd, self).runTest() - def run_command(self, value, clients): self.command(value).run(clients, self.bus)