/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-16 04:32:51 UTC
  • mto: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190316043251-lqkgv164b7x6bsj5
mandos-ctl: Refactor tests

* mandos-ctl (TestCaseWithAssertLogs): New.
  (Test_string_to_delta): Inherit from TestCaseWithAssertLogs and
                          simply use self.assertLogs() when needed.
  (Test_get_mandos_dbus_object): - '' -
  (Test_get_mandos_dbus_object.CriticalFilter): Remove.
  (Test_get_managed_objects): - '' - and use a CountingHandler instead
                              of a CountingFilter.
  (Test_get_managed_objects.CriticalFilter): Remove.
  (Test_SilenceLogger): Remove.

Show diffs side-by-side

added added

removed removed

Lines of Context:
93
93
    if options.debug:
94
94
        log.setLevel(logging.DEBUG)
95
95
 
96
 
    try:
97
 
        bus = dbus.SystemBus()
98
 
        log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
99
 
                  dbus_busname, server_dbus_path)
100
 
        mandos_dbus_objc = bus.get_object(dbus_busname,
101
 
                                          server_dbus_path)
102
 
    except dbus.exceptions.DBusException:
103
 
        log.critical("Could not connect to Mandos server")
104
 
        sys.exit(1)
105
 
 
106
 
    mandos_serv = dbus.Interface(mandos_dbus_objc,
107
 
                                 dbus_interface=server_dbus_interface)
 
96
    bus = dbus.SystemBus()
 
97
 
 
98
    mandos_dbus_object = get_mandos_dbus_object(bus)
 
99
 
 
100
    mandos_serv = dbus.Interface(
 
101
        mandos_dbus_object, dbus_interface=server_dbus_interface)
108
102
    mandos_serv_object_manager = dbus.Interface(
109
 
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
110
 
 
111
 
    try:
112
 
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
113
 
                  server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
114
 
        with SilenceLogger("dbus.proxies"):
115
 
            all_clients = {path: ifs_and_props[client_dbus_interface]
116
 
                              for path, ifs_and_props in
117
 
                              mandos_serv_object_manager
118
 
                              .GetManagedObjects().items()
119
 
                              if client_dbus_interface in ifs_and_props}
120
 
    except dbus.exceptions.DBusException as e:
121
 
        log.critical("Failed to access Mandos server through D-Bus:"
122
 
                     "\n%s", e)
123
 
        sys.exit(1)
124
 
 
125
 
    # Compile dict of (clients: properties) to process
126
 
    clients = {}
127
 
 
 
103
        mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
 
104
 
 
105
    managed_objects = get_managed_objects(mandos_serv_object_manager)
 
106
 
 
107
    all_clients = {}
 
108
    for path, ifs_and_props in managed_objects.items():
 
109
        try:
 
110
            all_clients[path] = ifs_and_props[client_dbus_interface]
 
111
        except KeyError:
 
112
            pass
 
113
 
 
114
    # Compile dict of (clientpath: properties) to process
128
115
    if not clientnames:
129
116
        clients = all_clients
130
117
    else:
 
118
        clients = {}
131
119
        for name in clientnames:
132
120
            for objpath, properties in all_clients.items():
133
121
                if properties["Name"] == name:
436
424
        options.remove = True
437
425
 
438
426
 
 
427
def get_mandos_dbus_object(bus):
 
428
    log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
429
              dbus_busname, server_dbus_path)
 
430
    with if_dbus_exception_log_with_exception_and_exit(
 
431
            "Could not connect to Mandos server: %s"):
 
432
        mandos_dbus_object = bus.get_object(dbus_busname,
 
433
                                            server_dbus_path)
 
434
    return mandos_dbus_object
 
435
 
 
436
 
 
437
@contextlib.contextmanager
 
438
def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs):
 
439
    try:
 
440
        yield
 
441
    except dbus.exceptions.DBusException as e:
 
442
        log.critical(*(args + (e,)), **kwargs)
 
443
        sys.exit(1)
 
444
 
 
445
 
 
446
def get_managed_objects(object_manager):
 
447
    log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
 
448
              server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
 
449
    with if_dbus_exception_log_with_exception_and_exit(
 
450
            "Failed to access Mandos server through D-Bus:\n%s"):
 
451
        with SilenceLogger("dbus.proxies"):
 
452
            managed_objects = object_manager.GetManagedObjects()
 
453
    return managed_objects
 
454
 
 
455
 
