891
891
# tests, which is run by doctest.
893
893
def test_rfc3339_zero_seconds(self):
894
self.assertEqual(string_to_delta("PT0S"),
895
datetime.timedelta())
894
self.assertEqual(datetime.timedelta(),
895
string_to_delta("PT0S"))
897
897
def test_rfc3339_zero_days(self):
898
self.assertEqual(string_to_delta("P0D"),
899
datetime.timedelta())
898
self.assertEqual(datetime.timedelta(), string_to_delta("P0D"))
901
900
def test_rfc3339_one_second(self):
902
self.assertEqual(string_to_delta("PT1S"),
903
datetime.timedelta(0, 1))
901
self.assertEqual(datetime.timedelta(0, 1),
902
string_to_delta("PT1S"))
905
904
def test_rfc3339_two_hours(self):
906
self.assertEqual(string_to_delta("PT2H"),
907
datetime.timedelta(0, 7200))
905
self.assertEqual(datetime.timedelta(0, 7200),
906
string_to_delta("PT2H"))
909
908
def test_falls_back_to_pre_1_6_1_with_warning(self):
910
909
with self.assertLogs(log, logging.WARNING):
911
910
value = string_to_delta("2h")
912
self.assertEqual(value, datetime.timedelta(0, 7200))
911
self.assertEqual(datetime.timedelta(0, 7200), value)
915
914
class Test_check_option_syntax(unittest.TestCase):
958
957
# Exit code from argparse is guaranteed to be "2". Reference:
959
958
# https://docs.python.org/3/library
960
959
# /argparse.html#exiting-methods
961
self.assertEqual(e.exception.code, 2)
960
self.assertEqual(2, e.exception.code)
964
963
@contextlib.contextmanager
965
964
def redirect_stderr_to_devnull():
966
null = os.open(os.path.devnull, os.O_RDWR)
967
stderrcopy = os.dup(sys.stderr.fileno())
968
os.dup2(null, sys.stderr.fileno())
974
os.dup2(stderrcopy, sys.stderr.fileno())
965
old_stderr = sys.stderr
966
with contextlib.closing(open(os.devnull, "w")) as null:
971
sys.stderr = old_stderr
977
973
def check_option_syntax(self, options):
978
974
check_option_syntax(self.parser, options)
1089
1085
def get_object(mockbus_self, busname, dbus_path):
1090
1086
# Note that "self" is still the testcase instance,
1091
1087
# this MockBus instance is in "mockbus_self".
1092
self.assertEqual(busname, dbus_busname)
1093
self.assertEqual(dbus_path, server_dbus_path)
1088
self.assertEqual(dbus_busname, busname)
1089
self.assertEqual(server_dbus_path, dbus_path)
1094
1090
mockbus_self.called = True
1095
1091
return mockbus_self
1108
1104
bus = get_mandos_dbus_object(bus=MockBusFailing())
1110
1106
if isinstance(e.exception.code, int):
1111
self.assertNotEqual(e.exception.code, 0)
1107
self.assertNotEqual(0, e.exception.code)
1113
1109
self.assertIsNotNone(e.exception.code)
1147
1143
dbus_logger.removeFilter(counting_handler)
1149
1145
# Make sure the dbus logger was suppressed
1150
self.assertEqual(counting_handler.count, 0)
1146
self.assertEqual(0, counting_handler.count)
1152
1148
# Test that the dbus_logger still works
1153
1149
with self.assertLogs(dbus_logger, logging.ERROR):
1154
1150
dbus_logger.error("Test")
1156
1152
if isinstance(e.exception.code, int):
1157
self.assertNotEqual(e.exception.code, 0)
1153
self.assertNotEqual(0, e.exception.code)
1159
1155
self.assertIsNotNone(e.exception.code)
1175
1171
options = self.parser.parse_args(args)
1176
1172
check_option_syntax(self.parser, options)
1177
1173
commands = commands_from_options(options)
1178
self.assertEqual(len(commands), 1)
1174
self.assertEqual(1, len(commands))
1179
1175
command = commands[0]
1180
1176
self.assertIsInstance(command, command_cls)
1181
1177
for key, value in cmd_attrs.items():
1182
self.assertEqual(getattr(command, key), value)
1178
self.assertEqual(value, getattr(command, key))
1184
1180
def test_is_enabled_short(self):
1185
1181
self.assert_command_from_args(["-V", "foo"],
1208
1204
check_option_syntax(self.parser, options)
1209
1205
commands = commands_from_options(options)
1210
self.assertEqual(len(commands), 2)
1206
self.assertEqual(2, len(commands))
1211
1207
self.assertIsInstance(commands[0], command.Deny)
1212
1208
self.assertIsInstance(commands[1], command.Remove)
1217
1213
check_option_syntax(self.parser, options)
1218
1214
commands = commands_from_options(options)
1219
self.assertEqual(len(commands), 2)
1215
self.assertEqual(2, len(commands))
1220
1216
self.assertIsInstance(commands[0], command.Deny)
1221
1217
self.assertIsInstance(commands[1], command.Remove)
1382
1378
self.attributes["Name"] = name
1383
1379
self.calls = []
1384
1380
def Set(self, interface, propname, value, dbus_interface):
1385
testcase.assertEqual(interface, client_dbus_interface)
1386
testcase.assertEqual(dbus_interface,
1387
dbus.PROPERTIES_IFACE)
1381
testcase.assertEqual(client_dbus_interface, interface)
1382
testcase.assertEqual(dbus.PROPERTIES_IFACE,
1388
1384
self.attributes[propname] = value
1389
1385
def Approve(self, approve, dbus_interface):
1390
testcase.assertEqual(dbus_interface,
1391
client_dbus_interface)
1386
testcase.assertEqual(client_dbus_interface,
1392
1388
self.calls.append(("Approve", (approve,
1393
1389
dbus_interface)))
1394
1390
self.client = MockClient(
1451
1447
class Bus(object):
1453
1449
def get_object(client_bus_name, path):
1454
self.assertEqual(client_bus_name, dbus_busname)
1450
self.assertEqual(dbus_busname, client_bus_name)
1456
1452
# Note: "self" here is the TestCmd instance, not
1457
1453
# the Bus instance, since this is a static method!
1464
1460
class TestBaseCommands(TestCommand):
1466
def test_IsEnabled(self):
1467
self.assertTrue(all(command.IsEnabled().is_enabled(client,
1469
for client, properties
1470
in self.clients.items()))
1472
def test_IsEnabled_run_exits_successfully(self):
1462
def test_IsEnabled_exits_successfully(self):
1473
1463
with self.assertRaises(SystemExit) as e:
1474
1464
command.IsEnabled().run(self.one_client)
1475
1465
if e.exception.code is not None:
1476
self.assertEqual(e.exception.code, 0)
1466
self.assertEqual(0, e.exception.code)
1478
1468
self.assertIsNone(e.exception.code)
1480
def test_IsEnabled_run_exits_with_failure(self):
1470
def test_IsEnabled_exits_with_failure(self):
1481
1471
self.client.attributes["Enabled"] = dbus.Boolean(False)
1482
1472
with self.assertRaises(SystemExit) as e:
1483
1473
command.IsEnabled().run(self.one_client)
1484
1474
if isinstance(e.exception.code, int):
1485
self.assertNotEqual(e.exception.code, 0)
1475
self.assertNotEqual(0, e.exception.code)
1487
1477
self.assertIsNotNone(e.exception.code)
1566
1556
def test_DumpJSON_normal(self):
1567
output = command.DumpJSON().output(self.clients.values())
1568
json_data = json.loads(output)
1569
self.assertDictEqual(json_data, self.expected_json)
1557
with self.capture_stdout_to_buffer() as buffer:
1558
command.DumpJSON().run(self.clients)
1559
json_data = json.loads(buffer.getvalue())
1560
self.assertDictEqual(self.expected_json, json_data)
1563
@contextlib.contextmanager
1564
def capture_stdout_to_buffer():
1565
capture_buffer = io.StringIO()
1566
old_stdout = sys.stdout
1567
sys.stdout = capture_buffer
1569
yield capture_buffer
1571
sys.stdout = old_stdout
1571
1573
def test_DumpJSON_one_client(self):
1572
output = command.DumpJSON().output(self.one_client.values())
1573
json_data = json.loads(output)
1574
with self.capture_stdout_to_buffer() as buffer:
1575
command.DumpJSON().run(self.one_client)
1576
json_data = json.loads(buffer.getvalue())
1574
1577
expected_json = {"foo": self.expected_json["foo"]}
1575
self.assertDictEqual(json_data, expected_json)
1578
self.assertDictEqual(expected_json, json_data)
1577
1580
def test_PrintTable_normal(self):
1578
output = command.PrintTable().output(self.clients.values())
1581
with self.capture_stdout_to_buffer() as buffer:
1582
command.PrintTable().run(self.clients)
1579
1583
expected_output = "\n".join((
1580
1584
"Name Enabled Timeout Last Successful Check",
1581
1585
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1582
1586
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1584
self.assertEqual(output, expected_output)
1588
self.assertEqual(expected_output, buffer.getvalue())
1586
1590
def test_PrintTable_verbose(self):
1587
output = command.PrintTable(verbose=True).output(
1588
self.clients.values())
1591
with self.capture_stdout_to_buffer() as buffer:
1592
command.PrintTable(verbose=True).run(self.clients)
1675
1679
num_lines = max(len(rows) for rows in columns)
1676
expected_output = "\n".join("".join(rows[line]
1677
for rows in columns)
1678
for line in range(num_lines))
1679
self.assertEqual(output, expected_output)
1680
expected_output = ("\n".join("".join(rows[line]
1681
for rows in columns)
1682
for line in range(num_lines))
1684
self.assertEqual(expected_output, buffer.getvalue())
1681
1686
def test_PrintTable_one_client(self):
1682
output = command.PrintTable().output(self.one_client.values())
1687
with self.capture_stdout_to_buffer() as buffer:
1688
command.PrintTable().run(self.one_client)
1683
1689
expected_output = "\n".join((
1684
1690
"Name Enabled Timeout Last Successful Check",
1685
1691
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1687
self.assertEqual(output, expected_output)
1693
self.assertEqual(expected_output, buffer.getvalue())
1690
1696
class TestPropertyCmd(TestCommand):
1705
1711
client = self.bus.get_object(dbus_busname, clientpath)
1706
1712
value = client.attributes[self.propname]
1707
1713
self.assertNotIsInstance(value, self.Unique)
1708
self.assertEqual(value, value_to_get)
1714
self.assertEqual(value_to_get, value)
1710
1716
class Unique(object):
1711
1717
"""Class for objects which exist only to be unique objects,