294
293
"ApprovalDuration", "Checker", "ExtendedTimeout",
295
294
"Expires", "LastCheckerStatus")
296
295
def run(self, mandos, clients):
297
print(self.output(clients.values()))
298
def output(self, clients):
299
raise NotImplementedError()
296
print(self.output(clients))
301
298
class PropertyCmd(Command):
302
299
"""Abstract class for Actions for setting one client property"""
303
300
def run_on_one_client(self, client, properties):
304
301
"""Set the Client's D-Bus property"""
305
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
306
client.__dbus_object_path__,
307
dbus.PROPERTIES_IFACE, client_interface,
308
self.propname, self.value_to_set
309
if not isinstance(self.value_to_set, dbus.Boolean)
310
else bool(self.value_to_set))
311
client.Set(client_interface, self.propname, self.value_to_set,
302
client.Set(client_interface, self.property, self.value_to_set,
312
303
dbus_interface=dbus.PROPERTIES_IFACE)
315
raise NotImplementedError()
317
305
class ValueArgumentMixIn(object):
318
306
"""Mixin class for commands taking a value as argument"""
440
428
def is_enabled(self, client, properties):
441
log.debug("D-Bus: %s:%s:%s.Get(%r, %r)", busname,
442
client.__dbus_object_path__,
443
dbus.PROPERTIES_IFACE, client_interface,
445
return bool(client.Get(client_interface, "Enabled",
446
dbus_interface=dbus.PROPERTIES_IFACE))
429
return bool(properties["Enabled"])
448
431
class RemoveCmd(Command):
449
432
def run_on_one_client(self, client, properties):
450
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
451
server_path, server_interface,
452
str(client.__dbus_object_path__))
453
433
self.mandos.RemoveClient(client.__dbus_object_path__)
455
435
class ApproveCmd(Command):
456
436
def run_on_one_client(self, client, properties):
457
log.debug("D-Bus: %s:%s.Approve(True)",
458
client.__dbus_object_path__, client_interface)
459
437
client.Approve(dbus.Boolean(True),
460
438
dbus_interface=client_interface)
462
440
class DenyCmd(Command):
463
441
def run_on_one_client(self, client, properties):
464
log.debug("D-Bus: %s:%s.Approve(False)",
465
client.__dbus_object_path__, client_interface)
466
442
client.Approve(dbus.Boolean(False),
467
443
dbus_interface=client_interface)
469
445
class EnableCmd(PropertyCmd):
471
447
value_to_set = dbus.Boolean(True)
473
449
class DisableCmd(PropertyCmd):
475
451
value_to_set = dbus.Boolean(False)
477
453
class BumpTimeoutCmd(PropertyCmd):
478
propname = "LastCheckedOK"
454
property = "LastCheckedOK"
479
455
value_to_set = ""
481
457
class StartCheckerCmd(PropertyCmd):
482
propname = "CheckerRunning"
458
property = "CheckerRunning"
483
459
value_to_set = dbus.Boolean(True)
485
461
class StopCheckerCmd(PropertyCmd):
486
propname = "CheckerRunning"
462
property = "CheckerRunning"
487
463
value_to_set = dbus.Boolean(False)
489
465
class ApproveByDefaultCmd(PropertyCmd):
490
propname = "ApprovedByDefault"
466
property = "ApprovedByDefault"
491
467
value_to_set = dbus.Boolean(True)
493
469
class DenyByDefaultCmd(PropertyCmd):
494
propname = "ApprovedByDefault"
470
property = "ApprovedByDefault"
495
471
value_to_set = dbus.Boolean(False)
497
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
500
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
503
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
506
481
def value_to_set(self):
510
485
"""When setting, read data from supplied file object"""
511
486
self._vts = value.read()
514
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
517
493
class SetExtendedTimeoutCmd(PropertyCmd,
518
494
MillisecondsValueArgumentMixIn):
519
propname = "ExtendedTimeout"
495
property = "ExtendedTimeout"
521
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
522
propname = "Interval"
498
property = "Interval"
524
500
class SetApprovalDelayCmd(PropertyCmd,
525
501
MillisecondsValueArgumentMixIn):
526
propname = "ApprovalDelay"
502
property = "ApprovalDelay"
528
504
class SetApprovalDurationCmd(PropertyCmd,
529
505
MillisecondsValueArgumentMixIn):
530
propname = "ApprovalDuration"
506
property = "ApprovalDuration"
508
def has_actions(options):
509
return any((options.enable,
511
options.bump_timeout,
512
options.start_checker,
513
options.stop_checker,
516
options.checker is not None,
517
options.timeout is not None,
518
options.extended_timeout is not None,
519
options.interval is not None,
520
options.approved_by_default is not None,
521
options.approval_delay is not None,
522
options.approval_duration is not None,
523
options.host is not None,
524
options.secret is not None,
532
528
def add_command_line_options(parser):
533
529
parser.add_argument("--version", action="version",
673
def check_option_syntax(parser, options):
674
"""Apply additional restrictions on options, not expressible in
677
def has_actions(options):
678
return any((options.enable,
680
options.bump_timeout,
681
options.start_checker,
682
options.stop_checker,
685
options.checker is not None,
686
options.timeout is not None,
687
options.extended_timeout is not None,
688
options.interval is not None,
689
options.approved_by_default is not None,
690
options.approval_delay is not None,
691
options.approval_duration is not None,
692
options.host is not None,
693
options.secret is not None,
668
parser = argparse.ArgumentParser()
670
add_command_line_options(parser)
672
options = parser.parse_args()
697
674
if has_actions(options) and not (options.client or options.all):
698
675
parser.error("Options require clients names or --all.")
846
797
self.attributes = attributes
847
798
self.attributes["Name"] = name
849
def Set(self, interface, propname, value, dbus_interface):
850
testcase.assertEqual(interface, client_interface)
851
testcase.assertEqual(dbus_interface,
852
dbus.PROPERTIES_IFACE)
853
self.attributes[propname] = value
854
def Get(self, interface, propname, dbus_interface):
855
testcase.assertEqual(interface, client_interface)
856
testcase.assertEqual(dbus_interface,
857
dbus.PROPERTIES_IFACE)
858
return self.attributes[propname]
800
def Set(self, interface, property, value, dbus_interface):
801
testcase.assertEqual(interface, client_interface)
802
testcase.assertEqual(dbus_interface,
803
dbus.PROPERTIES_IFACE)
804
self.attributes[property] = value
805
def Get(self, interface, property, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
return self.attributes[property]
859
810
def Approve(self, approve, dbus_interface):
860
811
testcase.assertEqual(dbus_interface, client_interface)
861
812
self.calls.append(("Approve", (approve,
1098
1048
class TestBumpTimeoutCmd(TestPropertyCmd):
1099
1049
command = BumpTimeoutCmd
1100
propname = "LastCheckedOK"
1050
property = "LastCheckedOK"
1101
1051
values_to_set = [""]
1103
1053
class TestStartCheckerCmd(TestPropertyCmd):
1104
1054
command = StartCheckerCmd
1105
propname = "CheckerRunning"
1055
property = "CheckerRunning"
1106
1056
values_to_set = [dbus.Boolean(True)]
1108
1058
class TestStopCheckerCmd(TestPropertyCmd):
1109
1059
command = StopCheckerCmd
1110
propname = "CheckerRunning"
1060
property = "CheckerRunning"
1111
1061
values_to_set = [dbus.Boolean(False)]
1113
1063
class TestApproveByDefaultCmd(TestPropertyCmd):
1114
1064
command = ApproveByDefaultCmd
1115
propname = "ApprovedByDefault"
1065
property = "ApprovedByDefault"
1116
1066
values_to_set = [dbus.Boolean(True)]
1118
1068
class TestDenyByDefaultCmd(TestPropertyCmd):
1119
1069
command = DenyByDefaultCmd
1120
propname = "ApprovedByDefault"
1070
property = "ApprovedByDefault"
1121
1071
values_to_set = [dbus.Boolean(False)]
1123
1073
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1133
1083
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1134
1084
command = SetCheckerCmd
1135
propname = "Checker"
1085
property = "Checker"
1136
1086
values_to_set = ["", ":", "fping -q -- %s"]
1138
1088
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1139
1089
command = SetHostCmd
1141
1091
values_to_set = ["192.0.2.3", "foo.example.org"]
1143
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1144
1094
command = SetSecretCmd
1146
values_to_set = [io.BytesIO(b""),
1096
values_to_set = [open("/dev/null", "rb"),
1147
1097
io.BytesIO(b"secret\0xyzzy\nbar")]
1148
1098
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1150
1100
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1151
1101
command = SetTimeoutCmd
1152
propname = "Timeout"
1153
values_to_set = [datetime.timedelta(),
1154
datetime.timedelta(minutes=5),
1155
datetime.timedelta(seconds=1),
1156
datetime.timedelta(weeks=1),
1157
datetime.timedelta(weeks=52)]
1158
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1102
property = "Timeout"
1103
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1160
1106
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1161
1107
command = SetExtendedTimeoutCmd
1162
propname = "ExtendedTimeout"
1163
values_to_set = [datetime.timedelta(),
1164
datetime.timedelta(minutes=5),
1165
datetime.timedelta(seconds=1),
1166
datetime.timedelta(weeks=1),
1167
datetime.timedelta(weeks=52)]
1168
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1108
property = "ExtendedTimeout"
1109
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1170
1112
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1171
1113
command = SetIntervalCmd
1172
propname = "Interval"
1173
values_to_set = [datetime.timedelta(),
1174
datetime.timedelta(minutes=5),
1175
datetime.timedelta(seconds=1),
1176
datetime.timedelta(weeks=1),
1177
datetime.timedelta(weeks=52)]
1178
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1114
property = "Interval"
1115
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1180
1118
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1181
1119
command = SetApprovalDelayCmd
1182
propname = "ApprovalDelay"
1183
values_to_set = [datetime.timedelta(),
1184
datetime.timedelta(minutes=5),
1185
datetime.timedelta(seconds=1),
1186
datetime.timedelta(weeks=1),
1187
datetime.timedelta(weeks=52)]
1188
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1120
property = "ApprovalDelay"
1121
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1122
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1190
1124
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1191
1125
command = SetApprovalDurationCmd
1192
propname = "ApprovalDuration"
1193
values_to_set = [datetime.timedelta(),
1194
datetime.timedelta(minutes=5),
1195
datetime.timedelta(seconds=1),
1196
datetime.timedelta(weeks=1),
1197
datetime.timedelta(weeks=52)]
1198
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1126
property = "ApprovalDuration"
1127
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1128
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1200
1130
class Test_command_from_options(unittest.TestCase):
1201
1131
def setUp(self):
1220
1149
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1223
def test_print_table_verbose_short(self):
1224
self.assert_command_from_args(["-v"], PrintTableCmd,
1227
1152
def test_enable(self):
1228
1153
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1230
def test_enable_short(self):
1231
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1233
1155
def test_disable(self):
1234
1156
self.assert_command_from_args(["--disable", "foo"],
1237
def test_disable_short(self):
1238
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1240
1159
def test_bump_timeout(self):
1241
1160
self.assert_command_from_args(["--bump-timeout", "foo"],
1242
1161
BumpTimeoutCmd)
1244
def test_bump_timeout_short(self):
1245
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1247
1163
def test_start_checker(self):
1248
1164
self.assert_command_from_args(["--start-checker", "foo"],
1249
1165
StartCheckerCmd)
1256
1172
self.assert_command_from_args(["--remove", "foo"],
1259
def test_remove_short(self):
1260
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1262
1175
def test_checker(self):
1263
1176
self.assert_command_from_args(["--checker", ":", "foo"],
1264
1177
SetCheckerCmd, value_to_set=":")
1266
def test_checker_empty(self):
1267
self.assert_command_from_args(["--checker", "", "foo"],
1268
SetCheckerCmd, value_to_set="")
1270
def test_checker_short(self):
1271
self.assert_command_from_args(["-c", ":", "foo"],
1272
SetCheckerCmd, value_to_set=":")
1274
1179
def test_timeout(self):
1275
1180
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1277
1182
value_to_set=300000)
1279
def test_timeout_short(self):
1280
self.assert_command_from_args(["-t", "PT5M", "foo"],
1282
value_to_set=300000)
1284
1184
def test_extended_timeout(self):
1285
1185
self.assert_command_from_args(["--extended-timeout", "PT15M",
1339
1229
"foo"], SetSecretCmd,
1340
1230
value_to_set=value)
1342
def test_secret_devnull_short(self):
1343
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1344
SetSecretCmd, value_to_set=b"")
1346
def test_secret_tempfile_short(self):
1347
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1348
value = b"secret\0xyzzy\nbar"
1351
self.assert_command_from_args(["-s", f.name, "foo"],
1355
1232
def test_approve(self):
1356
1233
self.assert_command_from_args(["--approve", "foo"],
1359
def test_approve_short(self):
1360
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1362
1236
def test_deny(self):
1363
1237
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1365
def test_deny_short(self):
1366
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1368
1239
def test_dump_json(self):
1369
1240
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1372
1243
self.assert_command_from_args(["--is-enabled", "foo"],
1375
def test_is_enabled_short(self):
1376
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1378
def test_deny_before_remove(self):
1379
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1380
check_option_syntax(self.parser, options)
1381
commands = commands_from_options(options)
1382
self.assertEqual(len(commands), 2)
1383
self.assertIsInstance(commands[0], DenyCmd)
1384
self.assertIsInstance(commands[1], RemoveCmd)
1386
def test_deny_before_remove_reversed(self):
1387
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1388
check_option_syntax(self.parser, options)
1389
commands = commands_from_options(options)
1390
self.assertEqual(len(commands), 2)
1391
self.assertIsInstance(commands[0], DenyCmd)
1392
self.assertIsInstance(commands[1], RemoveCmd)
1395
class Test_check_option_syntax(unittest.TestCase):
1396
# This mostly corresponds to the definition from has_actions() in
1397
# check_option_syntax()
1399
# The actual values set here are not that important, but we do
1400
# at least stick to the correct types, even though they are
1404
"bump_timeout": True,
1405
"start_checker": True,
1406
"stop_checker": True,
1410
"timeout": datetime.timedelta(),
1411
"extended_timeout": datetime.timedelta(),
1412
"interval": datetime.timedelta(),
1413
"approved_by_default": True,
1414
"approval_delay": datetime.timedelta(),
1415
"approval_duration": datetime.timedelta(),
1417
"secret": io.BytesIO(b"x"),
1423
self.parser = argparse.ArgumentParser()
1424
add_command_line_options(self.parser)
1426
@contextlib.contextmanager
1427
def assertParseError(self):
1428
with self.assertRaises(SystemExit) as e:
1429
with self.temporarily_suppress_stderr():
1431
# Exit code from argparse is guaranteed to be "2". Reference:
1432
# https://docs.python.org/3/library/argparse.html#exiting-methods
1433
self.assertEqual(e.exception.code, 2)
1436
@contextlib.contextmanager
1437
def temporarily_suppress_stderr():
1438
null = os.open(os.path.devnull, os.O_RDWR)
1439
stderrcopy = os.dup(sys.stderr.fileno())
1440
os.dup2(null, sys.stderr.fileno())
1446
os.dup2(stderrcopy, sys.stderr.fileno())
1447
os.close(stderrcopy)
1449
def check_option_syntax(self, options):
1450
check_option_syntax(self.parser, options)
1452
def test_actions_requires_client_or_all(self):
1453
for action, value in self.actions.items():
1454
options = self.parser.parse_args()
1455
setattr(options, action, value)
1456
with self.assertParseError():
1457
self.check_option_syntax(options)
1459
def test_actions_conflicts_with_verbose(self):
1460
for action, value in self.actions.items():
1461
options = self.parser.parse_args()
1462
setattr(options, action, value)
1463
options.verbose = True
1464
with self.assertParseError():
1465
self.check_option_syntax(options)
1467
def test_dump_json_conflicts_with_verbose(self):
1468
options = self.parser.parse_args()
1469
options.dump_json = True
1470
options.verbose = True
1471
with self.assertParseError():
1472
self.check_option_syntax(options)
1474
def test_dump_json_conflicts_with_action(self):
1475
for action, value in self.actions.items():
1476
options = self.parser.parse_args()
1477
setattr(options, action, value)
1478
options.dump_json = True
1479
with self.assertParseError():
1480
self.check_option_syntax(options)
1482
def test_all_can_not_be_alone(self):
1483
options = self.parser.parse_args()
1485
with self.assertParseError():
1486
self.check_option_syntax(options)
1488
def test_all_is_ok_with_any_action(self):
1489
for action, value in self.actions.items():
1490
options = self.parser.parse_args()
1491
setattr(options, action, value)
1493
self.check_option_syntax(options)
1495
def test_is_enabled_fails_without_client(self):
1496
options = self.parser.parse_args()
1497
options.is_enabled = True
1498
with self.assertParseError():
1499
self.check_option_syntax(options)
1501
def test_is_enabled_works_with_one_client(self):
1502
options = self.parser.parse_args()
1503
options.is_enabled = True
1504
options.client = ["foo"]
1505
self.check_option_syntax(options)
1507
def test_is_enabled_fails_with_two_clients(self):
1508
options = self.parser.parse_args()
1509
options.is_enabled = True
1510
options.client = ["foo", "barbar"]
1511
with self.assertParseError():
1512
self.check_option_syntax(options)
1514
def test_remove_can_only_be_combined_with_action_deny(self):
1515
for action, value in self.actions.items():
1516
if action in {"remove", "deny"}:
1518
options = self.parser.parse_args()
1519
setattr(options, action, value)
1521
options.remove = True
1522
with self.assertParseError():
1523
self.check_option_syntax(options)
1527
1248
def should_only_run_tests():