439
456
class SilenceLogger(object):
440
457
    "Simple context manager to silence a particular logger"
441
458
    def __init__(self, loggername):
822
839
 
823
840
 
824
841
 
825
 
class Test_string_to_delta(unittest.TestCase):
 
842
class TestCaseWithAssertLogs(unittest.TestCase):
 
843
    """unittest.TestCase.assertLogs only exists in Python 3.4"""
 
844
 
 
845
    if not hasattr(unittest.TestCase, "assertLogs"):
 
846
        @contextlib.contextmanager
 
847
        def assertLogs(self, logger, level=logging.INFO):
 
848
            capturing_handler = self.CapturingLevelHandler(level)
 
849
            old_level = logger.level
 
850
            old_propagate = logger.propagate
 
851
            logger.addHandler(capturing_handler)
 
852
            logger.setLevel(level)
 
853
            logger.propagate = False
 
854
            try:
 
855
                yield capturing_handler.watcher
 
856
            finally:
 
857
                logger.propagate = old_propagate
 
858
                logger.removeHandler(capturing_handler)
 
859
                logger.setLevel(old_level)
 
860
            self.assertGreater(len(capturing_handler.watcher.records),
 
861
                               0)
 
862
 
 
863
        class CapturingLevelHandler(logging.Handler):
 
864
            def __init__(self, level, *args, **kwargs):
 
865
                logging.Handler.__init__(self, *args, **kwargs)
 
866
                self.watcher = self.LoggingWatcher([], [])
 
867
            def emit(self, record):
 
868
                self.watcher.records.append(record)
 
869
                self.watcher.output.append(self.format(record))
 
870
 
 
871
            LoggingWatcher = collections.namedtuple("LoggingWatcher",
 
872
                                                    ("records",
 
873
                                                     "output"))
 
874
 
 
875
class Test_string_to_delta(TestCaseWithAssertLogs):
826
876
    def test_handles_basic_rfc3339(self):
827
877
        self.assertEqual(string_to_delta("PT0S"),
828
878
                         datetime.timedelta())
834
884
                         datetime.timedelta(0, 7200))
835
885
 
836
886
    def test_falls_back_to_pre_1_6_1_with_warning(self):
837
 
        # assertLogs only exists in Python 3.4
838
 
        if hasattr(self, "assertLogs"):
839
 
            with self.assertLogs(log, logging.WARNING):
840
 
                value = string_to_delta("2h")
841
 
        else:
842
 
            class WarningFilter(logging.Filter):
843
 
                """Don't show, but record the presence of, warnings"""
844
 
                def filter(self, record):
845
 
                    is_warning = record.levelno >= logging.WARNING
846
 
                    self.found = is_warning or getattr(self, "found",
847
 
                                                       False)
848
 
                    return not is_warning
849
 
            warning_filter = WarningFilter()
850
 
            log.addFilter(warning_filter)
851
 
            try:
852
 
                value = string_to_delta("2h")
853
 
            finally:
854
 
                log.removeFilter(warning_filter)
855
 
            self.assertTrue(getattr(warning_filter, "found", False))
 
887
        with self.assertLogs(log, logging.WARNING):
 
888
            value = string_to_delta("2h")
856
889
        self.assertEqual(value, datetime.timedelta(0, 7200))
857
890
 
858
891
 
988
1021
                self.check_option_syntax(options)
989
1022
 
990
1023
 
991
 
class Test_SilenceLogger(unittest.TestCase):
992
 
    loggername = "mandos-ctl.Test_SilenceLogger"
993
 
    log = logging.getLogger(loggername)
994
 
    log.propagate = False
995
 
    log.addHandler(logging.NullHandler())
996
 
 
997
 
    def setUp(self):
998
 
        self.counting_filter = self.CountingFilter()
999
 
 
1000
 
    class CountingFilter(logging.Filter):
1001
 
        "Count number of records"
1002
 
        count = 0
1003
 
        def filter(self, record):
1004
 
            self.count += 1
1005
 
            return True
1006
 
 
1007
 
    def test_should_filter_records_only_when_active(self):
 
1024
class Test_get_mandos_dbus_object(TestCaseWithAssertLogs):
 
1025
    def test_calls_and_returns_get_object_on_bus(self):
 
1026
        class MockBus(object):
 
1027
            called = False
 
1028
            def get_object(mockbus_self, busname, dbus_path):
 
1029
                # Note that "self" is still the testcase instance,
 
