294
294
"ApprovalDuration", "Checker", "ExtendedTimeout",
295
295
"Expires", "LastCheckerStatus")
296
296
def run(self, mandos, clients):
297
print(self.output(clients))
297
print(self.output(clients.values()))
298
def output(self, clients):
299
raise NotImplementedError()
299
301
class PropertyCmd(Command):
300
302
"""Abstract class for Actions for setting one client property"""
303
305
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
304
306
client.__dbus_object_path__,
305
307
dbus.PROPERTIES_IFACE, client_interface,
306
self.property, self.value_to_set
308
self.propname, self.value_to_set
307
309
if not isinstance(self.value_to_set, dbus.Boolean)
308
310
else bool(self.value_to_set))
309
client.Set(client_interface, self.property, self.value_to_set,
311
client.Set(client_interface, self.propname, self.value_to_set,
310
312
dbus_interface=dbus.PROPERTIES_IFACE)
315
raise NotImplementedError()
312
317
class ValueArgumentMixIn(object):
313
318
"""Mixin class for commands taking a value as argument"""
336
341
keywords = default_keywords
338
343
keywords = self.all_keywords
339
return str(self.TableOfClients(clients.values(), keywords))
344
return str(self.TableOfClients(clients, keywords))
341
346
class TableOfClients(object):
435
440
def is_enabled(self, client, properties):
436
return bool(properties["Enabled"])
441
return properties["Enabled"]
438
443
class RemoveCmd(Command):
439
444
def run_on_one_client(self, client, properties):
457
462
dbus_interface=client_interface)
459
464
class EnableCmd(PropertyCmd):
461
466
value_to_set = dbus.Boolean(True)
463
468
class DisableCmd(PropertyCmd):
465
470
value_to_set = dbus.Boolean(False)
467
472
class BumpTimeoutCmd(PropertyCmd):
468
property = "LastCheckedOK"
473
propname = "LastCheckedOK"
469
474
value_to_set = ""
471
476
class StartCheckerCmd(PropertyCmd):
472
property = "CheckerRunning"
477
propname = "CheckerRunning"
473
478
value_to_set = dbus.Boolean(True)
475
480
class StopCheckerCmd(PropertyCmd):
476
property = "CheckerRunning"
481
propname = "CheckerRunning"
477
482
value_to_set = dbus.Boolean(False)
479
484
class ApproveByDefaultCmd(PropertyCmd):
480
property = "ApprovedByDefault"
485
propname = "ApprovedByDefault"
481
486
value_to_set = dbus.Boolean(True)
483
488
class DenyByDefaultCmd(PropertyCmd):
484
property = "ApprovedByDefault"
489
propname = "ApprovedByDefault"
485
490
value_to_set = dbus.Boolean(False)
487
492
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
490
495
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
493
498
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
495
501
def value_to_set(self):
499
505
"""When setting, read data from supplied file object"""
500
506
self._vts = value.read()
504
509
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
507
512
class SetExtendedTimeoutCmd(PropertyCmd,
508
513
MillisecondsValueArgumentMixIn):
509
property = "ExtendedTimeout"
514
propname = "ExtendedTimeout"
511
516
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
512
property = "Interval"
517
propname = "Interval"
514
519
class SetApprovalDelayCmd(PropertyCmd,
515
520
MillisecondsValueArgumentMixIn):
516
property = "ApprovalDelay"
521
propname = "ApprovalDelay"
518
523
class SetApprovalDurationCmd(PropertyCmd,
519
524
MillisecondsValueArgumentMixIn):
520
property = "ApprovalDuration"
525
propname = "ApprovalDuration"
522
527
def add_command_line_options(parser):
523
528
parser.add_argument("--version", action="version",
695
700
parser.error("--all requires an action.")
696
701
if options.is_enabled and len(options.client) > 1:
697
702
parser.error("--is-enabled requires exactly one client")
704
options.remove = False
705
if has_actions(options) and not options.deny:
706
parser.error("--remove can only be combined with --deny")
707
options.remove = True
754
764
if not clientnames:
755
clients = {bus.get_object(busname, path): properties
765
clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
766
busname, str(path)) and False) or
767
bus.get_object(busname, path): properties
756
768
for path, properties in mandos_clients.items()}
758
770
for name in clientnames:
759
771
for path, client in mandos_clients.items():
760
772
if client["Name"] == name:
773
log.debug("D-Bus: Connect to: (name=%r, path=%r)",
761
775
client_objc = bus.get_object(busname, path)
762
776
clients[client_objc] = client
827
841
self.attributes = attributes
828
842
self.attributes["Name"] = name
830
def Set(self, interface, property, value, dbus_interface):
831
testcase.assertEqual(interface, client_interface)
832
testcase.assertEqual(dbus_interface,
833
dbus.PROPERTIES_IFACE)
834
self.attributes[property] = value
835
def Get(self, interface, property, dbus_interface):
836
testcase.assertEqual(interface, client_interface)
837
testcase.assertEqual(dbus_interface,
838
dbus.PROPERTIES_IFACE)
839
return self.attributes[property]
844
def Set(self, interface, propname, value, dbus_interface):
845
testcase.assertEqual(interface, client_interface)
846
testcase.assertEqual(dbus_interface,
847
dbus.PROPERTIES_IFACE)
848
self.attributes[propname] = value
849
def Get(self, interface, propname, dbus_interface):
850
testcase.assertEqual(interface, client_interface)
851
testcase.assertEqual(dbus_interface,
852
dbus.PROPERTIES_IFACE)
853
return self.attributes[propname]
840
854
def Approve(self, approve, dbus_interface):
841
855
testcase.assertEqual(dbus_interface, client_interface)
842
856
self.calls.append(("Approve", (approve,
899
913
class TestPrintTableCmd(TestCmd):
900
914
def test_normal(self):
901
output = PrintTableCmd().output(self.clients)
915
output = PrintTableCmd().output(self.clients.values())
902
916
expected_output = """
903
917
Name Enabled Timeout Last Successful Check
904
918
foo Yes 00:05:00 2019-02-03T00:00:00
907
921
self.assertEqual(output, expected_output)
908
922
def test_verbose(self):
909
output = PrintTableCmd(verbose=True).output(self.clients)
923
output = PrintTableCmd(verbose=True).output(
924
self.clients.values())
910
925
expected_output = """
911
926
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
912
927
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
915
930
self.assertEqual(output, expected_output)
916
931
def test_one_client(self):
917
output = PrintTableCmd().output(self.one_client)
932
output = PrintTableCmd().output(self.one_client.values())
918
933
expected_output = """
919
934
Name Enabled Timeout Last Successful Check
920
935
foo Yes 00:05:00 2019-02-03T00:00:00
1064
1079
for value_to_set, value_to_get in zip(self.values_to_set,
1065
1080
values_to_get):
1066
1081
for client in self.clients:
1067
old_value = client.attributes[self.property]
1082
old_value = client.attributes[self.propname]
1068
1083
self.assertNotIsInstance(old_value, Unique)
1069
client.attributes[self.property] = Unique()
1084
client.attributes[self.propname] = Unique()
1070
1085
self.run_command(value_to_set, self.clients)
1071
1086
for client in self.clients:
1072
value = client.attributes[self.property]
1087
value = client.attributes[self.propname]
1073
1088
self.assertNotIsInstance(value, Unique)
1074
1089
self.assertEqual(value, value_to_get)
1075
1090
def run_command(self, value, clients):
1078
1093
class TestBumpTimeoutCmd(TestPropertyCmd):
1079
1094
command = BumpTimeoutCmd
1080
property = "LastCheckedOK"
1095
propname = "LastCheckedOK"
1081
1096
values_to_set = [""]
1083
1098
class TestStartCheckerCmd(TestPropertyCmd):
1084
1099
command = StartCheckerCmd
1085
property = "CheckerRunning"
1100
propname = "CheckerRunning"
1086
1101
values_to_set = [dbus.Boolean(True)]
1088
1103
class TestStopCheckerCmd(TestPropertyCmd):
1089
1104
command = StopCheckerCmd
1090
property = "CheckerRunning"
1105
propname = "CheckerRunning"
1091
1106
values_to_set = [dbus.Boolean(False)]
1093
1108
class TestApproveByDefaultCmd(TestPropertyCmd):
1094
1109
command = ApproveByDefaultCmd
1095
property = "ApprovedByDefault"
1110
propname = "ApprovedByDefault"
1096
1111
values_to_set = [dbus.Boolean(True)]
1098
1113
class TestDenyByDefaultCmd(TestPropertyCmd):
1099
1114
command = DenyByDefaultCmd
1100
property = "ApprovedByDefault"
1115
propname = "ApprovedByDefault"
1101
1116
values_to_set = [dbus.Boolean(False)]
1103
1118
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1113
1128
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1114
1129
command = SetCheckerCmd
1115
property = "Checker"
1130
propname = "Checker"
1116
1131
values_to_set = ["", ":", "fping -q -- %s"]
1118
1133
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1119
1134
command = SetHostCmd
1121
1136
values_to_set = ["192.0.2.3", "foo.example.org"]
1123
1138
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1124
1139
command = SetSecretCmd
1126
1141
values_to_set = [io.BytesIO(b""),
1127
1142
io.BytesIO(b"secret\0xyzzy\nbar")]
1128
1143
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1130
1145
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1131
1146
command = SetTimeoutCmd
1132
property = "Timeout"
1147
propname = "Timeout"
1133
1148
values_to_set = [datetime.timedelta(),
1134
1149
datetime.timedelta(minutes=5),
1135
1150
datetime.timedelta(seconds=1),
1140
1155
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1141
1156
command = SetExtendedTimeoutCmd
1142
property = "ExtendedTimeout"
1157
propname = "ExtendedTimeout"
1143
1158
values_to_set = [datetime.timedelta(),
1144
1159
datetime.timedelta(minutes=5),
1145
1160
datetime.timedelta(seconds=1),
1150
1165
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1151
1166
command = SetIntervalCmd
1152
property = "Interval"
1167
propname = "Interval"
1153
1168
values_to_set = [datetime.timedelta(),
1154
1169
datetime.timedelta(minutes=5),
1155
1170
datetime.timedelta(seconds=1),
1160
1175
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1161
1176
command = SetApprovalDelayCmd
1162
property = "ApprovalDelay"
1177
propname = "ApprovalDelay"
1163
1178
values_to_set = [datetime.timedelta(),
1164
1179
datetime.timedelta(minutes=5),
1165
1180
datetime.timedelta(seconds=1),
1170
1185
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1171
1186
command = SetApprovalDurationCmd
1172
property = "ApprovalDuration"
1187
propname = "ApprovalDuration"
1173
1188
values_to_set = [datetime.timedelta(),
1174
1189
datetime.timedelta(minutes=5),
1175
1190
datetime.timedelta(seconds=1),
1355
1370
def test_is_enabled_short(self):
1356
1371
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1373
def test_deny_before_remove(self):
1374
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1375
check_option_syntax(self.parser, options)
1376
commands = commands_from_options(options)
1377
self.assertEqual(len(commands), 2)
1378
self.assertIsInstance(commands[0], DenyCmd)
1379
self.assertIsInstance(commands[1], RemoveCmd)
1381
def test_deny_before_remove_reversed(self):
1382
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1383
check_option_syntax(self.parser, options)
1384
commands = commands_from_options(options)
1385
self.assertEqual(len(commands), 2)
1386
self.assertIsInstance(commands[0], DenyCmd)
1387
self.assertIsInstance(commands[1], RemoveCmd)
1359
1390
class Test_check_option_syntax(unittest.TestCase):
1360
1391
# This mostly corresponds to the definition from has_actions() in
1475
1506
with self.assertParseError():
1476
1507
self.check_option_syntax(options)
1509
def test_remove_can_only_be_combined_with_action_deny(self):
1510
for action, value in self.actions.items():
1511
if action in {"remove", "deny"}:
1513
options = self.parser.parse_args()
1514
setattr(options, action, value)
1516
options.remove = True
1517
with self.assertParseError():
1518
self.check_option_syntax(options)
1480
1522
def should_only_run_tests():