297
292
"LastApprovalRequest", "ApprovalDelay",
298
293
"ApprovalDuration", "Checker", "ExtendedTimeout",
299
294
"Expires", "LastCheckerStatus")
300
def run(self, clients, bus=None, mandos=None):
301
print(self.output(clients.values()))
302
def output(self, clients):
303
raise NotImplementedError()
295
def run(self, mandos, clients):
296
print(self.output(clients))
305
298
class PropertyCmd(Command):
306
299
"""Abstract class for Actions for setting one client property"""
307
300
def run_on_one_client(self, client, properties):
308
301
"""Set the Client's D-Bus property"""
309
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
310
client.__dbus_object_path__,
311
dbus.PROPERTIES_IFACE, client_dbus_interface,
312
self.propname, self.value_to_set
313
if not isinstance(self.value_to_set, dbus.Boolean)
314
else bool(self.value_to_set))
315
client.Set(client_dbus_interface, self.propname,
302
client.Set(client_interface, self.property, self.value_to_set,
317
303
dbus_interface=dbus.PROPERTIES_IFACE)
320
raise NotImplementedError()
322
class PropertyValueCmd(PropertyCmd):
323
"""Abstract class for PropertyCmd recieving a value as argument"""
305
class ValueArgumentMixIn(object):
306
"""Mixin class for commands taking a value as argument"""
324
307
def __init__(self, value):
325
308
self.value_to_set = value
327
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
328
"""Abstract class for PropertyValueCmd taking a value argument as
329
a datetime.timedelta() but should store it as milliseconds."""
310
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
311
"""Mixin class for commands taking a value argument as
331
314
def value_to_set(self):
333
316
@value_to_set.setter
334
317
def value_to_set(self, value):
335
"""When setting, convert value from a datetime.timedelta"""
318
"""When setting, convert value to a datetime.timedelta"""
336
319
self._vts = int(round(value.total_seconds() * 1000))
338
321
# Actual (non-abstract) command classes
440
423
class IsEnabledCmd(Command):
441
def run(self, clients, bus=None, mandos=None):
442
client, properties = next(iter(clients.items()))
424
def run_on_one_client(self, client, properties):
443
425
if self.is_enabled(client, properties):
446
428
def is_enabled(self, client, properties):
447
return properties["Enabled"]
429
return bool(properties["Enabled"])
449
431
class RemoveCmd(Command):
450
432
def run_on_one_client(self, client, properties):
451
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
452
server_dbus_path, server_dbus_interface,
453
str(client.__dbus_object_path__))
454
433
self.mandos.RemoveClient(client.__dbus_object_path__)
456
435
class ApproveCmd(Command):
457
436
def run_on_one_client(self, client, properties):
458
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
459
client.__dbus_object_path__, client_dbus_interface)
460
437
client.Approve(dbus.Boolean(True),
461
dbus_interface=client_dbus_interface)
438
dbus_interface=client_interface)
463
440
class DenyCmd(Command):
464
441
def run_on_one_client(self, client, properties):
465
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
466
client.__dbus_object_path__, client_dbus_interface)
467
442
client.Approve(dbus.Boolean(False),
468
dbus_interface=client_dbus_interface)
443
dbus_interface=client_interface)
470
445
class EnableCmd(PropertyCmd):
472
447
value_to_set = dbus.Boolean(True)
474
449
class DisableCmd(PropertyCmd):
476
451
value_to_set = dbus.Boolean(False)
478
453
class BumpTimeoutCmd(PropertyCmd):
479
propname = "LastCheckedOK"
454
property = "LastCheckedOK"
480
455
value_to_set = ""
482
457
class StartCheckerCmd(PropertyCmd):
483
propname = "CheckerRunning"
458
property = "CheckerRunning"
484
459
value_to_set = dbus.Boolean(True)
486
461
class StopCheckerCmd(PropertyCmd):
487
propname = "CheckerRunning"
462
property = "CheckerRunning"
488
463
value_to_set = dbus.Boolean(False)
490
465
class ApproveByDefaultCmd(PropertyCmd):
491
propname = "ApprovedByDefault"
466
property = "ApprovedByDefault"
492
467
value_to_set = dbus.Boolean(True)
494
469
class DenyByDefaultCmd(PropertyCmd):
495
propname = "ApprovedByDefault"
470
property = "ApprovedByDefault"
496
471
value_to_set = dbus.Boolean(False)
498
class SetCheckerCmd(PropertyValueCmd):
501
class SetHostCmd(PropertyValueCmd):
504
class SetSecretCmd(PropertyValueCmd):
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
507
481
def value_to_set(self):
511
485
"""When setting, read data from supplied file object"""
512
486
self._vts = value.read()
515
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
518
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
519
propname = "ExtendedTimeout"
521
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
522
propname = "Interval"
524
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
525
propname = "ApprovalDelay"
527
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
528
propname = "ApprovalDuration"
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
530
508
def add_command_line_options(parser):
531
509
parser.add_argument("--version", action="version",
837
799
class MockClient(object):
838
800
def __init__(self, name, **attributes):
839
self.__dbus_object_path__ = "/clients/{}".format(name)
801
self.__dbus_object_path__ = "objpath_{}".format(name)
840
802
self.attributes = attributes
841
803
self.attributes["Name"] = name
843
def Set(self, interface, propname, value, dbus_interface):
844
testcase.assertEqual(interface, client_dbus_interface)
845
testcase.assertEqual(dbus_interface,
846
dbus.PROPERTIES_IFACE)
847
self.attributes[propname] = value
848
def Get(self, interface, propname, dbus_interface):
849
testcase.assertEqual(interface, client_dbus_interface)
850
testcase.assertEqual(dbus_interface,
851
dbus.PROPERTIES_IFACE)
852
return self.attributes[propname]
805
def Set(self, interface, property, value, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
self.attributes[property] = value
810
def Get(self, interface, property, dbus_interface):
811
testcase.assertEqual(interface, client_interface)
812
testcase.assertEqual(dbus_interface,
813
dbus.PROPERTIES_IFACE)
814
return self.attributes[property]
853
815
def Approve(self, approve, dbus_interface):
854
testcase.assertEqual(dbus_interface,
855
client_dbus_interface)
816
testcase.assertEqual(dbus_interface, client_interface)
856
817
self.calls.append(("Approve", (approve,
857
818
dbus_interface)))
858
819
self.client = MockClient(
905
866
LastCheckerStatus=-2)
906
867
self.clients = collections.OrderedDict(
908
("/clients/foo", self.client.attributes),
909
("/clients/barbar", self.other_client.attributes),
869
(self.client, self.client.attributes),
870
(self.other_client, self.other_client.attributes),
911
self.one_client = {"/clients/foo": self.client.attributes}
916
def get_object(client_bus_name, path):
917
self.assertEqual(client_bus_name, dbus_busname)
919
"/clients/foo": self.client,
920
"/clients/barbar": self.other_client,
872
self.one_client = {self.client: self.client.attributes}
924
874
class TestPrintTableCmd(TestCmd):
925
875
def test_normal(self):
926
output = PrintTableCmd().output(self.clients.values())
876
output = PrintTableCmd().output(self.clients)
927
877
expected_output = """
928
878
Name Enabled Timeout Last Successful Check
929
879
foo Yes 00:05:00 2019-02-03T00:00:00
1038
987
self.calls.append(("RemoveClient", (dbus_path,)))
1039
988
mandos = MockMandos()
1040
989
super(TestRemoveCmd, self).setUp()
1041
RemoveCmd().run(self.clients, self.bus, mandos)
990
RemoveCmd().run(mandos, self.clients)
1042
991
self.assertEqual(len(mandos.calls), 2)
1043
for clientpath in self.clients:
1044
self.assertIn(("RemoveClient", (clientpath,)),
992
for client in self.clients:
993
self.assertIn(("RemoveClient",
994
(client.__dbus_object_path__,)),
1047
997
class TestApproveCmd(TestCmd):
1048
998
def test_approve(self):
1049
ApproveCmd().run(self.clients, self.bus)
1050
for clientpath in self.clients:
1051
client = self.bus.get_object(dbus_busname, clientpath)
1052
self.assertIn(("Approve", (True, client_dbus_interface)),
999
ApproveCmd().run(None, self.clients)
1000
for client in self.clients:
1001
self.assertIn(("Approve", (True, client_interface)),
1055
1004
class TestDenyCmd(TestCmd):
1056
1005
def test_deny(self):
1057
DenyCmd().run(self.clients, self.bus)
1058
for clientpath in self.clients:
1059
client = self.bus.get_object(dbus_busname, clientpath)
1060
self.assertIn(("Approve", (False, client_dbus_interface)),
1006
DenyCmd().run(None, self.clients)
1007
for client in self.clients:
1008
self.assertIn(("Approve", (False, client_interface)),
1063
1011
class TestEnableCmd(TestCmd):
1064
1012
def test_enable(self):
1065
for clientpath in self.clients:
1066
client = self.bus.get_object(dbus_busname, clientpath)
1013
for client in self.clients:
1067
1014
client.attributes["Enabled"] = False
1069
EnableCmd().run(self.clients, self.bus)
1016
EnableCmd().run(None, self.clients)
1071
for clientpath in self.clients:
1072
client = self.bus.get_object(dbus_busname, clientpath)
1018
for client in self.clients:
1073
1019
self.assertTrue(client.attributes["Enabled"])
1075
1021
class TestDisableCmd(TestCmd):
1076
1022
def test_disable(self):
1077
DisableCmd().run(self.clients, self.bus)
1078
for clientpath in self.clients:
1079
client = self.bus.get_object(dbus_busname, clientpath)
1023
DisableCmd().run(None, self.clients)
1025
for client in self.clients:
1080
1026
self.assertFalse(client.attributes["Enabled"])
1082
1028
class Unique(object):
1092
1038
self.values_to_set)
1093
1039
for value_to_set, value_to_get in zip(self.values_to_set,
1094
1040
values_to_get):
1095
for clientpath in self.clients:
1096
client = self.bus.get_object(dbus_busname, clientpath)
1097
old_value = client.attributes[self.propname]
1041
for client in self.clients:
1042
old_value = client.attributes[self.property]
1098
1043
self.assertNotIsInstance(old_value, Unique)
1099
client.attributes[self.propname] = Unique()
1044
client.attributes[self.property] = Unique()
1100
1045
self.run_command(value_to_set, self.clients)
1101
for clientpath in self.clients:
1102
client = self.bus.get_object(dbus_busname, clientpath)
1103
value = client.attributes[self.propname]
1046
for client in self.clients:
1047
value = client.attributes[self.property]
1104
1048
self.assertNotIsInstance(value, Unique)
1105
1049
self.assertEqual(value, value_to_get)
1106
1050
def run_command(self, value, clients):
1107
self.command().run(clients, self.bus)
1051
self.command().run(None, clients)
1109
1053
class TestBumpTimeoutCmd(TestPropertyCmd):
1110
1054
command = BumpTimeoutCmd
1111
propname = "LastCheckedOK"
1055
property = "LastCheckedOK"
1112
1056
values_to_set = [""]
1114
1058
class TestStartCheckerCmd(TestPropertyCmd):
1115
1059
command = StartCheckerCmd
1116
propname = "CheckerRunning"
1060
property = "CheckerRunning"
1117
1061
values_to_set = [dbus.Boolean(True)]
1119
1063
class TestStopCheckerCmd(TestPropertyCmd):
1120
1064
command = StopCheckerCmd
1121
propname = "CheckerRunning"
1065
property = "CheckerRunning"
1122
1066
values_to_set = [dbus.Boolean(False)]
1124
1068
class TestApproveByDefaultCmd(TestPropertyCmd):
1125
1069
command = ApproveByDefaultCmd
1126
propname = "ApprovedByDefault"
1070
property = "ApprovedByDefault"
1127
1071
values_to_set = [dbus.Boolean(True)]
1129
1073
class TestDenyByDefaultCmd(TestPropertyCmd):
1130
1074
command = DenyByDefaultCmd
1131
propname = "ApprovedByDefault"
1075
property = "ApprovedByDefault"
1132
1076
values_to_set = [dbus.Boolean(False)]
1134
class TestPropertyValueCmd(TestPropertyCmd):
1135
"""Abstract class for tests of PropertyValueCmd classes"""
1078
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1079
"""Abstract class for tests of PropertyCmd classes using the
1080
ValueArgumentMixIn"""
1136
1081
def runTest(self):
1137
if type(self) is TestPropertyValueCmd:
1082
if type(self) is TestValueArgumentPropertyCmd:
1139
return super(TestPropertyValueCmd, self).runTest()
1084
return super(TestValueArgumentPropertyCmd, self).runTest()
1140
1085
def run_command(self, value, clients):
1141
self.command(value).run(clients, self.bus)
1086
self.command(value).run(None, clients)
1143
class TestSetCheckerCmd(TestPropertyValueCmd):
1088
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1144
1089
command = SetCheckerCmd
1145
propname = "Checker"
1090
property = "Checker"
1146
1091
values_to_set = ["", ":", "fping -q -- %s"]
1148
class TestSetHostCmd(TestPropertyValueCmd):
1093
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1149
1094
command = SetHostCmd
1151
1096
values_to_set = ["192.0.2.3", "foo.example.org"]
1153
class TestSetSecretCmd(TestPropertyValueCmd):
1098
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1154
1099
command = SetSecretCmd
1156
values_to_set = [io.BytesIO(b""),
1101
values_to_set = [open("/dev/null", "rb"),
1157
1102
io.BytesIO(b"secret\0xyzzy\nbar")]
1158
1103
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1160
class TestSetTimeoutCmd(TestPropertyValueCmd):
1105
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1161
1106
command = SetTimeoutCmd
1162
propname = "Timeout"
1107
property = "Timeout"
1163
1108
values_to_set = [datetime.timedelta(),
1164
1109
datetime.timedelta(minutes=5),
1165
1110
datetime.timedelta(seconds=1),
1385
1330
def test_is_enabled_short(self):
    """The short option "-V" must produce an IsEnabledCmd"""
    argv = ["-V", "foo"]
    self.assert_command_from_args(argv, IsEnabledCmd)
1388
def test_deny_before_remove(self):
    """"--deny --remove" gives exactly a DenyCmd followed by a RemoveCmd"""
    parsed = self.parser.parse_args(["--deny", "--remove", "foo"])
    check_option_syntax(self.parser, parsed)
    cmds = commands_from_options(parsed)
    # Check the count first so the positional checks below are safe
    self.assertEqual(len(cmds), 2)
    for position, expected_class in enumerate((DenyCmd, RemoveCmd)):
        self.assertIsInstance(cmds[position], expected_class)
1396
def test_deny_before_remove_reversed(self):
    """"--remove --deny" still gives a DenyCmd followed by a RemoveCmd"""
    parsed = self.parser.parse_args(["--remove", "--deny", "--all"])
    check_option_syntax(self.parser, parsed)
    cmds = commands_from_options(parsed)
    # Check the count first so the positional checks below are safe
    self.assertEqual(len(cmds), 2)
    for position, expected_class in enumerate((DenyCmd, RemoveCmd)):
        self.assertIsInstance(cmds[position], expected_class)
1405
class Test_check_option_syntax(unittest.TestCase):
1406
# This mostly corresponds to the definition from has_actions() in
1407
# check_option_syntax()
1409
# The actual values set here are not that important, but we do
1410
# at least stick to the correct types, even though they are
1414
"bump_timeout": True,
1415
"start_checker": True,
1416
"stop_checker": True,
1420
"timeout": datetime.timedelta(),
1421
"extended_timeout": datetime.timedelta(),
1422
"interval": datetime.timedelta(),
1423
"approved_by_default": True,
1424
"approval_delay": datetime.timedelta(),
1425
"approval_duration": datetime.timedelta(),
1427
"secret": io.BytesIO(b"x"),
1433
self.parser = argparse.ArgumentParser()
1434
add_command_line_options(self.parser)
1436
@contextlib.contextmanager
1437
def assertParseError(self):
1438
with self.assertRaises(SystemExit) as e:
1439
with self.temporarily_suppress_stderr():
1441
# Exit code from argparse is guaranteed to be "2". Reference:
1442
# https://docs.python.org/3/library/argparse.html#exiting-methods
1443
self.assertEqual(e.exception.code, 2)
1446
@contextlib.contextmanager
1447
def temporarily_suppress_stderr():
1448
null = os.open(os.path.devnull, os.O_RDWR)
1449
stderrcopy = os.dup(sys.stderr.fileno())
1450
os.dup2(null, sys.stderr.fileno())
1456
os.dup2(stderrcopy, sys.stderr.fileno())
1457
os.close(stderrcopy)
1459
def check_option_syntax(self, options):
    """Run the global check_option_syntax() with this test's parser"""
    parser = self.parser
    check_option_syntax(parser, options)
1462
def test_actions_requires_client_or_all(self):
    """Every action, set alone on freshly parsed options, must be
    rejected as a parse error"""
    for name in self.actions:
        opts = self.parser.parse_args()
        setattr(opts, name, self.actions[name])
        with self.assertParseError():
            self.check_option_syntax(opts)
1469
def test_actions_conflicts_with_verbose(self):
    """Every action combined with verbose=True must be rejected as
    a parse error"""
    for name in self.actions:
        opts = self.parser.parse_args()
        setattr(opts, name, self.actions[name])
        setattr(opts, "verbose", True)
        with self.assertParseError():
            self.check_option_syntax(opts)
1477
def test_dump_json_conflicts_with_verbose(self):
    """dump_json together with verbose must be rejected as a parse
    error"""
    opts = self.parser.parse_args()
    # Same assignment order as before: dump_json first, then verbose
    for flag in ("dump_json", "verbose"):
        setattr(opts, flag, True)
    with self.assertParseError():
        self.check_option_syntax(opts)
1484
def test_dump_json_conflicts_with_action(self):
    """Every action combined with dump_json=True must be rejected as
    a parse error"""
    for name in self.actions:
        opts = self.parser.parse_args()
        setattr(opts, name, self.actions[name])
        setattr(opts, "dump_json", True)
        with self.assertParseError():
            self.check_option_syntax(opts)
1492
def test_all_can_not_be_alone(self):
1493
options = self.parser.parse_args()
1495
with self.assertParseError():
1496
self.check_option_syntax(options)
1498
def test_all_is_ok_with_any_action(self):
1499
for action, value in self.actions.items():
1500
options = self.parser.parse_args()
1501
setattr(options, action, value)
1503
self.check_option_syntax(options)
1505
def test_is_enabled_fails_without_client(self):
    """is_enabled set on freshly parsed options, with no client
    assigned by this test, must be rejected as a parse error"""
    opts = self.parser.parse_args()
    setattr(opts, "is_enabled", True)
    with self.assertParseError():
        self.check_option_syntax(opts)
1511
def test_is_enabled_works_with_one_client(self):
    """is_enabled with exactly one client must pass the syntax check"""
    opts = self.parser.parse_args()
    # Same assignment order as before: is_enabled first, then client
    for attribute, setting in (("is_enabled", True),
                               ("client", ["foo"])):
        setattr(opts, attribute, setting)
    self.check_option_syntax(opts)
1517
def test_is_enabled_fails_with_two_clients(self):
    """is_enabled with two clients must be rejected as a parse error"""
    opts = self.parser.parse_args()
    # Same assignment order as before: is_enabled first, then client
    for attribute, setting in (("is_enabled", True),
                               ("client", ["foo", "barbar"])):
        setattr(opts, attribute, setting)
    with self.assertParseError():
        self.check_option_syntax(opts)
1524
def test_remove_can_only_be_combined_with_action_deny(self):
1525
for action, value in self.actions.items():
1526
if action in {"remove", "deny"}:
1528
options = self.parser.parse_args()
1529
setattr(options, action, value)
1531
options.remove = True
1532
with self.assertParseError():
1533
self.check_option_syntax(options)
1537
1335
def should_only_run_tests():