438
421
class IsEnabledCmd(Command):
439
def run(self, clients, bus=None, mandos=None):
440
client, properties = next(iter(clients.items()))
422
def run_on_one_client(self, client, properties):
441
423
if self.is_enabled(client, properties):
444
426
def is_enabled(self, client, properties):
445
return properties["Enabled"]
427
return bool(properties["Enabled"])
447
429
class RemoveCmd(Command):
448
430
def run_on_one_client(self, client, properties):
449
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
450
server_path, server_interface,
451
str(client.__dbus_object_path__))
452
431
self.mandos.RemoveClient(client.__dbus_object_path__)
454
433
class ApproveCmd(Command):
455
434
def run_on_one_client(self, client, properties):
456
log.debug("D-Bus: %s:%s:%s.Approve(True)", busname,
457
client.__dbus_object_path__, client_interface)
458
435
client.Approve(dbus.Boolean(True),
459
436
dbus_interface=client_interface)
461
438
class DenyCmd(Command):
462
439
def run_on_one_client(self, client, properties):
463
log.debug("D-Bus: %s:%s:%s.Approve(False)", busname,
464
client.__dbus_object_path__, client_interface)
465
440
client.Approve(dbus.Boolean(False),
466
441
dbus_interface=client_interface)
468
443
class EnableCmd(PropertyCmd):
470
445
value_to_set = dbus.Boolean(True)
472
447
class DisableCmd(PropertyCmd):
474
449
value_to_set = dbus.Boolean(False)
476
451
class BumpTimeoutCmd(PropertyCmd):
477
propname = "LastCheckedOK"
452
property = "LastCheckedOK"
478
453
value_to_set = ""
480
455
class StartCheckerCmd(PropertyCmd):
481
propname = "CheckerRunning"
456
property = "CheckerRunning"
482
457
value_to_set = dbus.Boolean(True)
484
459
class StopCheckerCmd(PropertyCmd):
485
propname = "CheckerRunning"
460
property = "CheckerRunning"
486
461
value_to_set = dbus.Boolean(False)
488
463
class ApproveByDefaultCmd(PropertyCmd):
489
propname = "ApprovedByDefault"
464
property = "ApprovedByDefault"
490
465
value_to_set = dbus.Boolean(True)
492
467
class DenyByDefaultCmd(PropertyCmd):
493
propname = "ApprovedByDefault"
468
property = "ApprovedByDefault"
494
469
value_to_set = dbus.Boolean(False)
496
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
499
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
502
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
505
def value_to_set(self):
508
def value_to_set(self, value):
509
"""When setting, read data from supplied file object"""
510
self._vts = value.read()
513
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
516
483
class SetExtendedTimeoutCmd(PropertyCmd,
517
484
MillisecondsValueArgumentMixIn):
518
propname = "ExtendedTimeout"
485
property = "ExtendedTimeout"
520
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
521
propname = "Interval"
488
property = "Interval"
523
490
class SetApprovalDelayCmd(PropertyCmd,
524
491
MillisecondsValueArgumentMixIn):
525
propname = "ApprovalDelay"
492
property = "ApprovalDelay"
527
494
class SetApprovalDurationCmd(PropertyCmd,
528
495
MillisecondsValueArgumentMixIn):
529
propname = "ApprovalDuration"
496
property = "ApprovalDuration"
498
def has_actions(options):
499
return any((options.enable,
501
options.bump_timeout,
502
options.start_checker,
503
options.stop_checker,
506
options.checker is not None,
507
options.timeout is not None,
508
options.extended_timeout is not None,
509
options.interval is not None,
510
options.approved_by_default is not None,
511
options.approval_delay is not None,
512
options.approval_duration is not None,
513
options.host is not None,
514
options.secret is not None,
531
518
def add_command_line_options(parser):
532
519
parser.add_argument("--version", action="version",
1037
969
self.calls.append(("RemoveClient", (dbus_path,)))
1038
970
mandos = MockMandos()
1039
971
super(TestRemoveCmd, self).setUp()
1040
RemoveCmd().run(self.clients, self.bus, mandos)
972
RemoveCmd().run(mandos, self.clients)
1041
973
self.assertEqual(len(mandos.calls), 2)
1042
for clientpath in self.clients:
1043
self.assertIn(("RemoveClient", (clientpath,)),
974
for client in self.clients:
975
self.assertIn(("RemoveClient",
976
(client.__dbus_object_path__,)),
1046
979
class TestApproveCmd(TestCmd):
1047
980
def test_approve(self):
1048
ApproveCmd().run(self.clients, self.bus)
1049
for clientpath in self.clients:
1050
client = self.bus.get_object(busname, clientpath)
981
ApproveCmd().run(None, self.clients)
982
for client in self.clients:
1051
983
self.assertIn(("Approve", (True, client_interface)),
1054
986
class TestDenyCmd(TestCmd):
1055
987
def test_deny(self):
1056
DenyCmd().run(self.clients, self.bus)
1057
for clientpath in self.clients:
1058
client = self.bus.get_object(busname, clientpath)
988
DenyCmd().run(None, self.clients)
989
for client in self.clients:
1059
990
self.assertIn(("Approve", (False, client_interface)),
1062
993
class TestEnableCmd(TestCmd):
1063
994
def test_enable(self):
1064
for clientpath in self.clients:
1065
client = self.bus.get_object(busname, clientpath)
995
for client in self.clients:
1066
996
client.attributes["Enabled"] = False
1068
EnableCmd().run(self.clients, self.bus)
998
EnableCmd().run(None, self.clients)
1070
for clientpath in self.clients:
1071
client = self.bus.get_object(busname, clientpath)
1000
for client in self.clients:
1072
1001
self.assertTrue(client.attributes["Enabled"])
1074
1003
class TestDisableCmd(TestCmd):
1075
1004
def test_disable(self):
1076
DisableCmd().run(self.clients, self.bus)
1077
for clientpath in self.clients:
1078
client = self.bus.get_object(busname, clientpath)
1005
DisableCmd().run(None, self.clients)
1007
for client in self.clients:
1079
1008
self.assertFalse(client.attributes["Enabled"])
1081
class Unique(object):
1082
"""Class for objects which exist only to be unique objects, since
1083
unittest.mock.sentinel only exists in Python 3.3"""
1085
class TestPropertyCmd(TestCmd):
1086
"""Abstract class for tests of PropertyCmd classes"""
1088
if not hasattr(self, "command"):
1090
values_to_get = getattr(self, "values_to_get",
1092
for value_to_set, value_to_get in zip(self.values_to_set,
1094
for clientpath in self.clients:
1095
client = self.bus.get_object(busname, clientpath)
1096
old_value = client.attributes[self.propname]
1097
self.assertNotIsInstance(old_value, Unique)
1098
client.attributes[self.propname] = Unique()
1099
self.run_command(value_to_set, self.clients)
1100
for clientpath in self.clients:
1101
client = self.bus.get_object(busname, clientpath)
1102
value = client.attributes[self.propname]
1103
self.assertNotIsInstance(value, Unique)
1104
self.assertEqual(value, value_to_get)
1105
def run_command(self, value, clients):
1106
self.command().run(clients, self.bus)
1108
class TestBumpTimeoutCmd(TestPropertyCmd):
1109
command = BumpTimeoutCmd
1110
propname = "LastCheckedOK"
1111
values_to_set = [""]
1113
class TestStartCheckerCmd(TestPropertyCmd):
1114
command = StartCheckerCmd
1115
propname = "CheckerRunning"
1116
values_to_set = [dbus.Boolean(True)]
1118
class TestStopCheckerCmd(TestPropertyCmd):
1119
command = StopCheckerCmd
1120
propname = "CheckerRunning"
1121
values_to_set = [dbus.Boolean(False)]
1123
class TestApproveByDefaultCmd(TestPropertyCmd):
1124
command = ApproveByDefaultCmd
1125
propname = "ApprovedByDefault"
1126
values_to_set = [dbus.Boolean(True)]
1128
class TestDenyByDefaultCmd(TestPropertyCmd):
1129
command = DenyByDefaultCmd
1130
propname = "ApprovedByDefault"
1131
values_to_set = [dbus.Boolean(False)]
1133
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1134
"""Abstract class for tests of PropertyCmd classes using the
1135
ValueArgumentMixIn"""
1137
if type(self) is TestValueArgumentPropertyCmd:
1139
return super(TestValueArgumentPropertyCmd, self).runTest()
1140
def run_command(self, value, clients):
1141
self.command(value).run(clients, self.bus)
1143
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1144
command = SetCheckerCmd
1145
propname = "Checker"
1146
values_to_set = ["", ":", "fping -q -- %s"]
1148
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1149
command = SetHostCmd
1151
values_to_set = ["192.0.2.3", "foo.example.org"]
1153
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1154
command = SetSecretCmd
1156
values_to_set = [io.BytesIO(b""),
1157
io.BytesIO(b"secret\0xyzzy\nbar")]
1158
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1160
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1161
command = SetTimeoutCmd
1162
propname = "Timeout"
1163
values_to_set = [datetime.timedelta(),
1164
datetime.timedelta(minutes=5),
1165
datetime.timedelta(seconds=1),
1166
datetime.timedelta(weeks=1),
1167
datetime.timedelta(weeks=52)]
1168
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1170
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1171
command = SetExtendedTimeoutCmd
1172
propname = "ExtendedTimeout"
1173
values_to_set = [datetime.timedelta(),
1174
datetime.timedelta(minutes=5),
1175
datetime.timedelta(seconds=1),
1176
datetime.timedelta(weeks=1),
1177
datetime.timedelta(weeks=52)]
1178
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1180
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1181
command = SetIntervalCmd
1182
propname = "Interval"
1183
values_to_set = [datetime.timedelta(),
1184
datetime.timedelta(minutes=5),
1185
datetime.timedelta(seconds=1),
1186
datetime.timedelta(weeks=1),
1187
datetime.timedelta(weeks=52)]
1188
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1190
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1191
command = SetApprovalDelayCmd
1192
propname = "ApprovalDelay"
1193
values_to_set = [datetime.timedelta(),
1194
datetime.timedelta(minutes=5),
1195
datetime.timedelta(seconds=1),
1196
datetime.timedelta(weeks=1),
1197
datetime.timedelta(weeks=52)]
1198
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1200
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1201
command = SetApprovalDurationCmd
1202
propname = "ApprovalDuration"
1203
values_to_set = [datetime.timedelta(),
1204
datetime.timedelta(minutes=5),
1205
datetime.timedelta(seconds=1),
1206
datetime.timedelta(weeks=1),
1207
datetime.timedelta(weeks=52)]
1208
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1210
class Test_command_from_options(unittest.TestCase):
1212
self.parser = argparse.ArgumentParser()
1213
add_command_line_options(self.parser)
1214
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1215
"""Assert that parsing ARGS should result in an instance of
1216
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1217
options = self.parser.parse_args(args)
1218
check_option_syntax(self.parser, options)
1219
commands = commands_from_options(options)
1220
self.assertEqual(len(commands), 1)
1221
command = commands[0]
1222
self.assertIsInstance(command, command_cls)
1223
for key, value in cmd_attrs.items():
1224
self.assertEqual(getattr(command, key), value)
1225
def test_print_table(self):
1226
self.assert_command_from_args([], PrintTableCmd,
1229
def test_print_table_verbose(self):
1230
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1233
def test_print_table_verbose_short(self):
1234
self.assert_command_from_args(["-v"], PrintTableCmd,
1237
def test_enable(self):
1238
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1240
def test_enable_short(self):
1241
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1243
def test_disable(self):
1244
self.assert_command_from_args(["--disable", "foo"],
1247
def test_disable_short(self):
1248
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1250
def test_bump_timeout(self):
1251
self.assert_command_from_args(["--bump-timeout", "foo"],
1254
def test_bump_timeout_short(self):
1255
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1257
def test_start_checker(self):
1258
self.assert_command_from_args(["--start-checker", "foo"],
1261
def test_stop_checker(self):
1262
self.assert_command_from_args(["--stop-checker", "foo"],
1265
def test_remove(self):
1266
self.assert_command_from_args(["--remove", "foo"],
1269
def test_remove_short(self):
1270
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1272
def test_checker(self):
1273
self.assert_command_from_args(["--checker", ":", "foo"],
1274
SetCheckerCmd, value_to_set=":")
1276
def test_checker_empty(self):
1277
self.assert_command_from_args(["--checker", "", "foo"],
1278
SetCheckerCmd, value_to_set="")
1280
def test_checker_short(self):
1281
self.assert_command_from_args(["-c", ":", "foo"],
1282
SetCheckerCmd, value_to_set=":")
1284
def test_timeout(self):
1285
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1287
value_to_set=300000)
1289
def test_timeout_short(self):
1290
self.assert_command_from_args(["-t", "PT5M", "foo"],
1292
value_to_set=300000)
1294
def test_extended_timeout(self):
1295
self.assert_command_from_args(["--extended-timeout", "PT15M",
1297
SetExtendedTimeoutCmd,
1298
value_to_set=900000)
1300
def test_interval(self):
1301
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1303
value_to_set=120000)
1305
def test_interval_short(self):
1306
self.assert_command_from_args(["-i", "PT2M", "foo"],
1308
value_to_set=120000)
1310
def test_approve_by_default(self):
1311
self.assert_command_from_args(["--approve-by-default", "foo"],
1312
ApproveByDefaultCmd)
1314
def test_deny_by_default(self):
1315
self.assert_command_from_args(["--deny-by-default", "foo"],
1318
def test_approval_delay(self):
1319
self.assert_command_from_args(["--approval-delay", "PT30S",
1320
"foo"], SetApprovalDelayCmd,
1323
def test_approval_duration(self):
1324
self.assert_command_from_args(["--approval-duration", "PT1S",
1325
"foo"], SetApprovalDurationCmd,
1328
def test_host(self):
1329
self.assert_command_from_args(["--host", "foo.example.org",
1331
value_to_set="foo.example.org")
1333
def test_host_short(self):
1334
self.assert_command_from_args(["-H", "foo.example.org",
1336
value_to_set="foo.example.org")
1338
def test_secret_devnull(self):
1339
self.assert_command_from_args(["--secret", os.path.devnull,
1340
"foo"], SetSecretCmd,
1343
def test_secret_tempfile(self):
1344
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1345
value = b"secret\0xyzzy\nbar"
1348
self.assert_command_from_args(["--secret", f.name,
1349
"foo"], SetSecretCmd,
1352
def test_secret_devnull_short(self):
1353
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1354
SetSecretCmd, value_to_set=b"")
1356
def test_secret_tempfile_short(self):
1357
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1358
value = b"secret\0xyzzy\nbar"
1361
self.assert_command_from_args(["-s", f.name, "foo"],
1365
def test_approve(self):
1366
self.assert_command_from_args(["--approve", "foo"],
1369
def test_approve_short(self):
1370
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1372
def test_deny(self):
1373
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1375
def test_deny_short(self):
1376
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1378
def test_dump_json(self):
1379
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1381
def test_is_enabled(self):
1382
self.assert_command_from_args(["--is-enabled", "foo"],
1385
def test_is_enabled_short(self):
1386
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1388
def test_deny_before_remove(self):
1389
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1390
check_option_syntax(self.parser, options)
1391
commands = commands_from_options(options)
1392
self.assertEqual(len(commands), 2)
1393
self.assertIsInstance(commands[0], DenyCmd)
1394
self.assertIsInstance(commands[1], RemoveCmd)
1396
def test_deny_before_remove_reversed(self):
1397
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1398
check_option_syntax(self.parser, options)
1399
commands = commands_from_options(options)
1400
self.assertEqual(len(commands), 2)
1401
self.assertIsInstance(commands[0], DenyCmd)
1402
self.assertIsInstance(commands[1], RemoveCmd)
1405
class Test_check_option_syntax(unittest.TestCase):
1406
# This mostly corresponds to the definition from has_actions() in
1407
# check_option_syntax()
1409
# The actual values set here are not that important, but we do
1410
# at least stick to the correct types, even though they are
1414
"bump_timeout": True,
1415
"start_checker": True,
1416
"stop_checker": True,
1420
"timeout": datetime.timedelta(),
1421
"extended_timeout": datetime.timedelta(),
1422
"interval": datetime.timedelta(),
1423
"approved_by_default": True,
1424
"approval_delay": datetime.timedelta(),
1425
"approval_duration": datetime.timedelta(),
1427
"secret": io.BytesIO(b"x"),
1433
self.parser = argparse.ArgumentParser()
1434
add_command_line_options(self.parser)
1436
@contextlib.contextmanager
1437
def assertParseError(self):
1438
with self.assertRaises(SystemExit) as e:
1439
with self.temporarily_suppress_stderr():
1441
# Exit code from argparse is guaranteed to be "2". Reference:
1442
# https://docs.python.org/3/library/argparse.html#exiting-methods
1443
self.assertEqual(e.exception.code, 2)
1446
@contextlib.contextmanager
1447
def temporarily_suppress_stderr():
1448
null = os.open(os.path.devnull, os.O_RDWR)
1449
stderrcopy = os.dup(sys.stderr.fileno())
1450
os.dup2(null, sys.stderr.fileno())
1456
os.dup2(stderrcopy, sys.stderr.fileno())
1457
os.close(stderrcopy)
1459
def check_option_syntax(self, options):
1460
check_option_syntax(self.parser, options)
1462
def test_actions_requires_client_or_all(self):
1463
for action, value in self.actions.items():
1464
options = self.parser.parse_args()
1465
setattr(options, action, value)
1466
with self.assertParseError():
1467
self.check_option_syntax(options)
1469
def test_actions_conflicts_with_verbose(self):
1470
for action, value in self.actions.items():
1471
options = self.parser.parse_args()
1472
setattr(options, action, value)
1473
options.verbose = True
1474
with self.assertParseError():
1475
self.check_option_syntax(options)
1477
def test_dump_json_conflicts_with_verbose(self):
1478
options = self.parser.parse_args()
1479
options.dump_json = True
1480
options.verbose = True
1481
with self.assertParseError():
1482
self.check_option_syntax(options)
1484
def test_dump_json_conflicts_with_action(self):
1485
for action, value in self.actions.items():
1486
options = self.parser.parse_args()
1487
setattr(options, action, value)
1488
options.dump_json = True
1489
with self.assertParseError():
1490
self.check_option_syntax(options)
1492
def test_all_can_not_be_alone(self):
1493
options = self.parser.parse_args()
1495
with self.assertParseError():
1496
self.check_option_syntax(options)
1498
def test_all_is_ok_with_any_action(self):
1499
for action, value in self.actions.items():
1500
options = self.parser.parse_args()
1501
setattr(options, action, value)
1503
self.check_option_syntax(options)
1505
def test_is_enabled_fails_without_client(self):
1506
options = self.parser.parse_args()
1507
options.is_enabled = True
1508
with self.assertParseError():
1509
self.check_option_syntax(options)
1511
def test_is_enabled_works_with_one_client(self):
1512
options = self.parser.parse_args()
1513
options.is_enabled = True
1514
options.client = ["foo"]
1515
self.check_option_syntax(options)
1517
def test_is_enabled_fails_with_two_clients(self):
1518
options = self.parser.parse_args()
1519
options.is_enabled = True
1520
options.client = ["foo", "barbar"]
1521
with self.assertParseError():
1522
self.check_option_syntax(options)
1524
def test_remove_can_only_be_combined_with_action_deny(self):
1525
for action, value in self.actions.items():
1526
if action in {"remove", "deny"}:
1528
options = self.parser.parse_args()
1529
setattr(options, action, value)
1531
options.remove = True
1532
with self.assertParseError():
1533
self.check_option_syntax(options)
1537
1012
def should_only_run_tests():