613
615
"Checker", "ExtendedTimeout", "Expires",
614
616
"LastCheckerStatus")
616
def run(self, clients, bus=None, mandos=None):
617
print(self.output(clients.values()))
619
def output(self, clients):
620
raise NotImplementedError()
623
619
class DumpJSON(Output):
624
def output(self, clients):
    # Build the JSON text for an iterable of client property
    # mappings.  The result maps each client's "Name" to a dict of
    # every keyword in self.all_keywords, with each value passed
    # through self.dbus_boolean_to_bool().
    data = {client["Name"]:
            {key: self.dbus_boolean_to_bool(client[key])
             for key in self.all_keywords}
            for client in clients}
    return json.dumps(data, indent=4, separators=(',', ': '))

def run(self, clients, bus=None, mandos=None):
    # "clients" is a mapping whose values are the client property
    # mappings; "bus" and "mandos" are accepted but not used here.
    # NOTE(review): SOURCE shows two revisions of this code (one
    # with output(), one with only run()); both entry points are
    # kept so callers of either one continue to work.
    print(self.output(clients.values()))
632
628
def dbus_boolean_to_bool(value):
888
884
class Test_string_to_delta(TestCaseWithAssertLogs):
    # Just test basic RFC 3339 functionality here; the doc string for
    # rfc3339_duration_to_delta() already has more comprehensive
    # tests, which are run by doctest.

    def test_rfc3339_zero_seconds(self):
        self.assertEqual(datetime.timedelta(),
                         string_to_delta("PT0S"))

    def test_rfc3339_zero_days(self):
        self.assertEqual(datetime.timedelta(), string_to_delta("P0D"))

    def test_rfc3339_one_second(self):
        self.assertEqual(datetime.timedelta(0, 1),
                         string_to_delta("PT1S"))

    def test_rfc3339_two_hours(self):
        self.assertEqual(datetime.timedelta(0, 7200),
                         string_to_delta("PT2H"))

    def test_falls_back_to_pre_1_6_1_with_warning(self):
        # A non-RFC 3339 string such as "2h" must still be parsed,
        # but a message at WARNING level must be logged on "log".
        with self.assertLogs(log, logging.WARNING):
            value = string_to_delta("2h")
        self.assertEqual(datetime.timedelta(0, 7200), value)
905
910
class Test_check_option_syntax(unittest.TestCase):
948
953
# Exit code from argparse is guaranteed to be "2". Reference:
949
954
# https://docs.python.org/3/library
950
955
# /argparse.html#exiting-methods
951
self.assertEqual(e.exception.code, 2)
956
self.assertEqual(2, e.exception.code)
954
959
@contextlib.contextmanager
955
960
def redirect_stderr_to_devnull():
956
null = os.open(os.path.devnull, os.O_RDWR)
957
stderrcopy = os.dup(sys.stderr.fileno())
958
os.dup2(null, sys.stderr.fileno())
964
os.dup2(stderrcopy, sys.stderr.fileno())
961
old_stderr = sys.stderr
962
with contextlib.closing(open(os.devnull, "w")) as null:
967
sys.stderr = old_stderr
967
969
def check_option_syntax(self, options):
    # Convenience wrapper: run the module-level check_option_syntax()
    # against this test case's own parser.
    check_option_syntax(self.parser, options)
1089
1091
self.assertTrue(mockbus.called)
1091
1093
def test_logs_and_exits_on_dbus_error(self):
    # A bus whose get_object() always raises DBusException must make
    # get_mandos_dbus_object() log at CRITICAL level and exit.
    class FailingBusStub(object):
        def get_object(self, busname, dbus_path):
            raise dbus.exceptions.DBusException("Test")

    with self.assertLogs(log, logging.CRITICAL):
        with self.assertRaises(SystemExit) as e:
            get_mandos_dbus_object(bus=FailingBusStub())

    # The exit code must signal failure: a non-zero integer, or any
    # non-None object.
    # NOTE(review): the "else:" line is not visible in SOURCE and was
    # reconstructed from the shape of the two assertions -- confirm.
    if isinstance(e.exception.code, int):
        self.assertNotEqual(0, e.exception.code)
    else:
        self.assertIsNotNone(e.exception.code)