1030
                # this MockBus instance is in "mockbus_self".
 
1031
                self.assertEqual(busname, dbus_busname)
 
1032
                self.assertEqual(dbus_path, server_dbus_path)
 
1033
                mockbus_self.called = True
 
1034
                return mockbus_self
 
1035
 
 
1036
        mockbus = get_mandos_dbus_object(bus=MockBus())
 
1037
        self.assertIsInstance(mockbus, MockBus)
 
1038
        self.assertTrue(mockbus.called)
 
1039
 
 
1040
    def test_logs_and_exits_on_dbus_error(self):
 
1041
        class MockBusFailing(object):
 
1042
            def get_object(self, busname, dbus_path):
 
1043
                raise dbus.exceptions.DBusException("Test")
 
1044
 
 
1045
        with self.assertLogs(log, logging.CRITICAL):
 
1046
            with self.assertRaises(SystemExit) as e:
 
1047
                bus = get_mandos_dbus_object(bus=MockBusFailing())
 
1048
 
 
1049
        if isinstance(e.exception.code, int):
 
1050
            self.assertNotEqual(e.exception.code, 0)
 
1051
        else:
 
1052
            self.assertIsNotNone(e.exception.code)
 
1053
 
 
1054
 
 
1055
class Test_get_managed_objects(TestCaseWithAssertLogs):
 
1056
    def test_calls_and_returns_GetManagedObjects(self):
 
1057
        managed_objects = {"/clients/foo": { "Name": "foo"}}
 
1058
        class MockObjectManager(object):
 
1059
            @staticmethod
 
1060
            def GetManagedObjects():
 
1061
                return managed_objects
 
1062
        retval = get_managed_objects(MockObjectManager())
 
1063
        self.assertDictEqual(managed_objects, retval)
 
1064
 
 
1065
    def test_logs_and_exits_on_dbus_error(self):
 
1066
        dbus_logger = logging.getLogger("dbus.proxies")
 
1067
 
 
1068
        class MockObjectManagerFailing(object):
 
1069
            @staticmethod
 
1070
            def GetManagedObjects():
 
1071
                dbus_logger.error("Test")
 
1072
                raise dbus.exceptions.DBusException("Test")
 
1073
 
 
1074
        class CountingHandler(logging.Handler):
 
1075
            count = 0
 
1076
            def emit(self, record):
 
1077
                self.count += 1
 
1078
 
 
1079
        counting_handler = CountingHandler()
 
1080
 
 
1081
        dbus_logger.addHandler(counting_handler)
 
1082
 
1008
1083
        try:
1009
 
            with SilenceLogger(self.loggername):
1010
 
                self.log.addFilter(self.counting_filter)
1011
 
                self.log.info("Filtered log message 1")
1012
 
            self.log.info("Non-filtered message 2")
1013
 
            self.log.info("Non-filtered message 3")
 
1084
            with self.assertLogs(log, logging.CRITICAL) as watcher:
 
1085
                with self.assertRaises(SystemExit) as e:
 
1086
                    get_managed_objects(MockObjectManagerFailing())
1014
1087
        finally:
1015
 
            self.log.removeFilter(self.counting_filter)
1016
 
        self.assertEqual(self.counting_filter.count, 2)
 
1088
            dbus_logger.removeFilter(counting_handler)
 
1089
        self.assertEqual(counting_handler.count, 0)
 
1090
 
 
1091
        # Test that the dbus_logger still works
 
1092
        with self.assertLogs(dbus_logger, logging.ERROR):
 
1093
            dbus_logger.error("Test")
 
1094
 
 
1095
        if isinstance(e.exception.code, int):
 
1096
            self.assertNotEqual(e.exception.code, 0)
 
1097
        else:
 
1098
            self.assertIsNotNone(e.exception.code)
1017
1099
 
1018
1100
 
1019
1101
class Test_commands_from_options(unittest.TestCase):
1727
1809
    return tests
1728
1810
 
1729
1811
if __name__ == "__main__":
1730
 
    if should_only_run_tests():
1731
 
        # Call using ./tdd-python-script --check [--verbose]
1732
 
        unittest.main()
1733
 
    else:
1734
 
        main()
 
1812
    try:
 
1813
        if should_only_run_tests():
 
1814
            # Call using ./tdd-python-script --check [--verbose]
 
1815
            unittest.main()
 
1816
        else:
 
1817
            main()
 
1818
    finally:
 
1819
        logging.shutdown()