293
294
"ApprovalDuration", "Checker", "ExtendedTimeout",
294
295
"Expires", "LastCheckerStatus")
295
296
def run(self, mandos, clients):
296
print(self.output(clients))
297
print(self.output(clients.values()))
298
299
class PropertyCmd(Command):
299
300
"""Abstract class for Actions for setting one client property"""
300
301
def run_on_one_client(self, client, properties):
301
302
"""Set the Client's D-Bus property"""
302
client.Set(client_interface, self.property, self.value_to_set,
303
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
304
client.__dbus_object_path__,
305
dbus.PROPERTIES_IFACE, client_interface,
306
self.propname, self.value_to_set
307
if not isinstance(self.value_to_set, dbus.Boolean)
308
else bool(self.value_to_set))
309
client.Set(client_interface, self.propname, self.value_to_set,
303
310
dbus_interface=dbus.PROPERTIES_IFACE)
305
312
class ValueArgumentMixIn(object):
428
435
def is_enabled(self, client, properties):
429
return bool(properties["Enabled"])
436
log.debug("D-Bus: %s:%s:%s.Get(%r, %r)", busname,
437
client.__dbus_object_path__,
438
dbus.PROPERTIES_IFACE, client_interface,
440
return bool(client.Get(client_interface, "Enabled",
441
dbus_interface=dbus.PROPERTIES_IFACE))
431
443
class RemoveCmd(Command):
432
444
def run_on_one_client(self, client, properties):
445
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
446
server_path, server_interface,
447
str(client.__dbus_object_path__))
433
448
self.mandos.RemoveClient(client.__dbus_object_path__)
435
450
class ApproveCmd(Command):
436
451
def run_on_one_client(self, client, properties):
452
log.debug("D-Bus: %s:%s.Approve(True)",
453
client.__dbus_object_path__, client_interface)
437
454
client.Approve(dbus.Boolean(True),
438
455
dbus_interface=client_interface)
440
457
class DenyCmd(Command):
441
458
def run_on_one_client(self, client, properties):
459
log.debug("D-Bus: %s:%s.Approve(False)",
460
client.__dbus_object_path__, client_interface)
442
461
client.Approve(dbus.Boolean(False),
443
462
dbus_interface=client_interface)
445
464
class EnableCmd(PropertyCmd):
447
466
value_to_set = dbus.Boolean(True)
449
468
class DisableCmd(PropertyCmd):
451
470
value_to_set = dbus.Boolean(False)
453
472
class BumpTimeoutCmd(PropertyCmd):
454
property = "LastCheckedOK"
473
propname = "LastCheckedOK"
455
474
value_to_set = ""
457
476
class StartCheckerCmd(PropertyCmd):
458
property = "CheckerRunning"
477
propname = "CheckerRunning"
459
478
value_to_set = dbus.Boolean(True)
461
480
class StopCheckerCmd(PropertyCmd):
462
property = "CheckerRunning"
481
propname = "CheckerRunning"
463
482
value_to_set = dbus.Boolean(False)
465
484
class ApproveByDefaultCmd(PropertyCmd):
466
property = "ApprovedByDefault"
485
propname = "ApprovedByDefault"
467
486
value_to_set = dbus.Boolean(True)
469
488
class DenyByDefaultCmd(PropertyCmd):
470
property = "ApprovedByDefault"
489
propname = "ApprovedByDefault"
471
490
value_to_set = dbus.Boolean(False)
473
492
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
495
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
498
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
481
501
def value_to_set(self):
485
505
"""When setting, read data from supplied file object"""
486
506
self._vts = value.read()
490
509
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
512
class SetExtendedTimeoutCmd(PropertyCmd,
494
513
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
514
propname = "ExtendedTimeout"
497
516
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
517
propname = "Interval"
500
519
class SetApprovalDelayCmd(PropertyCmd,
501
520
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
521
propname = "ApprovalDelay"
504
523
class SetApprovalDurationCmd(PropertyCmd,
505
524
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
508
def has_actions(options):
509
return any((options.enable,
511
options.bump_timeout,
512
options.start_checker,
513
options.stop_checker,
516
options.checker is not None,
517
options.timeout is not None,
518
options.extended_timeout is not None,
519
options.interval is not None,
520
options.approved_by_default is not None,
521
options.approval_delay is not None,
522
options.approval_duration is not None,
523
options.host is not None,
524
options.secret is not None,
525
propname = "ApprovalDuration"
528
527
def add_command_line_options(parser):
529
528
parser.add_argument("--version", action="version",
668
parser = argparse.ArgumentParser()
670
add_command_line_options(parser)
672
options = parser.parse_args()
668
def check_option_syntax(parser, options):
669
"""Apply additional restrictions on options, not expressible in
672
def has_actions(options):
673
return any((options.enable,
675
options.bump_timeout,
676
options.start_checker,
677
options.stop_checker,
680
options.checker is not None,
681
options.timeout is not None,
682
options.extended_timeout is not None,
683
options.interval is not None,
684
options.approved_by_default is not None,
685
options.approval_delay is not None,
686
options.approval_duration is not None,
687
options.host is not None,
688
options.secret is not None,
674
692
if has_actions(options) and not (options.client or options.all):
675
693
parser.error("Options require clients names or --all.")
797
841
self.attributes = attributes
798
842
self.attributes["Name"] = name
800
def Set(self, interface, property, value, dbus_interface):
801
testcase.assertEqual(interface, client_interface)
802
testcase.assertEqual(dbus_interface,
803
dbus.PROPERTIES_IFACE)
804
self.attributes[property] = value
805
def Get(self, interface, property, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
return self.attributes[property]
844
def Set(self, interface, propname, value, dbus_interface):
845
testcase.assertEqual(interface, client_interface)
846
testcase.assertEqual(dbus_interface,
847
dbus.PROPERTIES_IFACE)
848
self.attributes[propname] = value
849
def Get(self, interface, propname, dbus_interface):
850
testcase.assertEqual(interface, client_interface)
851
testcase.assertEqual(dbus_interface,
852
dbus.PROPERTIES_IFACE)
853
return self.attributes[propname]
810
854
def Approve(self, approve, dbus_interface):
811
855
testcase.assertEqual(dbus_interface, client_interface)
812
856
self.calls.append(("Approve", (approve,
1048
1093
class TestBumpTimeoutCmd(TestPropertyCmd):
1049
1094
command = BumpTimeoutCmd
1050
property = "LastCheckedOK"
1095
propname = "LastCheckedOK"
1051
1096
values_to_set = [""]
1053
1098
class TestStartCheckerCmd(TestPropertyCmd):
1054
1099
command = StartCheckerCmd
1055
property = "CheckerRunning"
1100
propname = "CheckerRunning"
1056
1101
values_to_set = [dbus.Boolean(True)]
1058
1103
class TestStopCheckerCmd(TestPropertyCmd):
1059
1104
command = StopCheckerCmd
1060
property = "CheckerRunning"
1105
propname = "CheckerRunning"
1061
1106
values_to_set = [dbus.Boolean(False)]
1063
1108
class TestApproveByDefaultCmd(TestPropertyCmd):
1064
1109
command = ApproveByDefaultCmd
1065
property = "ApprovedByDefault"
1110
propname = "ApprovedByDefault"
1066
1111
values_to_set = [dbus.Boolean(True)]
1068
1113
class TestDenyByDefaultCmd(TestPropertyCmd):
1069
1114
command = DenyByDefaultCmd
1070
property = "ApprovedByDefault"
1115
propname = "ApprovedByDefault"
1071
1116
values_to_set = [dbus.Boolean(False)]
1073
1118
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1083
1128
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1084
1129
command = SetCheckerCmd
1085
property = "Checker"
1130
propname = "Checker"
1086
1131
values_to_set = ["", ":", "fping -q -- %s"]
1088
1133
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1089
1134
command = SetHostCmd
1091
1136
values_to_set = ["192.0.2.3", "foo.example.org"]
1093
1138
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1094
1139
command = SetSecretCmd
1096
values_to_set = [open("/dev/null", "rb"),
1141
values_to_set = [io.BytesIO(b""),
1097
1142
io.BytesIO(b"secret\0xyzzy\nbar")]
1098
1143
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1100
1145
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1101
1146
command = SetTimeoutCmd
1102
property = "Timeout"
1103
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1147
propname = "Timeout"
1148
values_to_set = [datetime.timedelta(),
1149
datetime.timedelta(minutes=5),
1150
datetime.timedelta(seconds=1),
1151
datetime.timedelta(weeks=1),
1152
datetime.timedelta(weeks=52)]
1153
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1106
1155
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1107
1156
command = SetExtendedTimeoutCmd
1108
property = "ExtendedTimeout"
1109
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1157
propname = "ExtendedTimeout"
1158
values_to_set = [datetime.timedelta(),
1159
datetime.timedelta(minutes=5),
1160
datetime.timedelta(seconds=1),
1161
datetime.timedelta(weeks=1),
1162
datetime.timedelta(weeks=52)]
1163
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1112
1165
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1113
1166
command = SetIntervalCmd
1114
property = "Interval"
1115
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1167
propname = "Interval"
1168
values_to_set = [datetime.timedelta(),
1169
datetime.timedelta(minutes=5),
1170
datetime.timedelta(seconds=1),
1171
datetime.timedelta(weeks=1),
1172
datetime.timedelta(weeks=52)]
1173
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1118
1175
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1119
1176
command = SetApprovalDelayCmd
1120
property = "ApprovalDelay"
1121
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1122
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1177
propname = "ApprovalDelay"
1178
values_to_set = [datetime.timedelta(),
1179
datetime.timedelta(minutes=5),
1180
datetime.timedelta(seconds=1),
1181
datetime.timedelta(weeks=1),
1182
datetime.timedelta(weeks=52)]
1183
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1124
1185
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1125
1186
command = SetApprovalDurationCmd
1126
property = "ApprovalDuration"
1127
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1128
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1187
propname = "ApprovalDuration"
1188
values_to_set = [datetime.timedelta(),
1189
datetime.timedelta(minutes=5),
1190
datetime.timedelta(seconds=1),
1191
datetime.timedelta(weeks=1),
1192
datetime.timedelta(weeks=52)]
1193
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1130
1195
class Test_command_from_options(unittest.TestCase):
1131
1196
def setUp(self):
1149
1215
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1218
def test_print_table_verbose_short(self):
1219
self.assert_command_from_args(["-v"], PrintTableCmd,
1152
1222
def test_enable(self):
1153
1223
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1225
def test_enable_short(self):
1226
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1155
1228
def test_disable(self):
1156
1229
self.assert_command_from_args(["--disable", "foo"],
1232
def test_disable_short(self):
1233
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1159
1235
def test_bump_timeout(self):
1160
1236
self.assert_command_from_args(["--bump-timeout", "foo"],
1161
1237
BumpTimeoutCmd)
1239
def test_bump_timeout_short(self):
1240
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1163
1242
def test_start_checker(self):
1164
1243
self.assert_command_from_args(["--start-checker", "foo"],
1165
1244
StartCheckerCmd)
1172
1251
self.assert_command_from_args(["--remove", "foo"],
1254
def test_remove_short(self):
1255
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1175
1257
def test_checker(self):
1176
1258
self.assert_command_from_args(["--checker", ":", "foo"],
1177
1259
SetCheckerCmd, value_to_set=":")
1261
def test_checker_empty(self):
1262
self.assert_command_from_args(["--checker", "", "foo"],
1263
SetCheckerCmd, value_to_set="")
1265
def test_checker_short(self):
1266
self.assert_command_from_args(["-c", ":", "foo"],
1267
SetCheckerCmd, value_to_set=":")
1179
1269
def test_timeout(self):
1180
1270
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1182
1272
value_to_set=300000)
1274
def test_timeout_short(self):
1275
self.assert_command_from_args(["-t", "PT5M", "foo"],
1277
value_to_set=300000)
1184
1279
def test_extended_timeout(self):
1185
1280
self.assert_command_from_args(["--extended-timeout", "PT15M",
1229
1334
"foo"], SetSecretCmd,
1230
1335
value_to_set=value)
1337
def test_secret_devnull_short(self):
1338
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1339
SetSecretCmd, value_to_set=b"")
1341
def test_secret_tempfile_short(self):
1342
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1343
value = b"secret\0xyzzy\nbar"
1346
self.assert_command_from_args(["-s", f.name, "foo"],
1232
1350
def test_approve(self):
1233
1351
self.assert_command_from_args(["--approve", "foo"],
1354
def test_approve_short(self):
1355
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1236
1357
def test_deny(self):
1237
1358
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1360
def test_deny_short(self):
1361
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1239
1363
def test_dump_json(self):
1240
1364
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1243
1367
self.assert_command_from_args(["--is-enabled", "foo"],
1370
def test_is_enabled_short(self):
1371
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1373
def test_deny_before_remove(self):
1374
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1375
check_option_syntax(self.parser, options)
1376
commands = commands_from_options(options)
1377
self.assertEqual(len(commands), 2)
1378
self.assertIsInstance(commands[0], DenyCmd)
1379
self.assertIsInstance(commands[1], RemoveCmd)
1381
def test_deny_before_remove_reversed(self):
1382
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1383
check_option_syntax(self.parser, options)
1384
commands = commands_from_options(options)
1385
self.assertEqual(len(commands), 2)
1386
self.assertIsInstance(commands[0], DenyCmd)
1387
self.assertIsInstance(commands[1], RemoveCmd)
1390
class Test_check_option_syntax(unittest.TestCase):
1391
# This mostly corresponds to the definition from has_actions() in
1392
# check_option_syntax()
1394
# The actual values set here are not that important, but we do
1395
# at least stick to the correct types, even though they are
1399
"bump_timeout": True,
1400
"start_checker": True,
1401
"stop_checker": True,
1405
"timeout": datetime.timedelta(),
1406
"extended_timeout": datetime.timedelta(),
1407
"interval": datetime.timedelta(),
1408
"approved_by_default": True,
1409
"approval_delay": datetime.timedelta(),
1410
"approval_duration": datetime.timedelta(),
1412
"secret": io.BytesIO(b"x"),
1418
self.parser = argparse.ArgumentParser()
1419
add_command_line_options(self.parser)
1421
@contextlib.contextmanager
1422
def assertParseError(self):
1423
with self.assertRaises(SystemExit) as e:
1424
with self.temporarily_suppress_stderr():
1426
# Exit code from argparse is guaranteed to be "2". Reference:
1427
# https://docs.python.org/3/library/argparse.html#exiting-methods
1428
self.assertEqual(e.exception.code, 2)
1431
@contextlib.contextmanager
1432
def temporarily_suppress_stderr():
1433
null = os.open(os.path.devnull, os.O_RDWR)
1434
stderrcopy = os.dup(sys.stderr.fileno())
1435
os.dup2(null, sys.stderr.fileno())
1441
os.dup2(stderrcopy, sys.stderr.fileno())
1442
os.close(stderrcopy)
1444
def check_option_syntax(self, options):
1445
check_option_syntax(self.parser, options)
1447
def test_actions_requires_client_or_all(self):
1448
for action, value in self.actions.items():
1449
options = self.parser.parse_args()
1450
setattr(options, action, value)
1451
with self.assertParseError():
1452
self.check_option_syntax(options)
1454
def test_actions_conflicts_with_verbose(self):
1455
for action, value in self.actions.items():
1456
options = self.parser.parse_args()
1457
setattr(options, action, value)
1458
options.verbose = True
1459
with self.assertParseError():
1460
self.check_option_syntax(options)
1462
def test_dump_json_conflicts_with_verbose(self):
1463
options = self.parser.parse_args()
1464
options.dump_json = True
1465
options.verbose = True
1466
with self.assertParseError():
1467
self.check_option_syntax(options)
1469
def test_dump_json_conflicts_with_action(self):
1470
for action, value in self.actions.items():
1471
options = self.parser.parse_args()
1472
setattr(options, action, value)
1473
options.dump_json = True
1474
with self.assertParseError():
1475
self.check_option_syntax(options)
1477
def test_all_can_not_be_alone(self):
1478
options = self.parser.parse_args()
1480
with self.assertParseError():
1481
self.check_option_syntax(options)
1483
def test_all_is_ok_with_any_action(self):
1484
for action, value in self.actions.items():
1485
options = self.parser.parse_args()
1486
setattr(options, action, value)
1488
self.check_option_syntax(options)
1490
def test_is_enabled_fails_without_client(self):
1491
options = self.parser.parse_args()
1492
options.is_enabled = True
1493
with self.assertParseError():
1494
self.check_option_syntax(options)
1496
def test_is_enabled_works_with_one_client(self):
1497
options = self.parser.parse_args()
1498
options.is_enabled = True
1499
options.client = ["foo"]
1500
self.check_option_syntax(options)
1502
def test_is_enabled_fails_with_two_clients(self):
1503
options = self.parser.parse_args()
1504
options.is_enabled = True
1505
options.client = ["foo", "barbar"]
1506
with self.assertParseError():
1507
self.check_option_syntax(options)
1509
def test_remove_can_only_be_combined_with_action_deny(self):
1510
for action, value in self.actions.items():
1511
if action in {"remove", "deny"}:
1513
options = self.parser.parse_args()
1514
setattr(options, action, value)
1516
options.remove = True
1517
with self.assertParseError():
1518
self.check_option_syntax(options)
1248
1522
def should_only_run_tests():