297
292
"LastApprovalRequest", "ApprovalDelay",
298
293
"ApprovalDuration", "Checker", "ExtendedTimeout",
299
294
"Expires", "LastCheckerStatus")
300
def run(self, clients, bus=None, mandos=None):
301
print(self.output(clients.values()))
302
def output(self, clients):
303
raise NotImplementedError()
295
def run(self, mandos, clients):
296
print(self.output(clients))
305
298
class PropertyCmd(Command):
306
299
"""Abstract class for Actions for setting one client property"""
307
300
def run_on_one_client(self, client, properties):
308
301
"""Set the Client's D-Bus property"""
309
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
310
client.__dbus_object_path__,
311
dbus.PROPERTIES_IFACE, client_dbus_interface,
312
self.propname, self.value_to_set
313
if not isinstance(self.value_to_set, dbus.Boolean)
314
else bool(self.value_to_set))
315
client.Set(client_dbus_interface, self.propname,
302
client.Set(client_interface, self.property, self.value_to_set,
317
303
dbus_interface=dbus.PROPERTIES_IFACE)
320
raise NotImplementedError()
322
class PropertyValueCmd(PropertyCmd):
323
"""Abstract class for PropertyCmd recieving a value as argument"""
305
class ValueArgumentMixIn(object):
306
"""Mixin class for commands taking a value as argument"""
324
307
def __init__(self, value):
325
308
self.value_to_set = value
327
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
328
"""Abstract class for PropertyValueCmd taking a value argument as
329
a datetime.timedelta() but should store it as milliseconds."""
310
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
311
"""Mixin class for commands taking a value argument as
331
314
def value_to_set(self):
333
316
@value_to_set.setter
334
317
def value_to_set(self, value):
335
"""When setting, convert value from a datetime.timedelta"""
318
"""When setting, convert value to a datetime.timedelta"""
336
319
self._vts = int(round(value.total_seconds() * 1000))
338
321
# Actual (non-abstract) command classes
441
423
class IsEnabledCmd(Command):
442
def run(self, clients, bus=None, mandos=None):
443
client, properties = next(iter(clients.items()))
424
def run_on_one_client(self, client, properties):
444
425
if self.is_enabled(client, properties):
447
428
def is_enabled(self, client, properties):
448
return properties["Enabled"]
429
return bool(properties["Enabled"])
450
431
class RemoveCmd(Command):
451
432
def run_on_one_client(self, client, properties):
452
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
453
server_dbus_path, server_dbus_interface,
454
str(client.__dbus_object_path__))
455
433
self.mandos.RemoveClient(client.__dbus_object_path__)
457
435
class ApproveCmd(Command):
458
436
def run_on_one_client(self, client, properties):
459
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
460
client.__dbus_object_path__, client_dbus_interface)
461
437
client.Approve(dbus.Boolean(True),
462
dbus_interface=client_dbus_interface)
438
dbus_interface=client_interface)
464
440
class DenyCmd(Command):
465
441
def run_on_one_client(self, client, properties):
466
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
467
client.__dbus_object_path__, client_dbus_interface)
468
442
client.Approve(dbus.Boolean(False),
469
dbus_interface=client_dbus_interface)
443
dbus_interface=client_interface)
471
445
class EnableCmd(PropertyCmd):
473
447
value_to_set = dbus.Boolean(True)
475
449
class DisableCmd(PropertyCmd):
477
451
value_to_set = dbus.Boolean(False)
479
453
class BumpTimeoutCmd(PropertyCmd):
480
propname = "LastCheckedOK"
454
property = "LastCheckedOK"
481
455
value_to_set = ""
483
457
class StartCheckerCmd(PropertyCmd):
484
propname = "CheckerRunning"
458
property = "CheckerRunning"
485
459
value_to_set = dbus.Boolean(True)
487
461
class StopCheckerCmd(PropertyCmd):
488
propname = "CheckerRunning"
462
property = "CheckerRunning"
489
463
value_to_set = dbus.Boolean(False)
491
465
class ApproveByDefaultCmd(PropertyCmd):
492
propname = "ApprovedByDefault"
466
property = "ApprovedByDefault"
493
467
value_to_set = dbus.Boolean(True)
495
469
class DenyByDefaultCmd(PropertyCmd):
496
propname = "ApprovedByDefault"
470
property = "ApprovedByDefault"
497
471
value_to_set = dbus.Boolean(False)
499
class SetCheckerCmd(PropertyValueCmd):
502
class SetHostCmd(PropertyValueCmd):
505
class SetSecretCmd(PropertyValueCmd):
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
508
481
def value_to_set(self):
512
485
"""When setting, read data from supplied file object"""
513
486
self._vts = value.read()
516
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
519
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
520
propname = "ExtendedTimeout"
522
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
523
propname = "Interval"
525
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
526
propname = "ApprovalDelay"
528
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
529
propname = "ApprovalDuration"
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
531
508
def add_command_line_options(parser):
532
509
parser.add_argument("--version", action="version",
838
799
class MockClient(object):
839
800
def __init__(self, name, **attributes):
840
self.__dbus_object_path__ = "/clients/{}".format(name)
801
self.__dbus_object_path__ = "objpath_{}".format(name)
841
802
self.attributes = attributes
842
803
self.attributes["Name"] = name
844
def Set(self, interface, propname, value, dbus_interface):
845
testcase.assertEqual(interface, client_dbus_interface)
846
testcase.assertEqual(dbus_interface,
847
dbus.PROPERTIES_IFACE)
848
self.attributes[propname] = value
849
def Get(self, interface, propname, dbus_interface):
850
testcase.assertEqual(interface, client_dbus_interface)
851
testcase.assertEqual(dbus_interface,
852
dbus.PROPERTIES_IFACE)
853
return self.attributes[propname]
805
def Set(self, interface, property, value, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
self.attributes[property] = value
810
def Get(self, interface, property, dbus_interface):
811
testcase.assertEqual(interface, client_interface)
812
testcase.assertEqual(dbus_interface,
813
dbus.PROPERTIES_IFACE)
814
return self.attributes[property]
854
815
def Approve(self, approve, dbus_interface):
855
testcase.assertEqual(dbus_interface,
856
client_dbus_interface)
816
testcase.assertEqual(dbus_interface, client_interface)
857
817
self.calls.append(("Approve", (approve,
858
818
dbus_interface)))
859
819
self.client = MockClient(
906
866
LastCheckerStatus=-2)
907
867
self.clients = collections.OrderedDict(
909
("/clients/foo", self.client.attributes),
910
("/clients/barbar", self.other_client.attributes),
869
(self.client, self.client.attributes),
870
(self.other_client, self.other_client.attributes),
912
self.one_client = {"/clients/foo": self.client.attributes}
917
def get_object(client_bus_name, path):
918
self.assertEqual(client_bus_name, dbus_busname)
920
"/clients/foo": self.client,
921
"/clients/barbar": self.other_client,
872
self.one_client = {self.client: self.client.attributes}
925
874
class TestPrintTableCmd(TestCmd):
926
875
def test_normal(self):
927
output = PrintTableCmd().output(self.clients.values())
928
expected_output = "\n".join((
929
"Name Enabled Timeout Last Successful Check",
930
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
931
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
876
output = PrintTableCmd().output(self.clients)
877
expected_output = """
878
Name Enabled Timeout Last Successful Check
879
foo Yes 00:05:00 2019-02-03T00:00:00
880
barbar Yes 00:05:00 2019-02-04T00:00:00
933
882
self.assertEqual(output, expected_output)
934
883
def test_verbose(self):
935
output = PrintTableCmd(verbose=True).output(
936
self.clients.values())
937
expected_output = "\n".join((
938
# First line (headers)
939
"Name Enabled Timeout Last Successful Check Created "
940
" Interval Host Key ID "
942
" Check Is Running Last Enabl"
943
"ed Approval Is Pending Approved By Default Last A"
944
"pproval Request Approval Delay Approval Duration Checker"
945
" Extended Timeout Expires Last "
947
# Second line (client "foo")
948
"foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02"
949
"T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684"
950
"574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7"
951
"DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03"
953
" 00:00:00 00:00:01 fping -"
954
"q -- %(host)s 00:15:00 2019-02-04T00:00:00 0 "
956
# Third line (client "barbar")
957
"barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03"
958
"T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c8"
959
"3b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB8"
960
"4C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04"
961
"T00:00:00 No No 2019-0"
962
"1-03T00:00:00 00:00:30 00:00:01 : "
963
" 00:15:00 2019-02-05T00:00:00 -2 "
884
output = PrintTableCmd(verbose=True).output(self.clients)
885
expected_output = """
886
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
887
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
888
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
966
890
self.assertEqual(output, expected_output)
967
891
def test_one_client(self):
968
output = PrintTableCmd().output(self.one_client.values())
892
output = PrintTableCmd().output(self.one_client)
969
893
expected_output = """
970
894
Name Enabled Timeout Last Successful Check
971
895
foo Yes 00:05:00 2019-02-03T00:00:00
1065
987
self.calls.append(("RemoveClient", (dbus_path,)))
1066
988
mandos = MockMandos()
1067
989
super(TestRemoveCmd, self).setUp()
1068
RemoveCmd().run(self.clients, self.bus, mandos)
990
RemoveCmd().run(mandos, self.clients)
1069
991
self.assertEqual(len(mandos.calls), 2)
1070
for clientpath in self.clients:
1071
self.assertIn(("RemoveClient", (clientpath,)),
992
for client in self.clients:
993
self.assertIn(("RemoveClient",
994
(client.__dbus_object_path__,)),
1074
997
class TestApproveCmd(TestCmd):
1075
998
def test_approve(self):
1076
ApproveCmd().run(self.clients, self.bus)
1077
for clientpath in self.clients:
1078
client = self.bus.get_object(dbus_busname, clientpath)
1079
self.assertIn(("Approve", (True, client_dbus_interface)),
999
ApproveCmd().run(None, self.clients)
1000
for client in self.clients:
1001
self.assertIn(("Approve", (True, client_interface)),
1082
1004
class TestDenyCmd(TestCmd):
1083
1005
def test_deny(self):
1084
DenyCmd().run(self.clients, self.bus)
1085
for clientpath in self.clients:
1086
client = self.bus.get_object(dbus_busname, clientpath)
1087
self.assertIn(("Approve", (False, client_dbus_interface)),
1006
DenyCmd().run(None, self.clients)
1007
for client in self.clients:
1008
self.assertIn(("Approve", (False, client_interface)),
1090
1011
class TestEnableCmd(TestCmd):
1091
1012
def test_enable(self):
1092
for clientpath in self.clients:
1093
client = self.bus.get_object(dbus_busname, clientpath)
1013
for client in self.clients:
1094
1014
client.attributes["Enabled"] = False
1096
EnableCmd().run(self.clients, self.bus)
1016
EnableCmd().run(None, self.clients)
1098
for clientpath in self.clients:
1099
client = self.bus.get_object(dbus_busname, clientpath)
1018
for client in self.clients:
1100
1019
self.assertTrue(client.attributes["Enabled"])
1102
1021
class TestDisableCmd(TestCmd):
1103
1022
def test_disable(self):
1104
DisableCmd().run(self.clients, self.bus)
1105
for clientpath in self.clients:
1106
client = self.bus.get_object(dbus_busname, clientpath)
1023
DisableCmd().run(None, self.clients)
1025
for client in self.clients:
1107
1026
self.assertFalse(client.attributes["Enabled"])
1109
1028
class Unique(object):
1119
1038
self.values_to_set)
1120
1039
for value_to_set, value_to_get in zip(self.values_to_set,
1121
1040
values_to_get):
1122
for clientpath in self.clients:
1123
client = self.bus.get_object(dbus_busname, clientpath)
1124
old_value = client.attributes[self.propname]
1041
for client in self.clients:
1042
old_value = client.attributes[self.property]
1125
1043
self.assertNotIsInstance(old_value, Unique)
1126
client.attributes[self.propname] = Unique()
1044
client.attributes[self.property] = Unique()
1127
1045
self.run_command(value_to_set, self.clients)
1128
for clientpath in self.clients:
1129
client = self.bus.get_object(dbus_busname, clientpath)
1130
value = client.attributes[self.propname]
1046
for client in self.clients:
1047
value = client.attributes[self.property]
1131
1048
self.assertNotIsInstance(value, Unique)
1132
1049
self.assertEqual(value, value_to_get)
1133
1050
def run_command(self, value, clients):
1134
self.command().run(clients, self.bus)
1051
self.command().run(None, clients)
1136
1053
class TestBumpTimeoutCmd(TestPropertyCmd):
1137
1054
command = BumpTimeoutCmd
1138
propname = "LastCheckedOK"
1055
property = "LastCheckedOK"
1139
1056
values_to_set = [""]
1141
1058
class TestStartCheckerCmd(TestPropertyCmd):
1142
1059
command = StartCheckerCmd
1143
propname = "CheckerRunning"
1060
property = "CheckerRunning"
1144
1061
values_to_set = [dbus.Boolean(True)]
1146
1063
class TestStopCheckerCmd(TestPropertyCmd):
1147
1064
command = StopCheckerCmd
1148
propname = "CheckerRunning"
1065
property = "CheckerRunning"
1149
1066
values_to_set = [dbus.Boolean(False)]
1151
1068
class TestApproveByDefaultCmd(TestPropertyCmd):
1152
1069
command = ApproveByDefaultCmd
1153
propname = "ApprovedByDefault"
1070
property = "ApprovedByDefault"
1154
1071
values_to_set = [dbus.Boolean(True)]
1156
1073
class TestDenyByDefaultCmd(TestPropertyCmd):
1157
1074
command = DenyByDefaultCmd
1158
propname = "ApprovedByDefault"
1075
property = "ApprovedByDefault"
1159
1076
values_to_set = [dbus.Boolean(False)]
1161
class TestPropertyValueCmd(TestPropertyCmd):
1162
"""Abstract class for tests of PropertyValueCmd classes"""
1078
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1079
"""Abstract class for tests of PropertyCmd classes using the
1080
ValueArgumentMixIn"""
1163
1081
def runTest(self):
1164
if type(self) is TestPropertyValueCmd:
1082
if type(self) is TestValueArgumentPropertyCmd:
1166
return super(TestPropertyValueCmd, self).runTest()
1084
return super(TestValueArgumentPropertyCmd, self).runTest()
1167
1085
def run_command(self, value, clients):
1168
self.command(value).run(clients, self.bus)
1086
self.command(value).run(None, clients)
1170
class TestSetCheckerCmd(TestPropertyValueCmd):
1088
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1171
1089
command = SetCheckerCmd
1172
propname = "Checker"
1090
property = "Checker"
1173
1091
values_to_set = ["", ":", "fping -q -- %s"]
1175
class TestSetHostCmd(TestPropertyValueCmd):
1093
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1176
1094
command = SetHostCmd
1178
1096
values_to_set = ["192.0.2.3", "foo.example.org"]
1180
class TestSetSecretCmd(TestPropertyValueCmd):
1098
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1181
1099
command = SetSecretCmd
1183
values_to_set = [io.BytesIO(b""),
1101
values_to_set = [open("/dev/null", "rb"),
1184
1102
io.BytesIO(b"secret\0xyzzy\nbar")]
1185
1103
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1187
class TestSetTimeoutCmd(TestPropertyValueCmd):
1105
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1188
1106
command = SetTimeoutCmd
1189
propname = "Timeout"
1107
property = "Timeout"
1190
1108
values_to_set = [datetime.timedelta(),
1191
1109
datetime.timedelta(minutes=5),
1192
1110
datetime.timedelta(seconds=1),
1413
1330
def test_is_enabled_short(self):
1414
1331
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1416
def test_deny_before_remove(self):
1417
options = self.parser.parse_args(["--deny", "--remove",
1419
check_option_syntax(self.parser, options)
1420
commands = commands_from_options(options)
1421
self.assertEqual(len(commands), 2)
1422
self.assertIsInstance(commands[0], DenyCmd)
1423
self.assertIsInstance(commands[1], RemoveCmd)
1425
def test_deny_before_remove_reversed(self):
1426
options = self.parser.parse_args(["--remove", "--deny",
1428
check_option_syntax(self.parser, options)
1429
commands = commands_from_options(options)
1430
self.assertEqual(len(commands), 2)
1431
self.assertIsInstance(commands[0], DenyCmd)
1432
self.assertIsInstance(commands[1], RemoveCmd)
1435
class Test_check_option_syntax(unittest.TestCase):
1436
# This mostly corresponds to the definition from has_actions() in
1437
# check_option_syntax()
1439
# The actual values set here are not that important, but we do
1440
# at least stick to the correct types, even though they are
1444
"bump_timeout": True,
1445
"start_checker": True,
1446
"stop_checker": True,
1450
"timeout": datetime.timedelta(),
1451
"extended_timeout": datetime.timedelta(),
1452
"interval": datetime.timedelta(),
1453
"approved_by_default": True,
1454
"approval_delay": datetime.timedelta(),
1455
"approval_duration": datetime.timedelta(),
1457
"secret": io.BytesIO(b"x"),
1463
self.parser = argparse.ArgumentParser()
1464
add_command_line_options(self.parser)
1466
@contextlib.contextmanager
1467
def assertParseError(self):
1468
with self.assertRaises(SystemExit) as e:
1469
with self.temporarily_suppress_stderr():
1471
# Exit code from argparse is guaranteed to be "2". Reference:
1472
# https://docs.python.org/3/library
1473
# /argparse.html#exiting-methods
1474
self.assertEqual(e.exception.code, 2)
1477
@contextlib.contextmanager
1478
def temporarily_suppress_stderr():
1479
null = os.open(os.path.devnull, os.O_RDWR)
1480
stderrcopy = os.dup(sys.stderr.fileno())
1481
os.dup2(null, sys.stderr.fileno())
1487
os.dup2(stderrcopy, sys.stderr.fileno())
1488
os.close(stderrcopy)
1490
def check_option_syntax(self, options):
1491
check_option_syntax(self.parser, options)
1493
def test_actions_requires_client_or_all(self):
1494
for action, value in self.actions.items():
1495
options = self.parser.parse_args()
1496
setattr(options, action, value)
1497
with self.assertParseError():
1498
self.check_option_syntax(options)
1500
def test_actions_conflicts_with_verbose(self):
1501
for action, value in self.actions.items():
1502
options = self.parser.parse_args()
1503
setattr(options, action, value)
1504
options.verbose = True
1505
with self.assertParseError():
1506
self.check_option_syntax(options)
1508
def test_dump_json_conflicts_with_verbose(self):
1509
options = self.parser.parse_args()
1510
options.dump_json = True
1511
options.verbose = True
1512
with self.assertParseError():
1513
self.check_option_syntax(options)
1515
def test_dump_json_conflicts_with_action(self):
1516
for action, value in self.actions.items():
1517
options = self.parser.parse_args()
1518
setattr(options, action, value)
1519
options.dump_json = True
1520
with self.assertParseError():
1521
self.check_option_syntax(options)
1523
def test_all_can_not_be_alone(self):
1524
options = self.parser.parse_args()
1526
with self.assertParseError():
1527
self.check_option_syntax(options)
1529
def test_all_is_ok_with_any_action(self):
1530
for action, value in self.actions.items():
1531
options = self.parser.parse_args()
1532
setattr(options, action, value)
1534
self.check_option_syntax(options)
1536
def test_is_enabled_fails_without_client(self):
1537
options = self.parser.parse_args()
1538
options.is_enabled = True
1539
with self.assertParseError():
1540
self.check_option_syntax(options)
1542
def test_is_enabled_works_with_one_client(self):
1543
options = self.parser.parse_args()
1544
options.is_enabled = True
1545
options.client = ["foo"]
1546
self.check_option_syntax(options)
1548
def test_is_enabled_fails_with_two_clients(self):
1549
options = self.parser.parse_args()
1550
options.is_enabled = True
1551
options.client = ["foo", "barbar"]
1552
with self.assertParseError():
1553
self.check_option_syntax(options)
1555
def test_remove_can_only_be_combined_with_action_deny(self):
1556
for action, value in self.actions.items():
1557
if action in {"remove", "deny"}:
1559
options = self.parser.parse_args()
1560
setattr(options, action, value)
1562
options.remove = True
1563
with self.assertParseError():
1564
self.check_option_syntax(options)
1568
1335
def should_only_run_tests():