1106
1108
class Test_get_managed_objects(TestCaseWithAssertLogs):
1107
1109
def test_calls_and_returns_GetManagedObjects(self):
1108
managed_objects = {"/clients/foo": { "Name": "foo"}}
1109
class MockObjectManager(object):
1110
managed_objects = {"/clients/client": { "Name": "client"}}
1111
class ObjectManagerStub(object):
1110
1112
def GetManagedObjects(self):
1111
1113
return managed_objects
1112
retval = get_managed_objects(MockObjectManager())
1114
retval = get_managed_objects(ObjectManagerStub())
1113
1115
self.assertDictEqual(managed_objects, retval)
1115
1117
def test_logs_and_exits_on_dbus_error(self):
1116
1118
dbus_logger = logging.getLogger("dbus.proxies")
1118
class MockObjectManagerFailing(object):
1120
class ObjectManagerFailingStub(object):
1119
1121
def GetManagedObjects(self):
1120
1122
dbus_logger.error("Test")
1121
1123
raise dbus.exceptions.DBusException("Test")
1133
1135
with self.assertLogs(log, logging.CRITICAL) as watcher:
1134
1136
with self.assertRaises(SystemExit) as e:
1135
get_managed_objects(MockObjectManagerFailing())
1137
get_managed_objects(ObjectManagerFailingStub())
1137
1139
dbus_logger.removeFilter(counting_handler)
1139
1141
# Make sure the dbus logger was suppressed
1140
self.assertEqual(counting_handler.count, 0)
1142
self.assertEqual(0, counting_handler.count)
1142
1144
# Test that the dbus_logger still works
1143
1145
with self.assertLogs(dbus_logger, logging.ERROR):
1144
1146
dbus_logger.error("Test")
1146
1148
if isinstance(e.exception.code, int):
1147
self.assertNotEqual(e.exception.code, 0)
1149
self.assertNotEqual(0, e.exception.code)
1149
1151
self.assertIsNotNone(e.exception.code)
1165
1167
options = self.parser.parse_args(args)
1166
1168
check_option_syntax(self.parser, options)
1167
1169
commands = commands_from_options(options)
1168
self.assertEqual(len(commands), 1)
1170
self.assertEqual(1, len(commands))
1169
1171
command = commands[0]
1170
1172
self.assertIsInstance(command, command_cls)
1171
1173
for key, value in cmd_attrs.items():
1172
self.assertEqual(getattr(command, key), value)
1174
self.assertEqual(value, getattr(command, key))
1174
1176
def test_is_enabled_short(self):
    # "-V client" must yield a command.IsEnabled command.
    self.assert_command_from_args(["-V", "client"],
                                  command.IsEnabled)
1178
1180
def test_approve(self):
    # "--approve client" must yield a command.Approve command.
    self.assert_command_from_args(["--approve", "client"],
                                  command.Approve)
1182
1184
def test_approve_short(self):
    # "-A client" must yield a command.Approve command.
    self.assert_command_from_args(["-A", "client"],
                                  command.Approve)
1185
1188
def test_deny(self):
    # "--deny client" must yield a command.Deny command.
    self.assert_command_from_args(["--deny", "client"],
                                  command.Deny)
1188
1192
def test_deny_short(self):
    # "-D client" must yield a command.Deny command.
    self.assert_command_from_args(["-D", "client"], command.Deny)
1191
1195
def test_remove(self):
    # "--remove client" must yield a command.Remove command.
    self.assert_command_from_args(["--remove", "client"],
                                  command.Remove)
