=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-15 19:59:49 +0000 +++ mandos-ctl 2019-03-15 21:59:44 +0000 @@ -93,20 +93,14 @@ if options.debug: log.setLevel(logging.DEBUG) - try: - bus = dbus.SystemBus() - log.debug("D-Bus: Connect to: (busname=%r, path=%r)", - dbus_busname, server_dbus_path) - mandos_dbus_objc = bus.get_object(dbus_busname, - server_dbus_path) - except dbus.exceptions.DBusException: - log.critical("Could not connect to Mandos server") - sys.exit(1) - - mandos_serv = dbus.Interface(mandos_dbus_objc, - dbus_interface=server_dbus_interface) + bus = dbus.SystemBus() + + mandos_dbus_object = get_mandos_dbus_object(bus) + + mandos_serv = dbus.Interface( + mandos_dbus_object, dbus_interface=server_dbus_interface) mandos_serv_object_manager = dbus.Interface( - mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE) + mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE) try: log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname, @@ -436,6 +430,19 @@ options.remove = True +def get_mandos_dbus_object(bus): + try: + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + dbus_busname, server_dbus_path) + mandos_dbus_object = bus.get_object(dbus_busname, + server_dbus_path) + except dbus.exceptions.DBusException: + log.critical("Could not connect to Mandos server") + sys.exit(1) + + return mandos_dbus_object + + class SilenceLogger(object): "Simple context manager to silence a particular logger" def __init__(self, loggername): @@ -988,6 +995,55 @@ self.check_option_syntax(options) +class Test_get_mandos_dbus_object(unittest.TestCase): + def test_calls_and_returns_get_object_on_bus(self): + class MockBus(object): + called = False + def get_object(mockbus_self, busname, dbus_path): + # Note that "self" is still the testcase instance, + # this MockBus instance is in "mockbus_self". + self.assertEqual(busname, dbus_busname) + self.assertEqual(dbus_path, server_dbus_path) + mockbus_self.called = True + return mockbus_self + + mockbus = get_mandos_dbus_object(bus=MockBus()) + self.assertIsInstance(mockbus, MockBus) + self.assertTrue(mockbus.called) + + def test_logs_and_exits_on_dbus_error(self): + class MockBusFailing(object): + def get_object(self, busname, dbus_path): + raise dbus.exceptions.DBusException("Test") + + # assertLogs only exists in Python 3.4 + if hasattr(self, "assertLogs"): + with self.assertLogs(log, logging.CRITICAL): + with self.assertRaises(SystemExit) as e: + bus = get_mandos_dbus_object(bus=MockBus()) + else: + critical_filter = self.CriticalFilter() + log.addFilter(critical_filter) + try: + with self.assertRaises(SystemExit) as e: + get_mandos_dbus_object(bus=MockBusFailing()) + finally: + log.removeFilter(critical_filter) + self.assertTrue(critical_filter.found) + if isinstance(e.exception.code, int): + self.assertNotEqual(e.exception.code, 0) + else: + self.assertIsNotNone(e.exception.code) + + class CriticalFilter(logging.Filter): + """Don't show, but register, critical messages""" + found = False + def filter(self, record): + is_critical = record.levelno >= logging.CRITICAL + self.found = is_critical or self.found + return not is_critical + + class Test_SilenceLogger(unittest.TestCase): loggername = "mandos-ctl.Test_SilenceLogger" log = logging.getLogger(loggername)