441
421
class IsEnabledCmd(Command):
442
def run(self, clients, bus=None, mandos=None):
443
client, properties = next(iter(clients.items()))
422
def run_on_one_client(self, client, properties):
444
423
if self.is_enabled(client, properties):
447
426
def is_enabled(self, client, properties):
448
return properties["Enabled"]
427
return bool(properties["Enabled"])
450
429
class RemoveCmd(Command):
451
430
def run_on_one_client(self, client, properties):
452
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
453
server_dbus_path, server_dbus_interface,
454
str(client.__dbus_object_path__))
455
431
self.mandos.RemoveClient(client.__dbus_object_path__)
457
433
class ApproveCmd(Command):
458
434
def run_on_one_client(self, client, properties):
459
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
460
client.__dbus_object_path__, client_dbus_interface)
461
435
client.Approve(dbus.Boolean(True),
462
dbus_interface=client_dbus_interface)
436
dbus_interface=client_interface)
464
438
class DenyCmd(Command):
465
439
def run_on_one_client(self, client, properties):
466
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
467
client.__dbus_object_path__, client_dbus_interface)
468
440
client.Approve(dbus.Boolean(False),
469
dbus_interface=client_dbus_interface)
441
dbus_interface=client_interface)
471
443
class EnableCmd(PropertyCmd):
473
445
value_to_set = dbus.Boolean(True)
475
447
class DisableCmd(PropertyCmd):
477
449
value_to_set = dbus.Boolean(False)
479
451
class BumpTimeoutCmd(PropertyCmd):
480
propname = "LastCheckedOK"
452
property = "LastCheckedOK"
481
453
value_to_set = ""
483
455
class StartCheckerCmd(PropertyCmd):
484
propname = "CheckerRunning"
456
property = "CheckerRunning"
485
457
value_to_set = dbus.Boolean(True)
487
459
class StopCheckerCmd(PropertyCmd):
488
propname = "CheckerRunning"
460
property = "CheckerRunning"
489
461
value_to_set = dbus.Boolean(False)
491
463
class ApproveByDefaultCmd(PropertyCmd):
492
propname = "ApprovedByDefault"
464
property = "ApprovedByDefault"
493
465
value_to_set = dbus.Boolean(True)
495
467
class DenyByDefaultCmd(PropertyCmd):
496
propname = "ApprovedByDefault"
468
property = "ApprovedByDefault"
497
469
value_to_set = dbus.Boolean(False)
499
class SetCheckerCmd(PropertyValueCmd):
502
class SetHostCmd(PropertyValueCmd):
505
class SetSecretCmd(PropertyValueCmd):
508
def value_to_set(self):
511
def value_to_set(self, value):
512
"""When setting, read data from supplied file object"""
513
self._vts = value.read()
516
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
519
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
520
propname = "ExtendedTimeout"
522
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
523
propname = "Interval"
525
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
526
propname = "ApprovalDelay"
528
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
529
propname = "ApprovalDuration"
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
483
class SetExtendedTimeoutCmd(PropertyCmd,
484
MillisecondsValueArgumentMixIn):
485
property = "ExtendedTimeout"
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
488
property = "Interval"
490
class SetApprovalDelayCmd(PropertyCmd,
491
MillisecondsValueArgumentMixIn):
492
property = "ApprovalDelay"
494
class SetApprovalDurationCmd(PropertyCmd,
495
MillisecondsValueArgumentMixIn):
496
property = "ApprovalDuration"
498
def has_actions(options):
499
return any((options.enable,
501
options.bump_timeout,
502
options.start_checker,
503
options.stop_checker,
506
options.checker is not None,
507
options.timeout is not None,
508
options.extended_timeout is not None,
509
options.interval is not None,
510
options.approved_by_default is not None,
511
options.approval_delay is not None,
512
options.approval_duration is not None,
513
options.host is not None,
514
options.secret is not None,
531
518
def add_command_line_options(parser):
532
519
parser.add_argument("--version", action="version",
1065
969
self.calls.append(("RemoveClient", (dbus_path,)))
1066
970
mandos = MockMandos()
1067
971
super(TestRemoveCmd, self).setUp()
1068
RemoveCmd().run(self.clients, self.bus, mandos)
972
RemoveCmd().run(mandos, self.clients)
1069
973
self.assertEqual(len(mandos.calls), 2)
1070
for clientpath in self.clients:
1071
self.assertIn(("RemoveClient", (clientpath,)),
974
for client in self.clients:
975
self.assertIn(("RemoveClient",
976
(client.__dbus_object_path__,)),
1074
979
class TestApproveCmd(TestCmd):
1075
980
def test_approve(self):
1076
ApproveCmd().run(self.clients, self.bus)
1077
for clientpath in self.clients:
1078
client = self.bus.get_object(dbus_busname, clientpath)
1079
self.assertIn(("Approve", (True, client_dbus_interface)),
981
ApproveCmd().run(None, self.clients)
982
for client in self.clients:
983
self.assertIn(("Approve", (True, client_interface)),
1082
986
class TestDenyCmd(TestCmd):
1083
987
def test_deny(self):
1084
DenyCmd().run(self.clients, self.bus)
1085
for clientpath in self.clients:
1086
client = self.bus.get_object(dbus_busname, clientpath)
1087
self.assertIn(("Approve", (False, client_dbus_interface)),
988
DenyCmd().run(None, self.clients)
989
for client in self.clients:
990
self.assertIn(("Approve", (False, client_interface)),
1090
993
class TestEnableCmd(TestCmd):
1091
994
def test_enable(self):
1092
for clientpath in self.clients:
1093
client = self.bus.get_object(dbus_busname, clientpath)
995
for client in self.clients:
1094
996
client.attributes["Enabled"] = False
1096
EnableCmd().run(self.clients, self.bus)
998
EnableCmd().run(None, self.clients)
1098
for clientpath in self.clients:
1099
client = self.bus.get_object(dbus_busname, clientpath)
1000
for client in self.clients:
1100
1001
self.assertTrue(client.attributes["Enabled"])
1102
1003
class TestDisableCmd(TestCmd):
1103
1004
def test_disable(self):
1104
DisableCmd().run(self.clients, self.bus)
1105
for clientpath in self.clients:
1106
client = self.bus.get_object(dbus_busname, clientpath)
1005
DisableCmd().run(None, self.clients)
1007
for client in self.clients:
1107
1008
self.assertFalse(client.attributes["Enabled"])
1109
class Unique(object):
1110
"""Class for objects which exist only to be unique objects, since
1111
unittest.mock.sentinel only exists in Python 3.3"""
1113
class TestPropertyCmd(TestCmd):
1114
"""Abstract class for tests of PropertyCmd classes"""
1116
if not hasattr(self, "command"):
1118
values_to_get = getattr(self, "values_to_get",
1120
for value_to_set, value_to_get in zip(self.values_to_set,
1122
for clientpath in self.clients:
1123
client = self.bus.get_object(dbus_busname, clientpath)
1124
old_value = client.attributes[self.propname]
1125
self.assertNotIsInstance(old_value, Unique)
1126
client.attributes[self.propname] = Unique()
1127
self.run_command(value_to_set, self.clients)
1128
for clientpath in self.clients:
1129
client = self.bus.get_object(dbus_busname, clientpath)
1130
value = client.attributes[self.propname]
1131
self.assertNotIsInstance(value, Unique)
1132
self.assertEqual(value, value_to_get)
1133
def run_command(self, value, clients):
1134
self.command().run(clients, self.bus)
1136
class TestBumpTimeoutCmd(TestPropertyCmd):
1137
command = BumpTimeoutCmd
1138
propname = "LastCheckedOK"
1139
values_to_set = [""]
1141
class TestStartCheckerCmd(TestPropertyCmd):
1142
command = StartCheckerCmd
1143
propname = "CheckerRunning"
1144
values_to_set = [dbus.Boolean(True)]
1146
class TestStopCheckerCmd(TestPropertyCmd):
1147
command = StopCheckerCmd
1148
propname = "CheckerRunning"
1149
values_to_set = [dbus.Boolean(False)]
1151
class TestApproveByDefaultCmd(TestPropertyCmd):
1152
command = ApproveByDefaultCmd
1153
propname = "ApprovedByDefault"
1154
values_to_set = [dbus.Boolean(True)]
1156
class TestDenyByDefaultCmd(TestPropertyCmd):
1157
command = DenyByDefaultCmd
1158
propname = "ApprovedByDefault"
1159
values_to_set = [dbus.Boolean(False)]
1161
class TestPropertyValueCmd(TestPropertyCmd):
1162
"""Abstract class for tests of PropertyValueCmd classes"""
1164
if type(self) is TestPropertyValueCmd:
1166
return super(TestPropertyValueCmd, self).runTest()
1167
def run_command(self, value, clients):
1168
self.command(value).run(clients, self.bus)
1170
class TestSetCheckerCmd(TestPropertyValueCmd):
1171
command = SetCheckerCmd
1172
propname = "Checker"
1173
values_to_set = ["", ":", "fping -q -- %s"]
1175
class TestSetHostCmd(TestPropertyValueCmd):
1176
command = SetHostCmd
1178
values_to_set = ["192.0.2.3", "foo.example.org"]
1180
class TestSetSecretCmd(TestPropertyValueCmd):
1181
command = SetSecretCmd
1183
values_to_set = [io.BytesIO(b""),
1184
io.BytesIO(b"secret\0xyzzy\nbar")]
1185
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1187
class TestSetTimeoutCmd(TestPropertyValueCmd):
1188
command = SetTimeoutCmd
1189
propname = "Timeout"
1190
values_to_set = [datetime.timedelta(),
1191
datetime.timedelta(minutes=5),
1192
datetime.timedelta(seconds=1),
1193
datetime.timedelta(weeks=1),
1194
datetime.timedelta(weeks=52)]
1195
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1197
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1198
command = SetExtendedTimeoutCmd
1199
propname = "ExtendedTimeout"
1200
values_to_set = [datetime.timedelta(),
1201
datetime.timedelta(minutes=5),
1202
datetime.timedelta(seconds=1),
1203
datetime.timedelta(weeks=1),
1204
datetime.timedelta(weeks=52)]
1205
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1207
class TestSetIntervalCmd(TestPropertyValueCmd):
1208
command = SetIntervalCmd
1209
propname = "Interval"
1210
values_to_set = [datetime.timedelta(),
1211
datetime.timedelta(minutes=5),
1212
datetime.timedelta(seconds=1),
1213
datetime.timedelta(weeks=1),
1214
datetime.timedelta(weeks=52)]
1215
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1217
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1218
command = SetApprovalDelayCmd
1219
propname = "ApprovalDelay"
1220
values_to_set = [datetime.timedelta(),
1221
datetime.timedelta(minutes=5),
1222
datetime.timedelta(seconds=1),
1223
datetime.timedelta(weeks=1),
1224
datetime.timedelta(weeks=52)]
1225
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1227
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1228
command = SetApprovalDurationCmd
1229
propname = "ApprovalDuration"
1230
values_to_set = [datetime.timedelta(),
1231
datetime.timedelta(minutes=5),
1232
datetime.timedelta(seconds=1),
1233
datetime.timedelta(weeks=1),
1234
datetime.timedelta(weeks=52)]
1235
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1237
class Test_command_from_options(unittest.TestCase):
1239
self.parser = argparse.ArgumentParser()
1240
add_command_line_options(self.parser)
1241
def assert_command_from_args(self, args, command_cls,
1243
"""Assert that parsing ARGS should result in an instance of
1244
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1245
options = self.parser.parse_args(args)
1246
check_option_syntax(self.parser, options)
1247
commands = commands_from_options(options)
1248
self.assertEqual(len(commands), 1)
1249
command = commands[0]
1250
self.assertIsInstance(command, command_cls)
1251
for key, value in cmd_attrs.items():
1252
self.assertEqual(getattr(command, key), value)
1253
def test_print_table(self):
1254
self.assert_command_from_args([], PrintTableCmd,
1257
def test_print_table_verbose(self):
1258
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1261
def test_print_table_verbose_short(self):
1262
self.assert_command_from_args(["-v"], PrintTableCmd,
1265
def test_enable(self):
1266
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1268
def test_enable_short(self):
1269
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1271
def test_disable(self):
1272
self.assert_command_from_args(["--disable", "foo"],
1275
def test_disable_short(self):
1276
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1278
def test_bump_timeout(self):
1279
self.assert_command_from_args(["--bump-timeout", "foo"],
1282
def test_bump_timeout_short(self):
1283
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1285
def test_start_checker(self):
1286
self.assert_command_from_args(["--start-checker", "foo"],
1289
def test_stop_checker(self):
1290
self.assert_command_from_args(["--stop-checker", "foo"],
1293
def test_remove(self):
1294
self.assert_command_from_args(["--remove", "foo"],
1297
def test_remove_short(self):
1298
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1300
def test_checker(self):
1301
self.assert_command_from_args(["--checker", ":", "foo"],
1302
SetCheckerCmd, value_to_set=":")
1304
def test_checker_empty(self):
1305
self.assert_command_from_args(["--checker", "", "foo"],
1306
SetCheckerCmd, value_to_set="")
1308
def test_checker_short(self):
1309
self.assert_command_from_args(["-c", ":", "foo"],
1310
SetCheckerCmd, value_to_set=":")
1312
def test_timeout(self):
1313
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1315
value_to_set=300000)
1317
def test_timeout_short(self):
1318
self.assert_command_from_args(["-t", "PT5M", "foo"],
1320
value_to_set=300000)
1322
def test_extended_timeout(self):
1323
self.assert_command_from_args(["--extended-timeout", "PT15M",
1325
SetExtendedTimeoutCmd,
1326
value_to_set=900000)
1328
def test_interval(self):
1329
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1331
value_to_set=120000)
1333
def test_interval_short(self):
1334
self.assert_command_from_args(["-i", "PT2M", "foo"],
1336
value_to_set=120000)
1338
def test_approve_by_default(self):
1339
self.assert_command_from_args(["--approve-by-default", "foo"],
1340
ApproveByDefaultCmd)
1342
def test_deny_by_default(self):
1343
self.assert_command_from_args(["--deny-by-default", "foo"],
1346
def test_approval_delay(self):
1347
self.assert_command_from_args(["--approval-delay", "PT30S",
1348
"foo"], SetApprovalDelayCmd,
1351
def test_approval_duration(self):
1352
self.assert_command_from_args(["--approval-duration", "PT1S",
1353
"foo"], SetApprovalDurationCmd,
1356
def test_host(self):
1357
self.assert_command_from_args(["--host", "foo.example.org",
1359
value_to_set="foo.example.org")
1361
def test_host_short(self):
1362
self.assert_command_from_args(["-H", "foo.example.org",
1364
value_to_set="foo.example.org")
1366
def test_secret_devnull(self):
1367
self.assert_command_from_args(["--secret", os.path.devnull,
1368
"foo"], SetSecretCmd,
1371
def test_secret_tempfile(self):
1372
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1373
value = b"secret\0xyzzy\nbar"
1376
self.assert_command_from_args(["--secret", f.name,
1377
"foo"], SetSecretCmd,
1380
def test_secret_devnull_short(self):
1381
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1382
SetSecretCmd, value_to_set=b"")
1384
def test_secret_tempfile_short(self):
1385
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1386
value = b"secret\0xyzzy\nbar"
1389
self.assert_command_from_args(["-s", f.name, "foo"],
1393
def test_approve(self):
1394
self.assert_command_from_args(["--approve", "foo"],
1397
def test_approve_short(self):
1398
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1400
def test_deny(self):
1401
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1403
def test_deny_short(self):
1404
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1406
def test_dump_json(self):
1407
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1409
def test_is_enabled(self):
1410
self.assert_command_from_args(["--is-enabled", "foo"],
1413
def test_is_enabled_short(self):
1414
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1416
def test_deny_before_remove(self):
1417
options = self.parser.parse_args(["--deny", "--remove",
1419
check_option_syntax(self.parser, options)
1420
commands = commands_from_options(options)
1421
self.assertEqual(len(commands), 2)
1422
self.assertIsInstance(commands[0], DenyCmd)
1423
self.assertIsInstance(commands[1], RemoveCmd)
1425
def test_deny_before_remove_reversed(self):
1426
options = self.parser.parse_args(["--remove", "--deny",
1428
check_option_syntax(self.parser, options)
1429
commands = commands_from_options(options)
1430
self.assertEqual(len(commands), 2)
1431
self.assertIsInstance(commands[0], DenyCmd)
1432
self.assertIsInstance(commands[1], RemoveCmd)
1435
class Test_check_option_syntax(unittest.TestCase):
1436
# This mostly corresponds to the definition from has_actions() in
1437
# check_option_syntax()
1439
# The actual values set here are not that important, but we do
1440
# at least stick to the correct types, even though they are
1444
"bump_timeout": True,
1445
"start_checker": True,
1446
"stop_checker": True,
1450
"timeout": datetime.timedelta(),
1451
"extended_timeout": datetime.timedelta(),
1452
"interval": datetime.timedelta(),
1453
"approved_by_default": True,
1454
"approval_delay": datetime.timedelta(),
1455
"approval_duration": datetime.timedelta(),
1457
"secret": io.BytesIO(b"x"),
1463
self.parser = argparse.ArgumentParser()
1464
add_command_line_options(self.parser)
1466
@contextlib.contextmanager
1467
def assertParseError(self):
1468
with self.assertRaises(SystemExit) as e:
1469
with self.temporarily_suppress_stderr():
1471
# Exit code from argparse is guaranteed to be "2". Reference:
1472
# https://docs.python.org/3/library
1473
# /argparse.html#exiting-methods
1474
self.assertEqual(e.exception.code, 2)
1477
@contextlib.contextmanager
1478
def temporarily_suppress_stderr():
1479
null = os.open(os.path.devnull, os.O_RDWR)
1480
stderrcopy = os.dup(sys.stderr.fileno())
1481
os.dup2(null, sys.stderr.fileno())
1487
os.dup2(stderrcopy, sys.stderr.fileno())
1488
os.close(stderrcopy)
1490
def check_option_syntax(self, options):
1491
check_option_syntax(self.parser, options)
1493
def test_actions_requires_client_or_all(self):
1494
for action, value in self.actions.items():
1495
options = self.parser.parse_args()
1496
setattr(options, action, value)
1497
with self.assertParseError():
1498
self.check_option_syntax(options)
1500
def test_actions_conflicts_with_verbose(self):
1501
for action, value in self.actions.items():
1502
options = self.parser.parse_args()
1503
setattr(options, action, value)
1504
options.verbose = True
1505
with self.assertParseError():
1506
self.check_option_syntax(options)
1508
def test_dump_json_conflicts_with_verbose(self):
1509
options = self.parser.parse_args()
1510
options.dump_json = True
1511
options.verbose = True
1512
with self.assertParseError():
1513
self.check_option_syntax(options)
1515
def test_dump_json_conflicts_with_action(self):
1516
for action, value in self.actions.items():
1517
options = self.parser.parse_args()
1518
setattr(options, action, value)
1519
options.dump_json = True
1520
with self.assertParseError():
1521
self.check_option_syntax(options)
1523
def test_all_can_not_be_alone(self):
1524
options = self.parser.parse_args()
1526
with self.assertParseError():
1527
self.check_option_syntax(options)
1529
def test_all_is_ok_with_any_action(self):
1530
for action, value in self.actions.items():
1531
options = self.parser.parse_args()
1532
setattr(options, action, value)
1534
self.check_option_syntax(options)
1536
def test_is_enabled_fails_without_client(self):
1537
options = self.parser.parse_args()
1538
options.is_enabled = True
1539
with self.assertParseError():
1540
self.check_option_syntax(options)
1542
def test_is_enabled_works_with_one_client(self):
1543
options = self.parser.parse_args()
1544
options.is_enabled = True
1545
options.client = ["foo"]
1546
self.check_option_syntax(options)
1548
def test_is_enabled_fails_with_two_clients(self):
1549
options = self.parser.parse_args()
1550
options.is_enabled = True
1551
options.client = ["foo", "barbar"]
1552
with self.assertParseError():
1553
self.check_option_syntax(options)
1555
def test_remove_can_only_be_combined_with_action_deny(self):
1556
for action, value in self.actions.items():
1557
if action in {"remove", "deny"}:
1559
options = self.parser.parse_args()
1560
setattr(options, action, value)
1562
options.remove = True
1563
with self.assertParseError():
1564
self.check_option_syntax(options)
1568
1012
def should_only_run_tests():