842
class Test_string_to_delta(unittest.TestCase):
842
class TestCaseWithAssertLogs(unittest.TestCase):
843
"""unittest.TestCase.assertLogs only exists in Python 3.4"""
845
if not hasattr(unittest.TestCase, "assertLogs"):
846
@contextlib.contextmanager
847
def assertLogs(self, logger, level=logging.INFO):
848
capturing_handler = self.CapturingLevelHandler(level)
849
old_level = logger.level
850
old_propagate = logger.propagate
851
logger.addHandler(capturing_handler)
852
logger.setLevel(level)
853
logger.propagate = False
855
yield capturing_handler.watcher
857
logger.propagate = old_propagate
858
logger.removeHandler(capturing_handler)
859
logger.setLevel(old_level)
860
self.assertGreater(len(capturing_handler.watcher.records),
863
class CapturingLevelHandler(logging.Handler):
864
def __init__(self, level, *args, **kwargs):
865
logging.Handler.__init__(self, *args, **kwargs)
866
self.watcher = self.LoggingWatcher([], [])
867
def emit(self, record):
868
self.watcher.records.append(record)
869
self.watcher.output.append(self.format(record))
871
LoggingWatcher = collections.namedtuple("LoggingWatcher",
875
class Test_string_to_delta(TestCaseWithAssertLogs):
843
876
def test_handles_basic_rfc3339(self):
844
877
self.assertEqual(string_to_delta("PT0S"),
845
878
datetime.timedelta())
851
884
datetime.timedelta(0, 7200))
853
886
def test_falls_back_to_pre_1_6_1_with_warning(self):
854
# assertLogs only exists in Python 3.4
855
if hasattr(self, "assertLogs"):
856
with self.assertLogs(log, logging.WARNING):
857
value = string_to_delta("2h")
859
class WarningFilter(logging.Filter):
860
"""Don't show, but record the presence of, warnings"""
861
def filter(self, record):
862
is_warning = record.levelno >= logging.WARNING
863
self.found = is_warning or getattr(self, "found",
865
return not is_warning
866
warning_filter = WarningFilter()
867
log.addFilter(warning_filter)
869
value = string_to_delta("2h")
871
log.removeFilter(warning_filter)
872
self.assertTrue(getattr(warning_filter, "found", False))
887
with self.assertLogs(log, logging.WARNING):
888
value = string_to_delta("2h")
873
889
self.assertEqual(value, datetime.timedelta(0, 7200))
1026
1042
def get_object(self, busname, dbus_path):
1027
1043
raise dbus.exceptions.DBusException("Test")
1029
# assertLogs only exists in Python 3.4
1030
if hasattr(self, "assertLogs"):
1031
with self.assertLogs(log, logging.CRITICAL):
1032
with self.assertRaises(SystemExit) as e:
1033
bus = get_mandos_dbus_object(bus=MockBus())
1035
critical_filter = self.CriticalFilter()
1036
log.addFilter(critical_filter)
1038
with self.assertRaises(SystemExit) as e:
1039
get_mandos_dbus_object(bus=MockBusFailing())
1041
log.removeFilter(critical_filter)
1042
self.assertTrue(critical_filter.found)
1045
with self.assertLogs(log, logging.CRITICAL):
1046
with self.assertRaises(SystemExit) as e:
1047
bus = get_mandos_dbus_object(bus=MockBusFailing())
1043
1049
if isinstance(e.exception.code, int):
1044
1050
self.assertNotEqual(e.exception.code, 0)
1046
1052
self.assertIsNotNone(e.exception.code)
1048
class CriticalFilter(logging.Filter):
1049
"""Don't show, but register, critical messages"""
1051
def filter(self, record):
1052
is_critical = record.levelno >= logging.CRITICAL
1053
self.found = is_critical or self.found
1054
return not is_critical
1057
class Test_get_managed_objects(unittest.TestCase):
1055
class Test_get_managed_objects(TestCaseWithAssertLogs):
1058
1056
def test_calls_and_returns_GetManagedObjects(self):
1059
1057
managed_objects = {"/clients/foo": { "Name": "foo"}}
1060
1058
class MockObjectManager(object):
1065
1063
self.assertDictEqual(managed_objects, retval)
1067
1065
def test_logs_and_exits_on_dbus_error(self):
1066
dbus_logger = logging.getLogger("dbus.proxies")
1068
1068
class MockObjectManagerFailing(object):
1070
1070
def GetManagedObjects():
1071
dbus_logger.error("Test")
1071
1072
raise dbus.exceptions.DBusException("Test")
1073
if hasattr(self, "assertLogs"):
1074
with self.assertLogs(log, logging.CRITICAL):
1075
with self.assertRaises(SystemExit):
1076
get_managed_objects(MockObjectManagerFailing())
1078
critical_filter = self.CriticalFilter()
1079
log.addFilter(critical_filter)
1074
class CountingHandler(logging.Handler):
1076
def emit(self, record):
1079
counting_handler = CountingHandler()
1081
dbus_logger.addHandler(counting_handler)
1084
with self.assertLogs(log, logging.CRITICAL) as watcher:
1081
1085
with self.assertRaises(SystemExit) as e:
1082
1086
get_managed_objects(MockObjectManagerFailing())
1084
log.removeFilter(critical_filter)
1085
self.assertTrue(critical_filter.found)
1088
dbus_logger.removeFilter(counting_handler)
1089
self.assertEqual(counting_handler.count, 0)
1091
# Test that the dbus_logger still works
1092
with self.assertLogs(dbus_logger, logging.ERROR):
1093
dbus_logger.error("Test")
1086
1095
if isinstance(e.exception.code, int):
1087
1096
self.assertNotEqual(e.exception.code, 0)
1089
1098
self.assertIsNotNone(e.exception.code)
1091
class CriticalFilter(logging.Filter):
1092
"""Don't show, but register, critical messages"""
1094
def filter(self, record):
1095
is_critical = record.levelno >= logging.CRITICAL
1096
self.found = is_critical or self.found
1097
return not is_critical
1100
class Test_SilenceLogger(unittest.TestCase):
1101
loggername = "mandos-ctl.Test_SilenceLogger"
1102
log = logging.getLogger(loggername)
1103
log.propagate = False
1104
log.addHandler(logging.NullHandler())
1107
self.counting_filter = self.CountingFilter()
1109
class CountingFilter(logging.Filter):
1110
"Count number of records"
1112
def filter(self, record):
1116
def test_should_filter_records_only_when_active(self):
1118
with SilenceLogger(self.loggername):
1119
self.log.addFilter(self.counting_filter)
1120
self.log.info("Filtered log message 1")
1121
self.log.info("Non-filtered message 2")
1122
self.log.info("Non-filtered message 3")
1124
self.log.removeFilter(self.counting_filter)
1125
self.assertEqual(self.counting_filter.count, 2)
1128
1101
class Test_commands_from_options(unittest.TestCase):
1129
1102
def setUp(self):