891
891
# tests, which is run by doctest.
893
893
def test_rfc3339_zero_seconds(self):
894
self.assertEqual(datetime.timedelta(),
895
string_to_delta("PT0S"))
894
self.assertEqual(string_to_delta("PT0S"),
895
datetime.timedelta())
897
897
def test_rfc3339_zero_days(self):
898
self.assertEqual(datetime.timedelta(), string_to_delta("P0D"))
898
self.assertEqual(string_to_delta("P0D"),
899
datetime.timedelta())
900
901
def test_rfc3339_one_second(self):
901
self.assertEqual(datetime.timedelta(0, 1),
902
string_to_delta("PT1S"))
902
self.assertEqual(string_to_delta("PT1S"),
903
datetime.timedelta(0, 1))
904
905
def test_rfc3339_two_hours(self):
905
self.assertEqual(datetime.timedelta(0, 7200),
906
string_to_delta("PT2H"))
906
self.assertEqual(string_to_delta("PT2H"),
907
datetime.timedelta(0, 7200))
908
909
def test_falls_back_to_pre_1_6_1_with_warning(self):
909
910
with self.assertLogs(log, logging.WARNING):
910
911
value = string_to_delta("2h")
911
self.assertEqual(datetime.timedelta(0, 7200), value)
912
self.assertEqual(value, datetime.timedelta(0, 7200))
914
915
class Test_check_option_syntax(unittest.TestCase):
957
958
# Exit code from argparse is guaranteed to be "2". Reference:
958
959
# https://docs.python.org/3/library
959
960
# /argparse.html#exiting-methods
960
self.assertEqual(2, e.exception.code)
961
self.assertEqual(e.exception.code, 2)
963
964
@contextlib.contextmanager
964
965
def redirect_stderr_to_devnull():
965
old_stderr = sys.stderr
966
with contextlib.closing(open(os.devnull, "w")) as null:
971
sys.stderr = old_stderr
966
null = os.open(os.path.devnull, os.O_RDWR)
967
stderrcopy = os.dup(sys.stderr.fileno())
968
os.dup2(null, sys.stderr.fileno())
974
os.dup2(stderrcopy, sys.stderr.fileno())
973
977
def check_option_syntax(self, options):
974
978
check_option_syntax(self.parser, options)
1085
1089
def get_object(mockbus_self, busname, dbus_path):
1086
1090
# Note that "self" is still the testcase instance,
1087
1091
# this MockBus instance is in "mockbus_self".
1088
self.assertEqual(dbus_busname, busname)
1089
self.assertEqual(server_dbus_path, dbus_path)
1092
self.assertEqual(busname, dbus_busname)
1093
self.assertEqual(dbus_path, server_dbus_path)
1090
1094
mockbus_self.called = True
1091
1095
return mockbus_self
1104
1108
bus = get_mandos_dbus_object(bus=MockBusFailing())
1106
1110
if isinstance(e.exception.code, int):
1107
self.assertNotEqual(0, e.exception.code)
1111
self.assertNotEqual(e.exception.code, 0)
1109
1113
self.assertIsNotNone(e.exception.code)
1143
1147
dbus_logger.removeFilter(counting_handler)
1145
1149
# Make sure the dbus logger was suppressed
1146
self.assertEqual(0, counting_handler.count)
1150
self.assertEqual(counting_handler.count, 0)
1148
1152
# Test that the dbus_logger still works
1149
1153
with self.assertLogs(dbus_logger, logging.ERROR):
1150
1154
dbus_logger.error("Test")
1152
1156
if isinstance(e.exception.code, int):
1153
self.assertNotEqual(0, e.exception.code)
1157
self.assertNotEqual(e.exception.code, 0)
1155
1159
self.assertIsNotNone(e.exception.code)
1171
1175
options = self.parser.parse_args(args)
1172
1176
check_option_syntax(self.parser, options)
1173
1177
commands = commands_from_options(options)
1174
self.assertEqual(1, len(commands))
1178
self.assertEqual(len(commands), 1)
1175
1179
command = commands[0]
1176
1180
self.assertIsInstance(command, command_cls)
1177
1181
for key, value in cmd_attrs.items():
1178
self.assertEqual(value, getattr(command, key))
1182
self.assertEqual(getattr(command, key), value)
1180
1184
def test_is_enabled_short(self):
1181
1185
self.assert_command_from_args(["-V", "foo"],
1204
1208
check_option_syntax(self.parser, options)
1205
1209
commands = commands_from_options(options)
1206
self.assertEqual(2, len(commands))
1210
self.assertEqual(len(commands), 2)
1207
1211
self.assertIsInstance(commands[0], command.Deny)
1208
1212
self.assertIsInstance(commands[1], command.Remove)
1213
1217
check_option_syntax(self.parser, options)
1214
1218
commands = commands_from_options(options)
1215
self.assertEqual(2, len(commands))
1219
self.assertEqual(len(commands), 2)
1216
1220
self.assertIsInstance(commands[0], command.Deny)
1217
1221
self.assertIsInstance(commands[1], command.Remove)
1378
1382
self.attributes["Name"] = name
1379
1383
self.calls = []
1380
1384
def Set(self, interface, propname, value, dbus_interface):
1381
testcase.assertEqual(client_dbus_interface, interface)
1382
testcase.assertEqual(dbus.PROPERTIES_IFACE,
1385
testcase.assertEqual(interface, client_dbus_interface)
1386
testcase.assertEqual(dbus_interface,
1387
dbus.PROPERTIES_IFACE)
1384
1388
self.attributes[propname] = value
1385
1389
def Approve(self, approve, dbus_interface):
1386
testcase.assertEqual(client_dbus_interface,
1390
testcase.assertEqual(dbus_interface,
1391
client_dbus_interface)
1388
1392
self.calls.append(("Approve", (approve,
1389
1393
dbus_interface)))
1390
1394
self.client = MockClient(
1447
1451
class Bus(object):
1449
1453
def get_object(client_bus_name, path):
1450
self.assertEqual(dbus_busname, client_bus_name)
1454
self.assertEqual(client_bus_name, dbus_busname)
1452
1456
# Note: "self" here is the TestCmd instance, not
1453
1457
# the Bus instance, since this is a static method!
1460
1464
class TestBaseCommands(TestCommand):
1462
def test_IsEnabled_exits_successfully(self):
1466
def test_IsEnabled(self):
1467
self.assertTrue(all(command.IsEnabled().is_enabled(client,
1469
for client, properties
1470
in self.clients.items()))
1472
def test_IsEnabled_run_exits_successfully(self):
1463
1473
with self.assertRaises(SystemExit) as e:
1464
1474
command.IsEnabled().run(self.one_client)
1465
1475
if e.exception.code is not None:
1466
self.assertEqual(0, e.exception.code)
1476
self.assertEqual(e.exception.code, 0)
1468
1478
self.assertIsNone(e.exception.code)
1470
def test_IsEnabled_exits_with_failure(self):
1480
def test_IsEnabled_run_exits_with_failure(self):
1471
1481
self.client.attributes["Enabled"] = dbus.Boolean(False)
1472
1482
with self.assertRaises(SystemExit) as e:
1473
1483
command.IsEnabled().run(self.one_client)
1474
1484
if isinstance(e.exception.code, int):
1475
self.assertNotEqual(0, e.exception.code)
1485
self.assertNotEqual(e.exception.code, 0)
1477
1487
self.assertIsNotNone(e.exception.code)
1556
1566
def test_DumpJSON_normal(self):
1557
with self.capture_stdout_to_buffer() as buffer:
1558
command.DumpJSON().run(self.clients)
1559
json_data = json.loads(buffer.getvalue())
1560
self.assertDictEqual(self.expected_json, json_data)
1563
@contextlib.contextmanager
1564
def capture_stdout_to_buffer():
1565
capture_buffer = io.StringIO()
1566
old_stdout = sys.stdout
1567
sys.stdout = capture_buffer
1569
yield capture_buffer
1571
sys.stdout = old_stdout
1567
output = command.DumpJSON().output(self.clients.values())
1568
json_data = json.loads(output)
1569
self.assertDictEqual(json_data, self.expected_json)
1573
1571
def test_DumpJSON_one_client(self):
1574
with self.capture_stdout_to_buffer() as buffer:
1575
command.DumpJSON().run(self.one_client)
1576
json_data = json.loads(buffer.getvalue())
1572
output = command.DumpJSON().output(self.one_client.values())
1573
json_data = json.loads(output)
1577
1574
expected_json = {"foo": self.expected_json["foo"]}
1578
self.assertDictEqual(expected_json, json_data)
1575
self.assertDictEqual(json_data, expected_json)
1580
1577
def test_PrintTable_normal(self):
1581
with self.capture_stdout_to_buffer() as buffer:
1582
command.PrintTable().run(self.clients)
1578
output = command.PrintTable().output(self.clients.values())
1583
1579
expected_output = "\n".join((
1584
1580
"Name Enabled Timeout Last Successful Check",
1585
1581
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1586
1582
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1588
self.assertEqual(expected_output, buffer.getvalue())
1584
self.assertEqual(output, expected_output)
1590
1586
def test_PrintTable_verbose(self):
1591
with self.capture_stdout_to_buffer() as buffer:
1592
command.PrintTable(verbose=True).run(self.clients)
1587
output = command.PrintTable(verbose=True).output(
1588
self.clients.values())
1679
1675
num_lines = max(len(rows) for rows in columns)
1680
expected_output = ("\n".join("".join(rows[line]
1681
for rows in columns)
1682
for line in range(num_lines))
1684
self.assertEqual(expected_output, buffer.getvalue())
1676
expected_output = "\n".join("".join(rows[line]
1677
for rows in columns)
1678
for line in range(num_lines))
1679
self.assertEqual(output, expected_output)
1686
1681
def test_PrintTable_one_client(self):
1687
with self.capture_stdout_to_buffer() as buffer:
1688
command.PrintTable().run(self.one_client)
1682
output = command.PrintTable().output(self.one_client.values())
1689
1683
expected_output = "\n".join((
1690
1684
"Name Enabled Timeout Last Successful Check",
1691
1685
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1693
self.assertEqual(expected_output, buffer.getvalue())
1687
self.assertEqual(output, expected_output)
1696
1690
class TestPropertyCmd(TestCommand):
1711
1705
client = self.bus.get_object(dbus_busname, clientpath)
1712
1706
value = client.attributes[self.propname]
1713
1707
self.assertNotIsInstance(value, self.Unique)
1714
self.assertEqual(value_to_get, value)
1708
self.assertEqual(value, value_to_get)
1716
1710
class Unique(object):
1717
1711
"""Class for objects which exist only to be unique objects,