292
296
"LastApprovalRequest", "ApprovalDelay",
293
297
"ApprovalDuration", "Checker", "ExtendedTimeout",
294
298
"Expires", "LastCheckerStatus")
295
def run(self, mandos, clients):
296
print(self.output(clients))
299
def run(self, clients, bus=None, mandos=None):
300
print(self.output(clients.values()))
301
def output(self, clients):
302
raise NotImplementedError()
298
304
class PropertyCmd(Command):
299
305
"""Abstract class for Actions for setting one client property"""
300
306
def run_on_one_client(self, client, properties):
301
307
"""Set the Client's D-Bus property"""
302
client.Set(client_interface, self.property, self.value_to_set,
308
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
309
client.__dbus_object_path__,
310
dbus.PROPERTIES_IFACE, client_interface,
311
self.propname, self.value_to_set
312
if not isinstance(self.value_to_set, dbus.Boolean)
313
else bool(self.value_to_set))
314
client.Set(client_interface, self.propname, self.value_to_set,
303
315
dbus_interface=dbus.PROPERTIES_IFACE)
318
raise NotImplementedError()
305
class ValueArgumentMixIn(object):
306
"""Mixin class for commands taking a value as argument"""
320
class PropertyValueCmd(PropertyCmd):
321
"""Abstract class for PropertyCmd recieving a value as argument"""
307
322
def __init__(self, value):
308
323
self.value_to_set = value
310
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
311
"""Mixin class for commands taking a value argument as
325
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
326
"""Abstract class for PropertyValueCmd taking a value argument as
327
a datetime.timedelta() but should store it as milliseconds."""
314
329
def value_to_set(self):
316
331
@value_to_set.setter
317
332
def value_to_set(self, value):
318
"""When setting, convert value to a datetime.timedelta"""
333
"""When setting, convert value from a datetime.timedelta"""
319
334
self._vts = int(round(value.total_seconds() * 1000))
321
336
# Actual (non-abstract) command classes
423
438
class IsEnabledCmd(Command):
424
def run_on_one_client(self, client, properties):
439
def run(self, clients, bus=None, mandos=None):
440
client, properties = next(iter(clients.items()))
425
441
if self.is_enabled(client, properties):
428
444
def is_enabled(self, client, properties):
429
return bool(properties["Enabled"])
445
return properties["Enabled"]
431
447
class RemoveCmd(Command):
432
448
def run_on_one_client(self, client, properties):
449
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
450
server_path, server_interface,
451
str(client.__dbus_object_path__))
433
452
self.mandos.RemoveClient(client.__dbus_object_path__)
435
454
class ApproveCmd(Command):
436
455
def run_on_one_client(self, client, properties):
456
log.debug("D-Bus: %s:%s:%s.Approve(True)", busname,
457
client.__dbus_object_path__, client_interface)
437
458
client.Approve(dbus.Boolean(True),
438
459
dbus_interface=client_interface)
440
461
class DenyCmd(Command):
441
462
def run_on_one_client(self, client, properties):
463
log.debug("D-Bus: %s:%s:%s.Approve(False)", busname,
464
client.__dbus_object_path__, client_interface)
442
465
client.Approve(dbus.Boolean(False),
443
466
dbus_interface=client_interface)
445
468
class EnableCmd(PropertyCmd):
447
470
value_to_set = dbus.Boolean(True)
449
472
class DisableCmd(PropertyCmd):
451
474
value_to_set = dbus.Boolean(False)
453
476
class BumpTimeoutCmd(PropertyCmd):
454
property = "LastCheckedOK"
477
propname = "LastCheckedOK"
455
478
value_to_set = ""
457
480
class StartCheckerCmd(PropertyCmd):
458
property = "CheckerRunning"
481
propname = "CheckerRunning"
459
482
value_to_set = dbus.Boolean(True)
461
484
class StopCheckerCmd(PropertyCmd):
462
property = "CheckerRunning"
485
propname = "CheckerRunning"
463
486
value_to_set = dbus.Boolean(False)
465
488
class ApproveByDefaultCmd(PropertyCmd):
466
property = "ApprovedByDefault"
489
propname = "ApprovedByDefault"
467
490
value_to_set = dbus.Boolean(True)
469
492
class DenyByDefaultCmd(PropertyCmd):
470
property = "ApprovedByDefault"
493
propname = "ApprovedByDefault"
471
494
value_to_set = dbus.Boolean(False)
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
496
class SetCheckerCmd(PropertyValueCmd):
499
class SetHostCmd(PropertyValueCmd):
502
class SetSecretCmd(PropertyValueCmd):
481
505
def value_to_set(self):
485
509
"""When setting, read data from supplied file object"""
486
510
self._vts = value.read()
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
513
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
516
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
517
propname = "ExtendedTimeout"
519
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
520
propname = "Interval"
522
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
523
propname = "ApprovalDelay"
525
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
526
propname = "ApprovalDuration"
508
528
def add_command_line_options(parser):
509
529
parser.add_argument("--version", action="version",
799
834
class MockClient(object):
800
835
def __init__(self, name, **attributes):
801
self.__dbus_object_path__ = "objpath_{}".format(name)
836
self.__dbus_object_path__ = "/clients/{}".format(name)
802
837
self.attributes = attributes
803
838
self.attributes["Name"] = name
805
def Set(self, interface, property, value, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
self.attributes[property] = value
810
def Get(self, interface, property, dbus_interface):
811
testcase.assertEqual(interface, client_interface)
812
testcase.assertEqual(dbus_interface,
813
dbus.PROPERTIES_IFACE)
814
return self.attributes[property]
840
def Set(self, interface, propname, value, dbus_interface):
841
testcase.assertEqual(interface, client_interface)
842
testcase.assertEqual(dbus_interface,
843
dbus.PROPERTIES_IFACE)
844
self.attributes[propname] = value
845
def Get(self, interface, propname, dbus_interface):
846
testcase.assertEqual(interface, client_interface)
847
testcase.assertEqual(dbus_interface,
848
dbus.PROPERTIES_IFACE)
849
return self.attributes[propname]
815
850
def Approve(self, approve, dbus_interface):
816
851
testcase.assertEqual(dbus_interface, client_interface)
817
852
self.calls.append(("Approve", (approve,
866
901
LastCheckerStatus=-2)
867
902
self.clients = collections.OrderedDict(
869
(self.client, self.client.attributes),
870
(self.other_client, self.other_client.attributes),
904
("/clients/foo", self.client.attributes),
905
("/clients/barbar", self.other_client.attributes),
872
self.one_client = {self.client: self.client.attributes}
907
self.one_client = {"/clients/foo": self.client.attributes}
912
def get_object(client_bus_name, path):
913
self.assertEqual(client_bus_name, busname)
915
"/clients/foo": self.client,
916
"/clients/barbar": self.other_client,
874
920
class TestPrintTableCmd(TestCmd):
875
921
def test_normal(self):
876
output = PrintTableCmd().output(self.clients)
922
output = PrintTableCmd().output(self.clients.values())
877
923
expected_output = """
878
924
Name Enabled Timeout Last Successful Check
879
925
foo Yes 00:05:00 2019-02-03T00:00:00
987
1034
self.calls.append(("RemoveClient", (dbus_path,)))
988
1035
mandos = MockMandos()
989
1036
super(TestRemoveCmd, self).setUp()
990
RemoveCmd().run(mandos, self.clients)
1037
RemoveCmd().run(self.clients, self.bus, mandos)
991
1038
self.assertEqual(len(mandos.calls), 2)
992
for client in self.clients:
993
self.assertIn(("RemoveClient",
994
(client.__dbus_object_path__,)),
1039
for clientpath in self.clients:
1040
self.assertIn(("RemoveClient", (clientpath,)),
997
1043
class TestApproveCmd(TestCmd):
998
1044
def test_approve(self):
999
ApproveCmd().run(None, self.clients)
1000
for client in self.clients:
1045
ApproveCmd().run(self.clients, self.bus)
1046
for clientpath in self.clients:
1047
client = self.bus.get_object(busname, clientpath)
1001
1048
self.assertIn(("Approve", (True, client_interface)),
1004
1051
class TestDenyCmd(TestCmd):
1005
1052
def test_deny(self):
1006
DenyCmd().run(None, self.clients)
1007
for client in self.clients:
1053
DenyCmd().run(self.clients, self.bus)
1054
for clientpath in self.clients:
1055
client = self.bus.get_object(busname, clientpath)
1008
1056
self.assertIn(("Approve", (False, client_interface)),
1011
1059
class TestEnableCmd(TestCmd):
1012
1060
def test_enable(self):
1013
for client in self.clients:
1061
for clientpath in self.clients:
1062
client = self.bus.get_object(busname, clientpath)
1014
1063
client.attributes["Enabled"] = False
1016
EnableCmd().run(None, self.clients)
1065
EnableCmd().run(self.clients, self.bus)
1018
for client in self.clients:
1067
for clientpath in self.clients:
1068
client = self.bus.get_object(busname, clientpath)
1019
1069
self.assertTrue(client.attributes["Enabled"])
1021
1071
class TestDisableCmd(TestCmd):
1022
1072
def test_disable(self):
1023
DisableCmd().run(None, self.clients)
1025
for client in self.clients:
1073
DisableCmd().run(self.clients, self.bus)
1074
for clientpath in self.clients:
1075
client = self.bus.get_object(busname, clientpath)
1026
1076
self.assertFalse(client.attributes["Enabled"])
1028
1078
class Unique(object):
1038
1088
self.values_to_set)
1039
1089
for value_to_set, value_to_get in zip(self.values_to_set,
1040
1090
values_to_get):
1041
for client in self.clients:
1042
old_value = client.attributes[self.property]
1091
for clientpath in self.clients:
1092
client = self.bus.get_object(busname, clientpath)
1093
old_value = client.attributes[self.propname]
1043
1094
self.assertNotIsInstance(old_value, Unique)
1044
client.attributes[self.property] = Unique()
1095
client.attributes[self.propname] = Unique()
1045
1096
self.run_command(value_to_set, self.clients)
1046
for client in self.clients:
1047
value = client.attributes[self.property]
1097
for clientpath in self.clients:
1098
client = self.bus.get_object(busname, clientpath)
1099
value = client.attributes[self.propname]
1048
1100
self.assertNotIsInstance(value, Unique)
1049
1101
self.assertEqual(value, value_to_get)
1050
1102
def run_command(self, value, clients):
1051
self.command().run(None, clients)
1103
self.command().run(clients, self.bus)
1053
1105
class TestBumpTimeoutCmd(TestPropertyCmd):
1054
1106
command = BumpTimeoutCmd
1055
property = "LastCheckedOK"
1107
propname = "LastCheckedOK"
1056
1108
values_to_set = [""]
1058
1110
class TestStartCheckerCmd(TestPropertyCmd):
1059
1111
command = StartCheckerCmd
1060
property = "CheckerRunning"
1112
propname = "CheckerRunning"
1061
1113
values_to_set = [dbus.Boolean(True)]
1063
1115
class TestStopCheckerCmd(TestPropertyCmd):
1064
1116
command = StopCheckerCmd
1065
property = "CheckerRunning"
1117
propname = "CheckerRunning"
1066
1118
values_to_set = [dbus.Boolean(False)]
1068
1120
class TestApproveByDefaultCmd(TestPropertyCmd):
1069
1121
command = ApproveByDefaultCmd
1070
property = "ApprovedByDefault"
1122
propname = "ApprovedByDefault"
1071
1123
values_to_set = [dbus.Boolean(True)]
1073
1125
class TestDenyByDefaultCmd(TestPropertyCmd):
1074
1126
command = DenyByDefaultCmd
1075
property = "ApprovedByDefault"
1127
propname = "ApprovedByDefault"
1076
1128
values_to_set = [dbus.Boolean(False)]
1078
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1079
"""Abstract class for tests of PropertyCmd classes using the
1080
ValueArgumentMixIn"""
1130
class TestPropertyValueCmd(TestPropertyCmd):
1131
"""Abstract class for tests of PropertyValueCmd classes"""
1081
1132
def runTest(self):
1082
if type(self) is TestValueArgumentPropertyCmd:
1133
if type(self) is TestPropertyValueCmd:
1084
return super(TestValueArgumentPropertyCmd, self).runTest()
1135
return super(TestPropertyValueCmd, self).runTest()
1085
1136
def run_command(self, value, clients):
1086
self.command(value).run(None, clients)
1137
self.command(value).run(clients, self.bus)
1088
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1139
class TestSetCheckerCmd(TestPropertyValueCmd):
1089
1140
command = SetCheckerCmd
1090
property = "Checker"
1141
propname = "Checker"
1091
1142
values_to_set = ["", ":", "fping -q -- %s"]
1093
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1144
class TestSetHostCmd(TestPropertyValueCmd):
1094
1145
command = SetHostCmd
1096
1147
values_to_set = ["192.0.2.3", "foo.example.org"]
1098
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1149
class TestSetSecretCmd(TestPropertyValueCmd):
1099
1150
command = SetSecretCmd
1101
values_to_set = [open("/dev/null", "rb"),
1152
values_to_set = [io.BytesIO(b""),
1102
1153
io.BytesIO(b"secret\0xyzzy\nbar")]
1103
1154
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1105
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1156
class TestSetTimeoutCmd(TestPropertyValueCmd):
1106
1157
command = SetTimeoutCmd
1107
property = "Timeout"
1158
propname = "Timeout"
1108
1159
values_to_set = [datetime.timedelta(),
1109
1160
datetime.timedelta(minutes=5),
1110
1161
datetime.timedelta(seconds=1),
1330
1381
def test_is_enabled_short(self):
1331
1382
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1384
def test_deny_before_remove(self):
1385
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1386
check_option_syntax(self.parser, options)
1387
commands = commands_from_options(options)
1388
self.assertEqual(len(commands), 2)
1389
self.assertIsInstance(commands[0], DenyCmd)
1390
self.assertIsInstance(commands[1], RemoveCmd)
1392
def test_deny_before_remove_reversed(self):
1393
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1394
check_option_syntax(self.parser, options)
1395
commands = commands_from_options(options)
1396
self.assertEqual(len(commands), 2)
1397
self.assertIsInstance(commands[0], DenyCmd)
1398
self.assertIsInstance(commands[1], RemoveCmd)
1401
class Test_check_option_syntax(unittest.TestCase):
1402
# This mostly corresponds to the definition from has_actions() in
1403
# check_option_syntax()
1405
# The actual values set here are not that important, but we do
1406
# at least stick to the correct types, even though they are
1410
"bump_timeout": True,
1411
"start_checker": True,
1412
"stop_checker": True,
1416
"timeout": datetime.timedelta(),
1417
"extended_timeout": datetime.timedelta(),
1418
"interval": datetime.timedelta(),
1419
"approved_by_default": True,
1420
"approval_delay": datetime.timedelta(),
1421
"approval_duration": datetime.timedelta(),
1423
"secret": io.BytesIO(b"x"),
1429
self.parser = argparse.ArgumentParser()
1430
add_command_line_options(self.parser)
1432
@contextlib.contextmanager
1433
def assertParseError(self):
1434
with self.assertRaises(SystemExit) as e:
1435
with self.temporarily_suppress_stderr():
1437
# Exit code from argparse is guaranteed to be "2". Reference:
1438
# https://docs.python.org/3/library/argparse.html#exiting-methods
1439
self.assertEqual(e.exception.code, 2)
1442
@contextlib.contextmanager
1443
def temporarily_suppress_stderr():
1444
null = os.open(os.path.devnull, os.O_RDWR)
1445
stderrcopy = os.dup(sys.stderr.fileno())
1446
os.dup2(null, sys.stderr.fileno())
1452
os.dup2(stderrcopy, sys.stderr.fileno())
1453
os.close(stderrcopy)
1455
def check_option_syntax(self, options):
1456
check_option_syntax(self.parser, options)
1458
def test_actions_requires_client_or_all(self):
1459
for action, value in self.actions.items():
1460
options = self.parser.parse_args()
1461
setattr(options, action, value)
1462
with self.assertParseError():
1463
self.check_option_syntax(options)
1465
def test_actions_conflicts_with_verbose(self):
1466
for action, value in self.actions.items():
1467
options = self.parser.parse_args()
1468
setattr(options, action, value)
1469
options.verbose = True
1470
with self.assertParseError():
1471
self.check_option_syntax(options)
1473
def test_dump_json_conflicts_with_verbose(self):
1474
options = self.parser.parse_args()
1475
options.dump_json = True
1476
options.verbose = True
1477
with self.assertParseError():
1478
self.check_option_syntax(options)
1480
def test_dump_json_conflicts_with_action(self):
1481
for action, value in self.actions.items():
1482
options = self.parser.parse_args()
1483
setattr(options, action, value)
1484
options.dump_json = True
1485
with self.assertParseError():
1486
self.check_option_syntax(options)
1488
def test_all_can_not_be_alone(self):
1489
options = self.parser.parse_args()
1491
with self.assertParseError():
1492
self.check_option_syntax(options)
1494
def test_all_is_ok_with_any_action(self):
1495
for action, value in self.actions.items():
1496
options = self.parser.parse_args()
1497
setattr(options, action, value)
1499
self.check_option_syntax(options)
1501
def test_is_enabled_fails_without_client(self):
1502
options = self.parser.parse_args()
1503
options.is_enabled = True
1504
with self.assertParseError():
1505
self.check_option_syntax(options)
1507
def test_is_enabled_works_with_one_client(self):
1508
options = self.parser.parse_args()
1509
options.is_enabled = True
1510
options.client = ["foo"]
1511
self.check_option_syntax(options)
1513
def test_is_enabled_fails_with_two_clients(self):
1514
options = self.parser.parse_args()
1515
options.is_enabled = True
1516
options.client = ["foo", "barbar"]
1517
with self.assertParseError():
1518
self.check_option_syntax(options)
1520
def test_remove_can_only_be_combined_with_action_deny(self):
1521
for action, value in self.actions.items():
1522
if action in {"remove", "deny"}:
1524
options = self.parser.parse_args()
1525
setattr(options, action, value)
1527
options.remove = True
1528
with self.assertParseError():
1529
self.check_option_syntax(options)
1335
1533
def should_only_run_tests():