441
421
class IsEnabledCmd(Command):
442
def run(self, clients, bus=None, mandos=None):
443
client, properties = next(iter(clients.items()))
422
def run_on_one_client(self, client, properties):
444
423
if self.is_enabled(client, properties):
447
426
def is_enabled(self, client, properties):
448
return properties["Enabled"]
427
return bool(properties["Enabled"])
450
429
class RemoveCmd(Command):
451
430
def run_on_one_client(self, client, properties):
452
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
453
server_dbus_path, server_dbus_interface,
454
str(client.__dbus_object_path__))
455
431
self.mandos.RemoveClient(client.__dbus_object_path__)
457
433
class ApproveCmd(Command):
458
434
def run_on_one_client(self, client, properties):
459
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
460
client.__dbus_object_path__, client_dbus_interface)
461
435
client.Approve(dbus.Boolean(True),
462
dbus_interface=client_dbus_interface)
436
dbus_interface=client_interface)
464
438
class DenyCmd(Command):
465
439
def run_on_one_client(self, client, properties):
466
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
467
client.__dbus_object_path__, client_dbus_interface)
468
440
client.Approve(dbus.Boolean(False),
469
dbus_interface=client_dbus_interface)
441
dbus_interface=client_interface)
471
443
class EnableCmd(PropertyCmd):
473
445
value_to_set = dbus.Boolean(True)
475
447
class DisableCmd(PropertyCmd):
477
449
value_to_set = dbus.Boolean(False)
479
451
class BumpTimeoutCmd(PropertyCmd):
480
propname = "LastCheckedOK"
452
property = "LastCheckedOK"
481
453
value_to_set = ""
483
455
class StartCheckerCmd(PropertyCmd):
484
propname = "CheckerRunning"
456
property = "CheckerRunning"
485
457
value_to_set = dbus.Boolean(True)
487
459
class StopCheckerCmd(PropertyCmd):
488
propname = "CheckerRunning"
460
property = "CheckerRunning"
489
461
value_to_set = dbus.Boolean(False)
491
463
class ApproveByDefaultCmd(PropertyCmd):
492
propname = "ApprovedByDefault"
464
property = "ApprovedByDefault"
493
465
value_to_set = dbus.Boolean(True)
495
467
class DenyByDefaultCmd(PropertyCmd):
496
propname = "ApprovedByDefault"
468
property = "ApprovedByDefault"
497
469
value_to_set = dbus.Boolean(False)
499
class SetCheckerCmd(PropertyValueCmd):
502
class SetHostCmd(PropertyValueCmd):
505
class SetSecretCmd(PropertyValueCmd):
508
def value_to_set(self):
511
def value_to_set(self, value):
512
"""When setting, read data from supplied file object"""
513
self._vts = value.read()
516
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
519
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
520
propname = "ExtendedTimeout"
522
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
523
propname = "Interval"
525
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
526
propname = "ApprovalDelay"
528
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
529
propname = "ApprovalDuration"
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
483
class SetExtendedTimeoutCmd(PropertyCmd,
484
MillisecondsValueArgumentMixIn):
485
property = "ExtendedTimeout"
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
488
property = "Interval"
490
class SetApprovalDelayCmd(PropertyCmd,
491
MillisecondsValueArgumentMixIn):
492
property = "ApprovalDelay"
494
class SetApprovalDurationCmd(PropertyCmd,
495
MillisecondsValueArgumentMixIn):
496
property = "ApprovalDuration"
498
def has_actions(options):
499
return any((options.enable,
501
options.bump_timeout,
502
options.start_checker,
503
options.stop_checker,
506
options.checker is not None,
507
options.timeout is not None,
508
options.extended_timeout is not None,
509
options.interval is not None,
510
options.approved_by_default is not None,
511
options.approval_delay is not None,
512
options.approval_duration is not None,
513
options.host is not None,
514
options.secret is not None,
531
518
def add_command_line_options(parser):
532
519
parser.add_argument("--version", action="version",
1126
969
self.calls.append(("RemoveClient", (dbus_path,)))
1127
970
mandos = MockMandos()
1128
971
super(TestRemoveCmd, self).setUp()
1129
RemoveCmd().run(self.clients, self.bus, mandos)
972
RemoveCmd().run(mandos, self.clients)
1130
973
self.assertEqual(len(mandos.calls), 2)
1131
for clientpath in self.clients:
1132
self.assertIn(("RemoveClient", (clientpath,)),
974
for client in self.clients:
975
self.assertIn(("RemoveClient",
976
(client.__dbus_object_path__,)),
1135
979
class TestApproveCmd(TestCmd):
1136
980
def test_approve(self):
1137
ApproveCmd().run(self.clients, self.bus)
1138
for clientpath in self.clients:
1139
client = self.bus.get_object(dbus_busname, clientpath)
1140
self.assertIn(("Approve", (True, client_dbus_interface)),
981
ApproveCmd().run(None, self.clients)
982
for client in self.clients:
983
self.assertIn(("Approve", (True, client_interface)),
1143
986
class TestDenyCmd(TestCmd):
1144
987
def test_deny(self):
1145
DenyCmd().run(self.clients, self.bus)
1146
for clientpath in self.clients:
1147
client = self.bus.get_object(dbus_busname, clientpath)
1148
self.assertIn(("Approve", (False, client_dbus_interface)),
988
DenyCmd().run(None, self.clients)
989
for client in self.clients:
990
self.assertIn(("Approve", (False, client_interface)),
1151
class TestEnableCmd(TestCmd):
1152
def test_enable(self):
1153
for clientpath in self.clients:
1154
client = self.bus.get_object(dbus_busname, clientpath)
1155
client.attributes["Enabled"] = False
1157
EnableCmd().run(self.clients, self.bus)
1159
for clientpath in self.clients:
1160
client = self.bus.get_object(dbus_busname, clientpath)
1161
self.assertTrue(client.attributes["Enabled"])
1163
class TestDisableCmd(TestCmd):
1164
def test_disable(self):
1165
DisableCmd().run(self.clients, self.bus)
1166
for clientpath in self.clients:
1167
client = self.bus.get_object(dbus_busname, clientpath)
1168
self.assertFalse(client.attributes["Enabled"])
1170
class Unique(object):
1171
"""Class for objects which exist only to be unique objects, since
1172
unittest.mock.sentinel only exists in Python 3.3"""
1174
class TestPropertyCmd(TestCmd):
1175
"""Abstract class for tests of PropertyCmd classes"""
1177
if not hasattr(self, "command"):
1179
values_to_get = getattr(self, "values_to_get",
1181
for value_to_set, value_to_get in zip(self.values_to_set,
1183
for clientpath in self.clients:
1184
client = self.bus.get_object(dbus_busname, clientpath)
1185
old_value = client.attributes[self.propname]
1186
self.assertNotIsInstance(old_value, Unique)
1187
client.attributes[self.propname] = Unique()
1188
self.run_command(value_to_set, self.clients)
1189
for clientpath in self.clients:
1190
client = self.bus.get_object(dbus_busname, clientpath)
1191
value = client.attributes[self.propname]
1192
self.assertNotIsInstance(value, Unique)
1193
self.assertEqual(value, value_to_get)
1194
def run_command(self, value, clients):
1195
self.command().run(clients, self.bus)
1197
class TestBumpTimeoutCmd(TestPropertyCmd):
1198
command = BumpTimeoutCmd
1199
propname = "LastCheckedOK"
1200
values_to_set = [""]
1202
class TestStartCheckerCmd(TestPropertyCmd):
1203
command = StartCheckerCmd
1204
propname = "CheckerRunning"
1205
values_to_set = [dbus.Boolean(True)]
1207
class TestStopCheckerCmd(TestPropertyCmd):
1208
command = StopCheckerCmd
1209
propname = "CheckerRunning"
1210
values_to_set = [dbus.Boolean(False)]
1212
class TestApproveByDefaultCmd(TestPropertyCmd):
1213
command = ApproveByDefaultCmd
1214
propname = "ApprovedByDefault"
1215
values_to_set = [dbus.Boolean(True)]
1217
class TestDenyByDefaultCmd(TestPropertyCmd):
1218
command = DenyByDefaultCmd
1219
propname = "ApprovedByDefault"
1220
values_to_set = [dbus.Boolean(False)]
1222
class TestPropertyValueCmd(TestPropertyCmd):
1223
"""Abstract class for tests of PropertyValueCmd classes"""
1225
if type(self) is TestPropertyValueCmd:
1227
return super(TestPropertyValueCmd, self).runTest()
1228
def run_command(self, value, clients):
1229
self.command(value).run(clients, self.bus)
1231
class TestSetCheckerCmd(TestPropertyValueCmd):
1232
command = SetCheckerCmd
1233
propname = "Checker"
1234
values_to_set = ["", ":", "fping -q -- %s"]
1236
class TestSetHostCmd(TestPropertyValueCmd):
1237
command = SetHostCmd
1239
values_to_set = ["192.0.2.3", "foo.example.org"]
1241
class TestSetSecretCmd(TestPropertyValueCmd):
1242
command = SetSecretCmd
1244
values_to_set = [io.BytesIO(b""),
1245
io.BytesIO(b"secret\0xyzzy\nbar")]
1246
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1248
class TestSetTimeoutCmd(TestPropertyValueCmd):
1249
command = SetTimeoutCmd
1250
propname = "Timeout"
1251
values_to_set = [datetime.timedelta(),
1252
datetime.timedelta(minutes=5),
1253
datetime.timedelta(seconds=1),
1254
datetime.timedelta(weeks=1),
1255
datetime.timedelta(weeks=52)]
1256
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1258
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1259
command = SetExtendedTimeoutCmd
1260
propname = "ExtendedTimeout"
1261
values_to_set = [datetime.timedelta(),
1262
datetime.timedelta(minutes=5),
1263
datetime.timedelta(seconds=1),
1264
datetime.timedelta(weeks=1),
1265
datetime.timedelta(weeks=52)]
1266
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1268
class TestSetIntervalCmd(TestPropertyValueCmd):
1269
command = SetIntervalCmd
1270
propname = "Interval"
1271
values_to_set = [datetime.timedelta(),
1272
datetime.timedelta(minutes=5),
1273
datetime.timedelta(seconds=1),
1274
datetime.timedelta(weeks=1),
1275
datetime.timedelta(weeks=52)]
1276
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1278
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1279
command = SetApprovalDelayCmd
1280
propname = "ApprovalDelay"
1281
values_to_set = [datetime.timedelta(),
1282
datetime.timedelta(minutes=5),
1283
datetime.timedelta(seconds=1),
1284
datetime.timedelta(weeks=1),
1285
datetime.timedelta(weeks=52)]
1286
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1288
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1289
command = SetApprovalDurationCmd
1290
propname = "ApprovalDuration"
1291
values_to_set = [datetime.timedelta(),
1292
datetime.timedelta(minutes=5),
1293
datetime.timedelta(seconds=1),
1294
datetime.timedelta(weeks=1),
1295
datetime.timedelta(weeks=52)]
1296
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1298
class Test_command_from_options(unittest.TestCase):
1300
self.parser = argparse.ArgumentParser()
1301
add_command_line_options(self.parser)
1302
def assert_command_from_args(self, args, command_cls,
1304
"""Assert that parsing ARGS should result in an instance of
1305
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1306
options = self.parser.parse_args(args)
1307
check_option_syntax(self.parser, options)
1308
commands = commands_from_options(options)
1309
self.assertEqual(len(commands), 1)
1310
command = commands[0]
1311
self.assertIsInstance(command, command_cls)
1312
for key, value in cmd_attrs.items():
1313
self.assertEqual(getattr(command, key), value)
1314
def test_print_table(self):
1315
self.assert_command_from_args([], PrintTableCmd,
1318
def test_print_table_verbose(self):
1319
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1322
def test_print_table_verbose_short(self):
1323
self.assert_command_from_args(["-v"], PrintTableCmd,
1326
def test_enable(self):
1327
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1329
def test_enable_short(self):
1330
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1332
def test_disable(self):
1333
self.assert_command_from_args(["--disable", "foo"],
1336
def test_disable_short(self):
1337
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1339
def test_bump_timeout(self):
1340
self.assert_command_from_args(["--bump-timeout", "foo"],
1343
def test_bump_timeout_short(self):
1344
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1346
def test_start_checker(self):
1347
self.assert_command_from_args(["--start-checker", "foo"],
1350
def test_stop_checker(self):
1351
self.assert_command_from_args(["--stop-checker", "foo"],
1354
def test_remove(self):
1355
self.assert_command_from_args(["--remove", "foo"],
1358
def test_remove_short(self):
1359
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1361
def test_checker(self):
1362
self.assert_command_from_args(["--checker", ":", "foo"],
1363
SetCheckerCmd, value_to_set=":")
1365
def test_checker_empty(self):
1366
self.assert_command_from_args(["--checker", "", "foo"],
1367
SetCheckerCmd, value_to_set="")
1369
def test_checker_short(self):
1370
self.assert_command_from_args(["-c", ":", "foo"],
1371
SetCheckerCmd, value_to_set=":")
1373
def test_timeout(self):
1374
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1376
value_to_set=300000)
1378
def test_timeout_short(self):
1379
self.assert_command_from_args(["-t", "PT5M", "foo"],
1381
value_to_set=300000)
1383
def test_extended_timeout(self):
1384
self.assert_command_from_args(["--extended-timeout", "PT15M",
1386
SetExtendedTimeoutCmd,
1387
value_to_set=900000)
1389
def test_interval(self):
1390
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1392
value_to_set=120000)
1394
def test_interval_short(self):
1395
self.assert_command_from_args(["-i", "PT2M", "foo"],
1397
value_to_set=120000)
1399
def test_approve_by_default(self):
1400
self.assert_command_from_args(["--approve-by-default", "foo"],
1401
ApproveByDefaultCmd)
1403
def test_deny_by_default(self):
1404
self.assert_command_from_args(["--deny-by-default", "foo"],
1407
def test_approval_delay(self):
1408
self.assert_command_from_args(["--approval-delay", "PT30S",
1409
"foo"], SetApprovalDelayCmd,
1412
def test_approval_duration(self):
1413
self.assert_command_from_args(["--approval-duration", "PT1S",
1414
"foo"], SetApprovalDurationCmd,
1417
def test_host(self):
1418
self.assert_command_from_args(["--host", "foo.example.org",
1420
value_to_set="foo.example.org")
1422
def test_host_short(self):
1423
self.assert_command_from_args(["-H", "foo.example.org",
1425
value_to_set="foo.example.org")
1427
def test_secret_devnull(self):
1428
self.assert_command_from_args(["--secret", os.path.devnull,
1429
"foo"], SetSecretCmd,
1432
def test_secret_tempfile(self):
1433
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1434
value = b"secret\0xyzzy\nbar"
1437
self.assert_command_from_args(["--secret", f.name,
1438
"foo"], SetSecretCmd,
1441
def test_secret_devnull_short(self):
1442
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1443
SetSecretCmd, value_to_set=b"")
1445
def test_secret_tempfile_short(self):
1446
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1447
value = b"secret\0xyzzy\nbar"
1450
self.assert_command_from_args(["-s", f.name, "foo"],
1454
def test_approve(self):
1455
self.assert_command_from_args(["--approve", "foo"],
1458
def test_approve_short(self):
1459
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1461
def test_deny(self):
1462
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1464
def test_deny_short(self):
1465
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1467
def test_dump_json(self):
1468
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1470
def test_is_enabled(self):
1471
self.assert_command_from_args(["--is-enabled", "foo"],
1474
def test_is_enabled_short(self):
1475
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1477
def test_deny_before_remove(self):
1478
options = self.parser.parse_args(["--deny", "--remove",
1480
check_option_syntax(self.parser, options)
1481
commands = commands_from_options(options)
1482
self.assertEqual(len(commands), 2)
1483
self.assertIsInstance(commands[0], DenyCmd)
1484
self.assertIsInstance(commands[1], RemoveCmd)
1486
def test_deny_before_remove_reversed(self):
1487
options = self.parser.parse_args(["--remove", "--deny",
1489
check_option_syntax(self.parser, options)
1490
commands = commands_from_options(options)
1491
self.assertEqual(len(commands), 2)
1492
self.assertIsInstance(commands[0], DenyCmd)
1493
self.assertIsInstance(commands[1], RemoveCmd)
1496
class Test_check_option_syntax(unittest.TestCase):
1497
# This mostly corresponds to the definition from has_actions() in
1498
# check_option_syntax()
1500
# The actual values set here are not that important, but we do
1501
# at least stick to the correct types, even though they are
1505
"bump_timeout": True,
1506
"start_checker": True,
1507
"stop_checker": True,
1511
"timeout": datetime.timedelta(),
1512
"extended_timeout": datetime.timedelta(),
1513
"interval": datetime.timedelta(),
1514
"approved_by_default": True,
1515
"approval_delay": datetime.timedelta(),
1516
"approval_duration": datetime.timedelta(),
1518
"secret": io.BytesIO(b"x"),
1524
self.parser = argparse.ArgumentParser()
1525
add_command_line_options(self.parser)
1527
@contextlib.contextmanager
1528
def assertParseError(self):
1529
with self.assertRaises(SystemExit) as e:
1530
with self.temporarily_suppress_stderr():
1532
# Exit code from argparse is guaranteed to be "2". Reference:
1533
# https://docs.python.org/3/library
1534
# /argparse.html#exiting-methods
1535
self.assertEqual(e.exception.code, 2)
1538
@contextlib.contextmanager
1539
def temporarily_suppress_stderr():
1540
null = os.open(os.path.devnull, os.O_RDWR)
1541
stderrcopy = os.dup(sys.stderr.fileno())
1542
os.dup2(null, sys.stderr.fileno())
1548
os.dup2(stderrcopy, sys.stderr.fileno())
1549
os.close(stderrcopy)
1551
def check_option_syntax(self, options):
1552
check_option_syntax(self.parser, options)
1554
def test_actions_requires_client_or_all(self):
1555
for action, value in self.actions.items():
1556
options = self.parser.parse_args()
1557
setattr(options, action, value)
1558
with self.assertParseError():
1559
self.check_option_syntax(options)
1561
def test_actions_conflicts_with_verbose(self):
1562
for action, value in self.actions.items():
1563
options = self.parser.parse_args()
1564
setattr(options, action, value)
1565
options.verbose = True
1566
with self.assertParseError():
1567
self.check_option_syntax(options)
1569
def test_dump_json_conflicts_with_verbose(self):
1570
options = self.parser.parse_args()
1571
options.dump_json = True
1572
options.verbose = True
1573
with self.assertParseError():
1574
self.check_option_syntax(options)
1576
def test_dump_json_conflicts_with_action(self):
1577
for action, value in self.actions.items():
1578
options = self.parser.parse_args()
1579
setattr(options, action, value)
1580
options.dump_json = True
1581
with self.assertParseError():
1582
self.check_option_syntax(options)
1584
def test_all_can_not_be_alone(self):
1585
options = self.parser.parse_args()
1587
with self.assertParseError():
1588
self.check_option_syntax(options)
1590
def test_all_is_ok_with_any_action(self):
1591
for action, value in self.actions.items():
1592
options = self.parser.parse_args()
1593
setattr(options, action, value)
1595
self.check_option_syntax(options)
1597
def test_is_enabled_fails_without_client(self):
1598
options = self.parser.parse_args()
1599
options.is_enabled = True
1600
with self.assertParseError():
1601
self.check_option_syntax(options)
1603
def test_is_enabled_works_with_one_client(self):
1604
options = self.parser.parse_args()
1605
options.is_enabled = True
1606
options.client = ["foo"]
1607
self.check_option_syntax(options)
1609
def test_is_enabled_fails_with_two_clients(self):
1610
options = self.parser.parse_args()
1611
options.is_enabled = True
1612
options.client = ["foo", "barbar"]
1613
with self.assertParseError():
1614
self.check_option_syntax(options)
1616
def test_remove_can_only_be_combined_with_action_deny(self):
1617
for action, value in self.actions.items():
1618
if action in {"remove", "deny"}:
1620
options = self.parser.parse_args()
1621
setattr(options, action, value)
1623
options.remove = True
1624
with self.assertParseError():
1625
self.check_option_syntax(options)
1629
995
def should_only_run_tests():