1195
1199
def test_deny_before_remove(self):
1196
1200
options = self.parser.parse_args(["--deny", "--remove",
1198
1202
check_option_syntax(self.parser, options)
1199
1203
commands = commands_from_options(options)
1200
self.assertEqual(len(commands), 2)
1204
self.assertEqual(2, len(commands))
1201
1205
self.assertIsInstance(commands[0], command.Deny)
1202
1206
self.assertIsInstance(commands[1], command.Remove)
1207
1211
check_option_syntax(self.parser, options)
1208
1212
commands = commands_from_options(options)
1209
self.assertEqual(len(commands), 2)
1213
self.assertEqual(2, len(commands))
1210
1214
self.assertIsInstance(commands[0], command.Deny)
1211
1215
self.assertIsInstance(commands[1], command.Remove)
1213
1217
def test_remove_short(self):
    # "-r client" must yield a command.Remove command.
    self.assert_command_from_args(["-r", "client"],
                                  command.Remove)
1216
1221
def test_dump_json(self):
    # "--dump-json" (no client argument) must yield a
    # command.DumpJSON command.
    self.assert_command_from_args(["--dump-json"],
                                  command.DumpJSON)
1220
1225
def test_enable(self):
    # "--enable client" must yield a command.Enable command.
    self.assert_command_from_args(["--enable", "client"],
                                  command.Enable)
1224
1229
def test_enable_short(self):
    # "-e client" must yield a command.Enable command.
    self.assert_command_from_args(["-e", "client"],
                                  command.Enable)
1227
1233
def test_disable(self):
    # "--disable client" must yield a command.Disable command.
    self.assert_command_from_args(["--disable", "client"],
                                  command.Disable)
1231
1237
def test_disable_short(self):
    # "-d client" must yield a command.Disable command.
    self.assert_command_from_args(["-d", "client"],
                                  command.Disable)
1234
1241
def test_bump_timeout(self):
    # "--bump-timeout client" must yield a command.BumpTimeout
    # command.
    self.assert_command_from_args(["--bump-timeout", "client"],
                                  command.BumpTimeout)
1238
1245
def test_bump_timeout_short(self):
    # "-b client" must yield a command.BumpTimeout command.
    self.assert_command_from_args(["-b", "client"],
                                  command.BumpTimeout)
1242
1249
def test_start_checker(self):
    # "--start-checker client" must yield a command.StartChecker
    # command.
    self.assert_command_from_args(["--start-checker", "client"],
                                  command.StartChecker)
1246
1253
def test_stop_checker(self):
    # "--stop-checker client" must yield a command.StopChecker
    # command.
    self.assert_command_from_args(["--stop-checker", "client"],
                                  command.StopChecker)
1250
1257
def test_approve_by_default(self):
    # "--approve-by-default client" must yield a
    # command.ApproveByDefault command.
    # NOTE(review): the client-name line of the newer revision is
    # not visible in SOURCE; "client" follows the sibling tests.
    self.assert_command_from_args(["--approve-by-default",
                                   "client"],
                                  command.ApproveByDefault)
1254
1262
def test_deny_by_default(self):
    # "--deny-by-default client" must yield a command.DenyByDefault
    # command.
    self.assert_command_from_args(["--deny-by-default", "client"],
                                  command.DenyByDefault)
1258
1266
def test_checker(self):
    # "--checker : client" must yield a command.SetChecker command
    # whose value_to_set attribute is ":".
    self.assert_command_from_args(["--checker", ":", "client"],
                                  command.SetChecker,
                                  value_to_set=":")
1263
1271
def test_checker_empty(self):
    # An empty checker string must be accepted and passed through
    # unchanged as value_to_set.
    self.assert_command_from_args(["--checker", "", "client"],
                                  command.SetChecker,
                                  value_to_set="")
1268
1276
def test_checker_short(self):
    # "-c : client" must yield a command.SetChecker command whose
    # value_to_set attribute is ":".
    self.assert_command_from_args(["-c", ":", "client"],
                                  command.SetChecker,
                                  value_to_set=":")
1273
1281
def test_host(self):
    # "--host HOSTNAME client" must yield a command.SetHost command
    # whose value_to_set attribute is the given host name.
    self.assert_command_from_args(
        ["--host", "client.example.org", "client"],
        command.SetHost, value_to_set="client.example.org")
1278
1286
def test_host_short(self):
    # "-H HOSTNAME client" must yield a command.SetHost command
    # whose value_to_set attribute is the given host name.
    self.assert_command_from_args(
        ["-H", "client.example.org", "client"], command.SetHost,
        value_to_set="client.example.org")
1283
1291
def test_secret_devnull(self):
    # Using the null device as the secret file must yield a
    # command.SetSecret command whose value_to_set is empty bytes.
    self.assert_command_from_args(["--secret", os.path.devnull,
                                   "client"], command.SetSecret,
                                  value_to_set=b"")
1288
1296
def test_secret_tempfile(self):
1304
1313
value = b"secret\0xyzzy\nbar"
1307
self.assert_command_from_args(["-s", f.name, "foo"],
1316
self.assert_command_from_args(["-s", f.name, "client"],
1308
1317
command.SetSecret,
1309
1318
value_to_set=value)
1311
1320
def test_timeout(self):
    # The duration "PT5M" must reach command.SetTimeout as
    # value_to_set=300000.
    self.assert_command_from_args(["--timeout", "PT5M", "client"],
                                  command.SetTimeout,
                                  value_to_set=300000)
1316
1325
def test_timeout_short(self):
    # "-t PT5M client" must reach command.SetTimeout as
    # value_to_set=300000.
    self.assert_command_from_args(["-t", "PT5M", "client"],
                                  command.SetTimeout,
                                  value_to_set=300000)
1321
1330
def test_extended_timeout(self):
1322
1331
self.assert_command_from_args(["--extended-timeout", "PT15M",
1324
1333
command.SetExtendedTimeout,
1325
1334
value_to_set=900000)
1327
1336
def test_interval(self):
    # The duration "PT2M" must reach command.SetInterval as
    # value_to_set=120000.
    self.assert_command_from_args(["--interval", "PT2M",
                                   "client"], command.SetInterval,
                                  value_to_set=120000)
1332
1341
def test_interval_short(self):
    # "-i PT2M client" must reach command.SetInterval as
    # value_to_set=120000.
    self.assert_command_from_args(["-i", "PT2M", "client"],
                                  command.SetInterval,
                                  value_to_set=120000)
1337
1346
def test_approval_delay(self):
1338
1347
self.assert_command_from_args(["--approval-delay", "PT30S",
1340
1349
command.SetApprovalDelay,
1341
1350
value_to_set=30000)
1343
1352
def test_approval_duration(self):
1344
1353
self.assert_command_from_args(["--approval-duration", "PT1S",
1346
1355
command.SetApprovalDuration,
1347
1356
value_to_set=1000)
1372
1381
self.attributes["Name"] = name
1373
1382
self.calls = []
1374
1383
def Set(self, interface, propname, value, dbus_interface):
1375
testcase.assertEqual(interface, client_dbus_interface)
1376
testcase.assertEqual(dbus_interface,
1377
dbus.PROPERTIES_IFACE)
1384
testcase.assertEqual(client_dbus_interface, interface)
1385
testcase.assertEqual(dbus.PROPERTIES_IFACE,
1378
1387
self.attributes[propname] = value
1379
def Get(self, interface, propname, dbus_interface):
1380
testcase.assertEqual(interface, client_dbus_interface)
1381
testcase.assertEqual(dbus_interface,
1382
dbus.PROPERTIES_IFACE)
1383
return self.attributes[propname]
1384
1388
def Approve(self, approve, dbus_interface):
1385
testcase.assertEqual(dbus_interface,
1386
client_dbus_interface)
1389
testcase.assertEqual(client_dbus_interface,
1387
1391
self.calls.append(("Approve", (approve,
1388
1392
dbus_interface)))
1389
1393
self.client = MockClient(
1436
1440
LastCheckerStatus=-2)
1437
1441
self.clients = collections.OrderedDict(
1439
("/clients/foo", self.client.attributes),
1440
("/clients/barbar", self.other_client.attributes),
1443
(self.client.__dbus_object_path__,
1444
self.client.attributes),
1445
(self.other_client.__dbus_object_path__,
1446
self.other_client.attributes),
1442
self.one_client = {"/clients/foo": self.client.attributes}
1448
self.one_client = {self.client.__dbus_object_path__:
1449
self.client.attributes}
1453
class MockBus(object):
1448
1455
def get_object(client_bus_name, path):
1449
self.assertEqual(client_bus_name, dbus_busname)
1451
# Note: "self" here is the TestCmd instance, not
1452
# the Bus instance, since this is a static method!
1453
"/clients/foo": self.client,
1454
"/clients/barbar": self.other_client,
1456
self.assertEqual(dbus_busname, client_bus_name)
1457
# Note: "self" here is the TestCmd instance, not the
1458
# MockBus instance, since this is a static method!
1459
if path == self.client.__dbus_object_path__:
1461
elif path == self.other_client.__dbus_object_path__:
1462
return self.other_client
1459
1466
class TestBaseCommands(TestCommand):
1461
def test_is_enabled(self):
1462
self.assertTrue(all(command.IsEnabled().is_enabled(client,
1464
for client, properties
1465
in self.clients.items()))
1467
def test_IsEnabled_exits_successfully(self):
    # Running IsEnabled on the single client in self.one_client
    # must raise SystemExit with a success code: either 0 or None.
    with self.assertRaises(SystemExit) as e:
        command.IsEnabled().run(self.one_client)
    # NOTE(review): the "else:" line is not visible in SOURCE; it is
    # required for the two assertions not to contradict each other.
    if e.exception.code is not None:
        self.assertEqual(0, e.exception.code)
    else:
        self.assertIsNone(e.exception.code)
1475
def test_IsEnabled_exits_with_failure(self):
    # With the client's "Enabled" property set to false, running
    # IsEnabled must raise SystemExit with a failure code: a
    # non-zero integer, or any non-None object.
    self.client.attributes["Enabled"] = dbus.Boolean(False)
    with self.assertRaises(SystemExit) as e:
        command.IsEnabled().run(self.one_client)
    # NOTE(review): the "else:" line is not visible in SOURCE and was
    # reconstructed from the shape of the two assertions -- confirm.
    if isinstance(e.exception.code, int):
        self.assertNotEqual(0, e.exception.code)
    else:
        self.assertIsNotNone(e.exception.code)
1484
def test_approve(self):
1485
def test_Approve(self):
1485
1486
command.Approve().run(self.clients, self.bus)
1486
1487
for clientpath in self.clients:
1487
1488
client = self.bus.get_object(dbus_busname, clientpath)
1488
1489
self.assertIn(("Approve", (True, client_dbus_interface)),
1491
def test_deny(self):
1492
def test_Deny(self):
1492
1493
command.Deny().run(self.clients, self.bus)
1493
1494
for clientpath in self.clients:
1494
1495
client = self.bus.get_object(dbus_busname, clientpath)
1495
1496
self.assertIn(("Approve", (False, client_dbus_interface)),
1498
def test_remove(self):
1499
class MockMandos(object):
1499
def test_Remove(self):
1500
class MandosSpy(object):
1500
1501
def __init__(self):
1501
1502
self.calls = []
1502
1503
def RemoveClient(self, dbus_path):
1503
1504
self.calls.append(("RemoveClient", (dbus_path,)))
1504
mandos = MockMandos()
1505
mandos = MandosSpy()
1505
1506
command.Remove().run(self.clients, self.bus, mandos)
1506
self.assertEqual(len(mandos.calls), 2)
1507
1507
for clientpath in self.clients:
1508
1508
self.assertIn(("RemoveClient", (clientpath,)),
1562
1562
def test_DumpJSON_normal(self):
    # Run DumpJSON on all clients with stdout captured, then compare
    # the parsed JSON with self.expected_json.
    with self.capture_stdout_to_buffer() as buffer:
        command.DumpJSON().run(self.clients)
    json_data = json.loads(buffer.getvalue())
    self.assertDictEqual(self.expected_json, json_data)
1569
@contextlib.contextmanager
1570
def capture_stdout_to_buffer():
1571
capture_buffer = io.StringIO()
1572
old_stdout = sys.stdout
1573
sys.stdout = capture_buffer
1575
yield capture_buffer
1577
sys.stdout = old_stdout
1567
1579
def test_DumpJSON_one_client(self):
    # Run DumpJSON on self.one_client with stdout captured; only the
    # "foo" entry of self.expected_json is expected in the output.
    with self.capture_stdout_to_buffer() as buffer:
        command.DumpJSON().run(self.one_client)
    json_data = json.loads(buffer.getvalue())
    expected_json = {"foo": self.expected_json["foo"]}
    self.assertDictEqual(expected_json, json_data)
1573
1586
def test_PrintTable_normal(self):
1574
output = command.PrintTable().output(self.clients.values())
1587
with self.capture_stdout_to_buffer() as buffer:
1588
command.PrintTable().run(self.clients)
1575
1589
expected_output = "\n".join((
1576
1590
"Name Enabled Timeout Last Successful Check",
1577
1591
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1578
1592
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1580
self.assertEqual(output, expected_output)
1594
self.assertEqual(expected_output, buffer.getvalue())
1582
1596
def test_PrintTable_verbose(self):
1583
output = command.PrintTable(verbose=True).output(
1584
self.clients.values())
1597
with self.capture_stdout_to_buffer() as buffer:
1598
command.PrintTable(verbose=True).run(self.clients)
1671
1685
num_lines = max(len(rows) for rows in columns)
1672
expected_output = "\n".join("".join(rows[line]
1673
for rows in columns)
1674
for line in range(num_lines))
1675
self.assertEqual(output, expected_output)
1686
expected_output = ("\n".join("".join(rows[line]
1687
for rows in columns)
1688
for line in range(num_lines))
1690
self.assertEqual(expected_output, buffer.getvalue())
1677
1692
def test_PrintTable_one_client(self):
1678
output = command.PrintTable().output(self.one_client.values())
1693
with self.capture_stdout_to_buffer() as buffer:
1694
command.PrintTable().run(self.one_client)
1679
1695
expected_output = "\n".join((
1680
1696
"Name Enabled Timeout Last Successful Check",
1681
1697
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1683
self.assertEqual(output, expected_output)
1699
self.assertEqual(expected_output, buffer.getvalue())
1686
1702
class TestPropertyCmd(TestCommand):