292
296
"LastApprovalRequest", "ApprovalDelay",
293
297
"ApprovalDuration", "Checker", "ExtendedTimeout",
294
298
"Expires", "LastCheckerStatus")
295
def run(self, mandos, clients):
296
print(self.output(clients))
299
def run(self, clients, bus=None, mandos=None):
300
print(self.output(clients.values()))
301
def output(self, clients):
302
raise NotImplementedError()
298
304
class PropertyCmd(Command):
299
305
"""Abstract class for Actions for setting one client property"""
300
306
def run_on_one_client(self, client, properties):
301
307
"""Set the Client's D-Bus property"""
302
client.Set(client_interface, self.property, self.value_to_set,
308
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
309
client.__dbus_object_path__,
310
dbus.PROPERTIES_IFACE, client_interface,
311
self.propname, self.value_to_set
312
if not isinstance(self.value_to_set, dbus.Boolean)
313
else bool(self.value_to_set))
314
client.Set(client_interface, self.propname, self.value_to_set,
303
315
dbus_interface=dbus.PROPERTIES_IFACE)
318
raise NotImplementedError()
305
class ValueArgumentMixIn(object):
306
"""Mixin class for commands taking a value as argument"""
320
class PropertyValueCmd(PropertyCmd):
321
"""Abstract class for PropertyCmd recieving a value as argument"""
307
322
def __init__(self, value):
308
323
self.value_to_set = value
310
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
311
"""Mixin class for commands taking a value argument as
325
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
326
"""Abstract class for PropertyValueCmd taking a value argument as
327
a datetime.timedelta() but should store it as milliseconds."""
314
329
def value_to_set(self):
316
331
@value_to_set.setter
317
332
def value_to_set(self, value):
318
"""When setting, convert value to a datetime.timedelta"""
319
self._vts = string_to_delta(value).total_seconds() * 1000
333
"""When setting, convert value from a datetime.timedelta"""
334
self._vts = int(round(value.total_seconds() * 1000))
321
336
# Actual (non-abstract) command classes
423
438
class IsEnabledCmd(Command):
424
def run_on_one_client(self, client, properties):
439
def run(self, clients, bus=None, mandos=None):
440
client, properties = next(iter(clients.items()))
425
441
if self.is_enabled(client, properties):
428
444
def is_enabled(self, client, properties):
429
return bool(properties["Enabled"])
445
return properties["Enabled"]
431
447
class RemoveCmd(Command):
432
448
def run_on_one_client(self, client, properties):
449
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
450
server_path, server_interface,
451
str(client.__dbus_object_path__))
433
452
self.mandos.RemoveClient(client.__dbus_object_path__)
435
454
class ApproveCmd(Command):
436
455
def run_on_one_client(self, client, properties):
456
log.debug("D-Bus: %s:%s:%s.Approve(True)", busname,
457
client.__dbus_object_path__, client_interface)
437
458
client.Approve(dbus.Boolean(True),
438
459
dbus_interface=client_interface)
440
461
class DenyCmd(Command):
441
462
def run_on_one_client(self, client, properties):
463
log.debug("D-Bus: %s:%s:%s.Approve(False)", busname,
464
client.__dbus_object_path__, client_interface)
442
465
client.Approve(dbus.Boolean(False),
443
466
dbus_interface=client_interface)
445
468
class EnableCmd(PropertyCmd):
447
470
value_to_set = dbus.Boolean(True)
449
472
class DisableCmd(PropertyCmd):
451
474
value_to_set = dbus.Boolean(False)
453
476
class BumpTimeoutCmd(PropertyCmd):
454
property = "LastCheckedOK"
477
propname = "LastCheckedOK"
455
478
value_to_set = ""
457
480
class StartCheckerCmd(PropertyCmd):
458
property = "CheckerRunning"
481
propname = "CheckerRunning"
459
482
value_to_set = dbus.Boolean(True)
461
484
class StopCheckerCmd(PropertyCmd):
462
property = "CheckerRunning"
485
propname = "CheckerRunning"
463
486
value_to_set = dbus.Boolean(False)
465
488
class ApproveByDefaultCmd(PropertyCmd):
466
property = "ApprovedByDefault"
489
propname = "ApprovedByDefault"
467
490
value_to_set = dbus.Boolean(True)
469
492
class DenyByDefaultCmd(PropertyCmd):
470
property = "ApprovedByDefault"
493
propname = "ApprovedByDefault"
471
494
value_to_set = dbus.Boolean(False)
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
496
class SetCheckerCmd(PropertyValueCmd):
499
class SetHostCmd(PropertyValueCmd):
502
class SetSecretCmd(PropertyValueCmd):
481
505
def value_to_set(self):
485
509
"""When setting, read data from supplied file object"""
486
510
self._vts = value.read()
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
508
def has_actions(options):
509
return any((options.enable,
511
options.bump_timeout,
512
options.start_checker,
513
options.stop_checker,
516
options.checker is not None,
517
options.timeout is not None,
518
options.extended_timeout is not None,
519
options.interval is not None,
520
options.approved_by_default is not None,
521
options.approval_delay is not None,
522
options.approval_duration is not None,
523
options.host is not None,
524
options.secret is not None,
513
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
516
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
517
propname = "ExtendedTimeout"
519
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
520
propname = "Interval"
522
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
523
propname = "ApprovalDelay"
525
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
526
propname = "ApprovalDuration"
528
528
def add_command_line_options(parser):
529
529
parser.add_argument("--version", action="version",
794
834
class MockClient(object):
795
835
def __init__(self, name, **attributes):
796
self.__dbus_object_path__ = "objpath_{}".format(name)
836
self.__dbus_object_path__ = "/clients/{}".format(name)
797
837
self.attributes = attributes
798
838
self.attributes["Name"] = name
800
def Set(self, interface, property, value, dbus_interface):
801
testcase.assertEqual(interface, client_interface)
802
testcase.assertEqual(dbus_interface,
803
dbus.PROPERTIES_IFACE)
804
self.attributes[property] = value
805
def Get(self, interface, property, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
return self.attributes[property]
840
def Set(self, interface, propname, value, dbus_interface):
841
testcase.assertEqual(interface, client_interface)
842
testcase.assertEqual(dbus_interface,
843
dbus.PROPERTIES_IFACE)
844
self.attributes[propname] = value
845
def Get(self, interface, propname, dbus_interface):
846
testcase.assertEqual(interface, client_interface)
847
testcase.assertEqual(dbus_interface,
848
dbus.PROPERTIES_IFACE)
849
return self.attributes[propname]
810
850
def Approve(self, approve, dbus_interface):
811
851
testcase.assertEqual(dbus_interface, client_interface)
812
852
self.calls.append(("Approve", (approve,
982
1034
self.calls.append(("RemoveClient", (dbus_path,)))
983
1035
mandos = MockMandos()
984
1036
super(TestRemoveCmd, self).setUp()
985
RemoveCmd().run(mandos, self.clients)
1037
RemoveCmd().run(self.clients, self.bus, mandos)
986
1038
self.assertEqual(len(mandos.calls), 2)
987
for client in self.clients:
988
self.assertIn(("RemoveClient",
989
(client.__dbus_object_path__,)),
1039
for clientpath in self.clients:
1040
self.assertIn(("RemoveClient", (clientpath,)),
992
1043
class TestApproveCmd(TestCmd):
993
1044
def test_approve(self):
994
ApproveCmd().run(None, self.clients)
995
for client in self.clients:
1045
ApproveCmd().run(self.clients, self.bus)
1046
for clientpath in self.clients:
1047
client = self.bus.get_object(busname, clientpath)
996
1048
self.assertIn(("Approve", (True, client_interface)),
999
1051
class TestDenyCmd(TestCmd):
1000
1052
def test_deny(self):
1001
DenyCmd().run(None, self.clients)
1002
for client in self.clients:
1053
DenyCmd().run(self.clients, self.bus)
1054
for clientpath in self.clients:
1055
client = self.bus.get_object(busname, clientpath)
1003
1056
self.assertIn(("Approve", (False, client_interface)),
1006
1059
class TestEnableCmd(TestCmd):
1007
1060
def test_enable(self):
1008
for client in self.clients:
1061
for clientpath in self.clients:
1062
client = self.bus.get_object(busname, clientpath)
1009
1063
client.attributes["Enabled"] = False
1011
EnableCmd().run(None, self.clients)
1065
EnableCmd().run(self.clients, self.bus)
1013
for client in self.clients:
1067
for clientpath in self.clients:
1068
client = self.bus.get_object(busname, clientpath)
1014
1069
self.assertTrue(client.attributes["Enabled"])
1016
1071
class TestDisableCmd(TestCmd):
1017
1072
def test_disable(self):
1018
DisableCmd().run(None, self.clients)
1020
for client in self.clients:
1073
DisableCmd().run(self.clients, self.bus)
1074
for clientpath in self.clients:
1075
client = self.bus.get_object(busname, clientpath)
1021
1076
self.assertFalse(client.attributes["Enabled"])
1023
1078
class Unique(object):
1033
1088
self.values_to_set)
1034
1089
for value_to_set, value_to_get in zip(self.values_to_set,
1035
1090
values_to_get):
1036
for client in self.clients:
1037
old_value = client.attributes[self.property]
1091
for clientpath in self.clients:
1092
client = self.bus.get_object(busname, clientpath)
1093
old_value = client.attributes[self.propname]
1038
1094
self.assertNotIsInstance(old_value, Unique)
1039
client.attributes[self.property] = Unique()
1095
client.attributes[self.propname] = Unique()
1040
1096
self.run_command(value_to_set, self.clients)
1041
for client in self.clients:
1042
value = client.attributes[self.property]
1097
for clientpath in self.clients:
1098
client = self.bus.get_object(busname, clientpath)
1099
value = client.attributes[self.propname]
1043
1100
self.assertNotIsInstance(value, Unique)
1044
1101
self.assertEqual(value, value_to_get)
1045
1102
def run_command(self, value, clients):
1046
self.command().run(None, clients)
1103
self.command().run(clients, self.bus)
1048
1105
class TestBumpTimeoutCmd(TestPropertyCmd):
1049
1106
command = BumpTimeoutCmd
1050
property = "LastCheckedOK"
1107
propname = "LastCheckedOK"
1051
1108
values_to_set = [""]
1053
1110
class TestStartCheckerCmd(TestPropertyCmd):
1054
1111
command = StartCheckerCmd
1055
property = "CheckerRunning"
1112
propname = "CheckerRunning"
1056
1113
values_to_set = [dbus.Boolean(True)]
1058
1115
class TestStopCheckerCmd(TestPropertyCmd):
1059
1116
command = StopCheckerCmd
1060
property = "CheckerRunning"
1117
propname = "CheckerRunning"
1061
1118
values_to_set = [dbus.Boolean(False)]
1063
1120
class TestApproveByDefaultCmd(TestPropertyCmd):
1064
1121
command = ApproveByDefaultCmd
1065
property = "ApprovedByDefault"
1122
propname = "ApprovedByDefault"
1066
1123
values_to_set = [dbus.Boolean(True)]
1068
1125
class TestDenyByDefaultCmd(TestPropertyCmd):
1069
1126
command = DenyByDefaultCmd
1070
property = "ApprovedByDefault"
1127
propname = "ApprovedByDefault"
1071
1128
values_to_set = [dbus.Boolean(False)]
1073
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1074
"""Abstract class for tests of PropertyCmd classes using the
1075
ValueArgumentMixIn"""
1130
class TestPropertyValueCmd(TestPropertyCmd):
1131
"""Abstract class for tests of PropertyValueCmd classes"""
1076
1132
def runTest(self):
1077
if type(self) is TestValueArgumentPropertyCmd:
1133
if type(self) is TestPropertyValueCmd:
1079
return super(TestValueArgumentPropertyCmd, self).runTest()
1135
return super(TestPropertyValueCmd, self).runTest()
1080
1136
def run_command(self, value, clients):
1081
self.command(value).run(None, clients)
1137
self.command(value).run(clients, self.bus)
1083
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1139
class TestSetCheckerCmd(TestPropertyValueCmd):
1084
1140
command = SetCheckerCmd
1085
property = "Checker"
1141
propname = "Checker"
1086
1142
values_to_set = ["", ":", "fping -q -- %s"]
1088
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1144
class TestSetHostCmd(TestPropertyValueCmd):
1089
1145
command = SetHostCmd
1091
1147
values_to_set = ["192.0.2.3", "foo.example.org"]
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1149
class TestSetSecretCmd(TestPropertyValueCmd):
1094
1150
command = SetSecretCmd
1096
values_to_set = [open("/dev/null", "rb"),
1152
values_to_set = [io.BytesIO(b""),
1097
1153
io.BytesIO(b"secret\0xyzzy\nbar")]
1098
1154
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1100
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1156
class TestSetTimeoutCmd(TestPropertyValueCmd):
1101
1157
command = SetTimeoutCmd
1102
property = "Timeout"
1103
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1158
propname = "Timeout"
1159
values_to_set = [datetime.timedelta(),
1160
datetime.timedelta(minutes=5),
1161
datetime.timedelta(seconds=1),
1162
datetime.timedelta(weeks=1),
1163
datetime.timedelta(weeks=52)]
1164
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1106
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1166
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1107
1167
command = SetExtendedTimeoutCmd
1108
property = "ExtendedTimeout"
1109
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1168
propname = "ExtendedTimeout"
1169
values_to_set = [datetime.timedelta(),
1170
datetime.timedelta(minutes=5),
1171
datetime.timedelta(seconds=1),
1172
datetime.timedelta(weeks=1),
1173
datetime.timedelta(weeks=52)]
1174
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1112
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1176
class TestSetIntervalCmd(TestPropertyValueCmd):
1113
1177
command = SetIntervalCmd
1114
property = "Interval"
1115
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1178
propname = "Interval"
1179
values_to_set = [datetime.timedelta(),
1180
datetime.timedelta(minutes=5),
1181
datetime.timedelta(seconds=1),
1182
datetime.timedelta(weeks=1),
1183
datetime.timedelta(weeks=52)]
1184
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1118
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1186
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1119
1187
command = SetApprovalDelayCmd
1120
property = "ApprovalDelay"
1121
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1122
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1188
propname = "ApprovalDelay"
1189
values_to_set = [datetime.timedelta(),
1190
datetime.timedelta(minutes=5),
1191
datetime.timedelta(seconds=1),
1192
datetime.timedelta(weeks=1),
1193
datetime.timedelta(weeks=52)]
1194
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1124
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1196
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1125
1197
command = SetApprovalDurationCmd
1126
property = "ApprovalDuration"
1127
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1128
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1198
propname = "ApprovalDuration"
1199
values_to_set = [datetime.timedelta(),
1200
datetime.timedelta(minutes=5),
1201
datetime.timedelta(seconds=1),
1202
datetime.timedelta(weeks=1),
1203
datetime.timedelta(weeks=52)]
1204
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1130
1206
class Test_command_from_options(unittest.TestCase):
1131
1207
def setUp(self):
1247
1378
self.assert_command_from_args(["--is-enabled", "foo"],
1381
def test_is_enabled_short(self):
1382
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1384
def test_deny_before_remove(self):
1385
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1386
check_option_syntax(self.parser, options)
1387
commands = commands_from_options(options)
1388
self.assertEqual(len(commands), 2)
1389
self.assertIsInstance(commands[0], DenyCmd)
1390
self.assertIsInstance(commands[1], RemoveCmd)
1392
def test_deny_before_remove_reversed(self):
1393
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1394
check_option_syntax(self.parser, options)
1395
commands = commands_from_options(options)
1396
self.assertEqual(len(commands), 2)
1397
self.assertIsInstance(commands[0], DenyCmd)
1398
self.assertIsInstance(commands[1], RemoveCmd)
1401
class Test_check_option_syntax(unittest.TestCase):
1402
# This mostly corresponds to the definition from has_actions() in
1403
# check_option_syntax()
1405
# The actual values set here are not that important, but we do
1406
# at least stick to the correct types, even though they are
1410
"bump_timeout": True,
1411
"start_checker": True,
1412
"stop_checker": True,
1416
"timeout": datetime.timedelta(),
1417
"extended_timeout": datetime.timedelta(),
1418
"interval": datetime.timedelta(),
1419
"approved_by_default": True,
1420
"approval_delay": datetime.timedelta(),
1421
"approval_duration": datetime.timedelta(),
1423
"secret": io.BytesIO(b"x"),
1429
self.parser = argparse.ArgumentParser()
1430
add_command_line_options(self.parser)
1432
@contextlib.contextmanager
1433
def assertParseError(self):
1434
with self.assertRaises(SystemExit) as e:
1435
with self.temporarily_suppress_stderr():
1437
# Exit code from argparse is guaranteed to be "2". Reference:
1438
# https://docs.python.org/3/library/argparse.html#exiting-methods
1439
self.assertEqual(e.exception.code, 2)
1442
@contextlib.contextmanager
1443
def temporarily_suppress_stderr():
1444
null = os.open(os.path.devnull, os.O_RDWR)
1445
stderrcopy = os.dup(sys.stderr.fileno())
1446
os.dup2(null, sys.stderr.fileno())
1452
os.dup2(stderrcopy, sys.stderr.fileno())
1453
os.close(stderrcopy)
1455
def check_option_syntax(self, options):
1456
check_option_syntax(self.parser, options)
1458
def test_actions_requires_client_or_all(self):
1459
for action, value in self.actions.items():
1460
options = self.parser.parse_args()
1461
setattr(options, action, value)
1462
with self.assertParseError():
1463
self.check_option_syntax(options)
1465
def test_actions_conflicts_with_verbose(self):
1466
for action, value in self.actions.items():
1467
options = self.parser.parse_args()
1468
setattr(options, action, value)
1469
options.verbose = True
1470
with self.assertParseError():
1471
self.check_option_syntax(options)
1473
def test_dump_json_conflicts_with_verbose(self):
1474
options = self.parser.parse_args()
1475
options.dump_json = True
1476
options.verbose = True
1477
with self.assertParseError():
1478
self.check_option_syntax(options)
1480
def test_dump_json_conflicts_with_action(self):
1481
for action, value in self.actions.items():
1482
options = self.parser.parse_args()
1483
setattr(options, action, value)
1484
options.dump_json = True
1485
with self.assertParseError():
1486
self.check_option_syntax(options)
1488
def test_all_can_not_be_alone(self):
1489
options = self.parser.parse_args()
1491
with self.assertParseError():
1492
self.check_option_syntax(options)
1494
def test_all_is_ok_with_any_action(self):
1495
for action, value in self.actions.items():
1496
options = self.parser.parse_args()
1497
setattr(options, action, value)
1499
self.check_option_syntax(options)
1501
def test_is_enabled_fails_without_client(self):
1502
options = self.parser.parse_args()
1503
options.is_enabled = True
1504
with self.assertParseError():
1505
self.check_option_syntax(options)
1507
def test_is_enabled_works_with_one_client(self):
1508
options = self.parser.parse_args()
1509
options.is_enabled = True
1510
options.client = ["foo"]
1511
self.check_option_syntax(options)
1513
def test_is_enabled_fails_with_two_clients(self):
1514
options = self.parser.parse_args()
1515
options.is_enabled = True
1516
options.client = ["foo", "barbar"]
1517
with self.assertParseError():
1518
self.check_option_syntax(options)
1520
def test_remove_can_only_be_combined_with_action_deny(self):
1521
for action, value in self.actions.items():
1522
if action in {"remove", "deny"}:
1524
options = self.parser.parse_args()
1525
setattr(options, action, value)
1527
options.remove = True
1528
with self.assertParseError():
1529
self.check_option_syntax(options)
1252
1533
def should_only_run_tests():