294
294
"ApprovalDuration", "Checker", "ExtendedTimeout",
295
295
"Expires", "LastCheckerStatus")
296
296
def run(self, mandos, clients):
297
print(self.output(clients.values()))
298
def output(self, clients):
299
raise NotImplementedError()
297
print(self.output(clients))
301
299
class PropertyCmd(Command):
302
300
"""Abstract class for Actions for setting one client property"""
305
303
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
306
304
client.__dbus_object_path__,
307
305
dbus.PROPERTIES_IFACE, client_interface,
308
self.propname, self.value_to_set
306
self.property, self.value_to_set
309
307
if not isinstance(self.value_to_set, dbus.Boolean)
310
308
else bool(self.value_to_set))
311
client.Set(client_interface, self.propname, self.value_to_set,
309
client.Set(client_interface, self.property, self.value_to_set,
312
310
dbus_interface=dbus.PROPERTIES_IFACE)
315
raise NotImplementedError()
317
312
class ValueArgumentMixIn(object):
318
313
"""Mixin class for commands taking a value as argument"""
341
336
keywords = default_keywords
343
338
keywords = self.all_keywords
344
return str(self.TableOfClients(clients, keywords))
339
return str(self.TableOfClients(clients.values(), keywords))
346
341
class TableOfClients(object):
440
435
def is_enabled(self, client, properties):
441
return properties["Enabled"]
436
return bool(properties["Enabled"])
443
438
class RemoveCmd(Command):
444
439
def run_on_one_client(self, client, properties):
462
457
dbus_interface=client_interface)
464
459
class EnableCmd(PropertyCmd):
466
461
value_to_set = dbus.Boolean(True)
468
463
class DisableCmd(PropertyCmd):
470
465
value_to_set = dbus.Boolean(False)
472
467
class BumpTimeoutCmd(PropertyCmd):
473
propname = "LastCheckedOK"
468
property = "LastCheckedOK"
474
469
value_to_set = ""
476
471
class StartCheckerCmd(PropertyCmd):
477
propname = "CheckerRunning"
472
property = "CheckerRunning"
478
473
value_to_set = dbus.Boolean(True)
480
475
class StopCheckerCmd(PropertyCmd):
481
propname = "CheckerRunning"
476
property = "CheckerRunning"
482
477
value_to_set = dbus.Boolean(False)
484
479
class ApproveByDefaultCmd(PropertyCmd):
485
propname = "ApprovedByDefault"
480
property = "ApprovedByDefault"
486
481
value_to_set = dbus.Boolean(True)
488
483
class DenyByDefaultCmd(PropertyCmd):
489
propname = "ApprovedByDefault"
484
property = "ApprovedByDefault"
490
485
value_to_set = dbus.Boolean(False)
492
487
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
495
490
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
498
493
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
501
495
def value_to_set(self):
505
499
"""When setting, read data from supplied file object"""
506
500
self._vts = value.read()
509
504
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
512
507
class SetExtendedTimeoutCmd(PropertyCmd,
513
508
MillisecondsValueArgumentMixIn):
514
propname = "ExtendedTimeout"
509
property = "ExtendedTimeout"
516
511
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
517
propname = "Interval"
512
property = "Interval"
519
514
class SetApprovalDelayCmd(PropertyCmd,
520
515
MillisecondsValueArgumentMixIn):
521
propname = "ApprovalDelay"
516
property = "ApprovalDelay"
523
518
class SetApprovalDurationCmd(PropertyCmd,
524
519
MillisecondsValueArgumentMixIn):
525
propname = "ApprovalDuration"
520
property = "ApprovalDuration"
527
522
def add_command_line_options(parser):
528
523
parser.add_argument("--version", action="version",
700
695
parser.error("--all requires an action.")
701
696
if options.is_enabled and len(options.client) > 1:
702
697
parser.error("--is-enabled requires exactly one client")
704
options.remove = False
705
if has_actions(options) and not options.deny:
706
parser.error("--remove can only be combined with --deny")
707
options.remove = True
764
754
if not clientnames:
765
clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
766
busname, str(path)) and False) or
767
bus.get_object(busname, path): properties
755
clients = {bus.get_object(busname, path): properties
768
756
for path, properties in mandos_clients.items()}
770
758
for name in clientnames:
771
759
for path, client in mandos_clients.items():
772
760
if client["Name"] == name:
773
log.debug("D-Bus: Connect to: (name=%r, path=%r)",
775
761
client_objc = bus.get_object(busname, path)
776
762
clients[client_objc] = client
841
827
self.attributes = attributes
842
828
self.attributes["Name"] = name
844
def Set(self, interface, propname, value, dbus_interface):
845
testcase.assertEqual(interface, client_interface)
846
testcase.assertEqual(dbus_interface,
847
dbus.PROPERTIES_IFACE)
848
self.attributes[propname] = value
849
def Get(self, interface, propname, dbus_interface):
850
testcase.assertEqual(interface, client_interface)
851
testcase.assertEqual(dbus_interface,
852
dbus.PROPERTIES_IFACE)
853
return self.attributes[propname]
830
def Set(self, interface, property, value, dbus_interface):
831
testcase.assertEqual(interface, client_interface)
832
testcase.assertEqual(dbus_interface,
833
dbus.PROPERTIES_IFACE)
834
self.attributes[property] = value
835
def Get(self, interface, property, dbus_interface):
836
testcase.assertEqual(interface, client_interface)
837
testcase.assertEqual(dbus_interface,
838
dbus.PROPERTIES_IFACE)
839
return self.attributes[property]
854
840
def Approve(self, approve, dbus_interface):
855
841
testcase.assertEqual(dbus_interface, client_interface)
856
842
self.calls.append(("Approve", (approve,
913
899
class TestPrintTableCmd(TestCmd):
914
900
def test_normal(self):
915
output = PrintTableCmd().output(self.clients.values())
901
output = PrintTableCmd().output(self.clients)
916
902
expected_output = """
917
903
Name Enabled Timeout Last Successful Check
918
904
foo Yes 00:05:00 2019-02-03T00:00:00
921
907
self.assertEqual(output, expected_output)
922
908
def test_verbose(self):
923
output = PrintTableCmd(verbose=True).output(
924
self.clients.values())
909
output = PrintTableCmd(verbose=True).output(self.clients)
925
910
expected_output = """
926
911
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
927
912
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
930
915
self.assertEqual(output, expected_output)
931
916
def test_one_client(self):
932
output = PrintTableCmd().output(self.one_client.values())
917
output = PrintTableCmd().output(self.one_client)
933
918
expected_output = """
934
919
Name Enabled Timeout Last Successful Check
935
920
foo Yes 00:05:00 2019-02-03T00:00:00
1079
1064
for value_to_set, value_to_get in zip(self.values_to_set,
1080
1065
values_to_get):
1081
1066
for client in self.clients:
1082
old_value = client.attributes[self.propname]
1067
old_value = client.attributes[self.property]
1083
1068
self.assertNotIsInstance(old_value, Unique)
1084
client.attributes[self.propname] = Unique()
1069
client.attributes[self.property] = Unique()
1085
1070
self.run_command(value_to_set, self.clients)
1086
1071
for client in self.clients:
1087
value = client.attributes[self.propname]
1072
value = client.attributes[self.property]
1088
1073
self.assertNotIsInstance(value, Unique)
1089
1074
self.assertEqual(value, value_to_get)
1090
1075
def run_command(self, value, clients):
1093
1078
class TestBumpTimeoutCmd(TestPropertyCmd):
1094
1079
command = BumpTimeoutCmd
1095
propname = "LastCheckedOK"
1080
property = "LastCheckedOK"
1096
1081
values_to_set = [""]
1098
1083
class TestStartCheckerCmd(TestPropertyCmd):
1099
1084
command = StartCheckerCmd
1100
propname = "CheckerRunning"
1085
property = "CheckerRunning"
1101
1086
values_to_set = [dbus.Boolean(True)]
1103
1088
class TestStopCheckerCmd(TestPropertyCmd):
1104
1089
command = StopCheckerCmd
1105
propname = "CheckerRunning"
1090
property = "CheckerRunning"
1106
1091
values_to_set = [dbus.Boolean(False)]
1108
1093
class TestApproveByDefaultCmd(TestPropertyCmd):
1109
1094
command = ApproveByDefaultCmd
1110
propname = "ApprovedByDefault"
1095
property = "ApprovedByDefault"
1111
1096
values_to_set = [dbus.Boolean(True)]
1113
1098
class TestDenyByDefaultCmd(TestPropertyCmd):
1114
1099
command = DenyByDefaultCmd
1115
propname = "ApprovedByDefault"
1100
property = "ApprovedByDefault"
1116
1101
values_to_set = [dbus.Boolean(False)]
1118
1103
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1128
1113
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1129
1114
command = SetCheckerCmd
1130
propname = "Checker"
1115
property = "Checker"
1131
1116
values_to_set = ["", ":", "fping -q -- %s"]
1133
1118
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1134
1119
command = SetHostCmd
1136
1121
values_to_set = ["192.0.2.3", "foo.example.org"]
1138
1123
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1139
1124
command = SetSecretCmd
1141
1126
values_to_set = [io.BytesIO(b""),
1142
1127
io.BytesIO(b"secret\0xyzzy\nbar")]
1143
1128
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1145
1130
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1146
1131
command = SetTimeoutCmd
1147
propname = "Timeout"
1132
property = "Timeout"
1148
1133
values_to_set = [datetime.timedelta(),
1149
1134
datetime.timedelta(minutes=5),
1150
1135
datetime.timedelta(seconds=1),
1155
1140
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1156
1141
command = SetExtendedTimeoutCmd
1157
propname = "ExtendedTimeout"
1142
property = "ExtendedTimeout"
1158
1143
values_to_set = [datetime.timedelta(),
1159
1144
datetime.timedelta(minutes=5),
1160
1145
datetime.timedelta(seconds=1),
1165
1150
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1166
1151
command = SetIntervalCmd
1167
propname = "Interval"
1152
property = "Interval"
1168
1153
values_to_set = [datetime.timedelta(),
1169
1154
datetime.timedelta(minutes=5),
1170
1155
datetime.timedelta(seconds=1),
1175
1160
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1176
1161
command = SetApprovalDelayCmd
1177
propname = "ApprovalDelay"
1162
property = "ApprovalDelay"
1178
1163
values_to_set = [datetime.timedelta(),
1179
1164
datetime.timedelta(minutes=5),
1180
1165
datetime.timedelta(seconds=1),
1185
1170
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1186
1171
command = SetApprovalDurationCmd
1187
propname = "ApprovalDuration"
1172
property = "ApprovalDuration"
1188
1173
values_to_set = [datetime.timedelta(),
1189
1174
datetime.timedelta(minutes=5),
1190
1175
datetime.timedelta(seconds=1),
1370
1355
def test_is_enabled_short(self):
1371
1356
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1373
def test_deny_before_remove(self):
1374
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1375
check_option_syntax(self.parser, options)
1376
commands = commands_from_options(options)
1377
self.assertEqual(len(commands), 2)
1378
self.assertIsInstance(commands[0], DenyCmd)
1379
self.assertIsInstance(commands[1], RemoveCmd)
1381
def test_deny_before_remove_reversed(self):
1382
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1383
check_option_syntax(self.parser, options)
1384
commands = commands_from_options(options)
1385
self.assertEqual(len(commands), 2)
1386
self.assertIsInstance(commands[0], DenyCmd)
1387
self.assertIsInstance(commands[1], RemoveCmd)
1390
1359
class Test_check_option_syntax(unittest.TestCase):
1391
1360
# This mostly corresponds to the definition from has_actions() in
1506
1475
with self.assertParseError():
1507
1476
self.check_option_syntax(options)
1509
def test_remove_can_only_be_combined_with_action_deny(self):
1510
for action, value in self.actions.items():
1511
if action in {"remove", "deny"}:
1513
options = self.parser.parse_args()
1514
setattr(options, action, value)
1516
options.remove = True
1517
with self.assertParseError():
1518
self.check_option_syntax(options)
1522
1480
def should_only_run_tests():