267
223
value += datetime.timedelta(0, 0, 0, int(num))
271
## Classes for commands.
273
# Abstract classes first
274
class Command(object):
275
"""Abstract class for commands"""
276
def run(self, mandos, clients):
277
"""Normal commands should implement run_on_one_client(), but
278
commands which want to operate on all clients at the same time
279
can override this run() method instead."""
281
for client, properties in clients.items():
282
self.run_on_one_client(client, properties)
284
class PrintCmd(Command):
285
"""Abstract class for commands printing client details"""
286
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
287
"Created", "Interval", "Host", "KeyID",
288
"Fingerprint", "CheckerRunning", "LastEnabled",
289
"ApprovalPending", "ApprovedByDefault",
290
"LastApprovalRequest", "ApprovalDelay",
291
"ApprovalDuration", "Checker", "ExtendedTimeout",
292
"Expires", "LastCheckerStatus")
293
def run(self, mandos, clients):
294
print(self.output(clients))
296
class PropertyCmd(Command):
297
"""Abstract class for Actions for setting one client property"""
298
def run_on_one_client(self, client, properties):
299
"""Set the Client's D-Bus property"""
300
client.Set(client_interface, self.property, self.value_to_set,
301
dbus_interface=dbus.PROPERTIES_IFACE)
303
class ValueArgumentMixIn(object):
304
"""Mixin class for commands taking a value as argument"""
305
def __init__(self, value):
306
self.value_to_set = value
308
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
309
"""Mixin class for commands taking a value argument as
312
def value_to_set(self):
315
def value_to_set(self, value):
316
"""When setting, convert value to a datetime.timedelta"""
317
self._vts = string_to_delta(value).total_seconds() * 1000
319
# Actual (non-abstract) command classes
321
class PrintTableCmd(PrintCmd):
322
def __init__(self, verbose=False):
323
self.verbose = verbose
325
def output(self, clients):
326
default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
327
keywords = default_keywords
329
keywords = self.all_keywords
330
return str(self.TableOfClients(clients.values(), keywords))
332
class TableOfClients(object):
335
"Enabled": "Enabled",
336
"Timeout": "Timeout",
337
"LastCheckedOK": "Last Successful Check",
338
"LastApprovalRequest": "Last Approval Request",
339
"Created": "Created",
340
"Interval": "Interval",
342
"Fingerprint": "Fingerprint",
344
"CheckerRunning": "Check Is Running",
345
"LastEnabled": "Last Enabled",
346
"ApprovalPending": "Approval Is Pending",
347
"ApprovedByDefault": "Approved By Default",
348
"ApprovalDelay": "Approval Delay",
349
"ApprovalDuration": "Approval Duration",
350
"Checker": "Checker",
351
"ExtendedTimeout": "Extended Timeout",
352
"Expires": "Expires",
353
"LastCheckerStatus": "Last Checker Status",
356
def __init__(self, clients, keywords, tableheaders=None):
357
self.clients = clients
358
self.keywords = keywords
359
if tableheaders is not None:
360
self.tableheaders = tableheaders
363
return "\n".join(self.rows())
365
if sys.version_info.major == 2:
366
__unicode__ = __str__
368
return str(self).encode(locale.getpreferredencoding())
371
format_string = self.row_formatting_string()
372
rows = [self.header_line(format_string)]
373
rows.extend(self.client_line(client, format_string)
374
for client in self.clients)
377
def row_formatting_string(self):
378
"Format string used to format table rows"
379
return " ".join("{{{key}:{width}}}".format(
380
width=max(len(self.tableheaders[key]),
381
*(len(self.string_from_client(client, key))
382
for client in self.clients)),
384
for key in self.keywords)
386
def string_from_client(self, client, key):
387
return self.valuetostring(client[key], key)
390
def valuetostring(value, keyword):
391
if isinstance(value, dbus.Boolean):
392
return "Yes" if value else "No"
393
if keyword in ("Timeout", "Interval", "ApprovalDelay",
394
"ApprovalDuration", "ExtendedTimeout"):
395
return milliseconds_to_string(value)
398
def header_line(self, format_string):
399
return format_string.format(**self.tableheaders)
401
def client_line(self, client, format_string):
402
return format_string.format(
403
**{key: self.string_from_client(client, key)
404
for key in self.keywords})
408
class DumpJSONCmd(PrintCmd):
409
def output(self, clients):
410
data = {client["Name"]:
411
{key: self.dbus_boolean_to_bool(client[key])
412
for key in self.all_keywords}
413
for client in clients.values()}
414
return json.dumps(data, indent=4, separators=(',', ': '))
416
def dbus_boolean_to_bool(value):
417
if isinstance(value, dbus.Boolean):
421
class IsEnabledCmd(Command):
422
def run_on_one_client(self, client, properties):
423
if self.is_enabled(client, properties):
426
def is_enabled(self, client, properties):
427
return bool(properties["Enabled"])
429
class RemoveCmd(Command):
430
def run_on_one_client(self, client, properties):
431
self.mandos.RemoveClient(client.__dbus_object_path__)
433
class ApproveCmd(Command):
434
def run_on_one_client(self, client, properties):
435
client.Approve(dbus.Boolean(True),
436
dbus_interface=client_interface)
438
class DenyCmd(Command):
439
def run_on_one_client(self, client, properties):
440
client.Approve(dbus.Boolean(False),
441
dbus_interface=client_interface)
443
class EnableCmd(PropertyCmd):
445
value_to_set = dbus.Boolean(True)
447
class DisableCmd(PropertyCmd):
449
value_to_set = dbus.Boolean(False)
451
class BumpTimeoutCmd(PropertyCmd):
452
property = "LastCheckedOK"
455
class StartCheckerCmd(PropertyCmd):
456
property = "CheckerRunning"
457
value_to_set = dbus.Boolean(True)
459
class StopCheckerCmd(PropertyCmd):
460
property = "CheckerRunning"
461
value_to_set = dbus.Boolean(False)
463
class ApproveByDefaultCmd(PropertyCmd):
464
property = "ApprovedByDefault"
465
value_to_set = dbus.Boolean(True)
467
class DenyByDefaultCmd(PropertyCmd):
468
property = "ApprovedByDefault"
469
value_to_set = dbus.Boolean(False)
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
483
class SetExtendedTimeoutCmd(PropertyCmd,
484
MillisecondsValueArgumentMixIn):
485
property = "ExtendedTimeout"
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
488
property = "Interval"
490
class SetApprovalDelayCmd(PropertyCmd,
491
MillisecondsValueArgumentMixIn):
492
property = "ApprovalDelay"
494
class SetApprovalDurationCmd(PropertyCmd,
495
MillisecondsValueArgumentMixIn):
496
property = "ApprovalDuration"
226
def print_clients(clients, keywords):
227
def valuetostring(value, keyword):
228
if type(value) is dbus.Boolean:
229
return "Yes" if value else "No"
230
if keyword in ("Timeout", "Interval", "ApprovalDelay",
231
"ApprovalDuration", "ExtendedTimeout"):
232
return milliseconds_to_string(value)
233
return unicode(value)
235
# Create format string to print table rows
236
format_string = " ".join("{{{key}:{width}}}".format(
237
width = max(len(tablewords[key]),
238
max(len(valuetostring(client[key],
242
key = key) for key in keywords)
244
print(format_string.format(**tablewords))
245
for client in clients:
246
print(format_string.format(**dict((key,
247
valuetostring(client[key],
249
for key in keywords)))
498
251
def has_actions(options):
499
252
return any((options.enable,
552
299
help="Set extended timeout for client")
553
300
parser.add_argument("-i", "--interval",
554
301
help="Set checker interval for client")
555
approve_deny_default = parser.add_mutually_exclusive_group()
556
approve_deny_default.add_argument(
557
"--approve-by-default", action="store_true",
558
default=None, dest="approved_by_default",
559
help="Set client to be approved by default")
560
approve_deny_default.add_argument(
561
"--deny-by-default", action="store_false",
562
dest="approved_by_default",
563
help="Set client to be denied by default")
302
parser.add_argument("--approve-by-default", action="store_true",
303
default=None, dest="approved_by_default",
304
help="Set client to be approved by default")
305
parser.add_argument("--deny-by-default", action="store_false",
306
dest="approved_by_default",
307
help="Set client to be denied by default")
564
308
parser.add_argument("--approval-delay",
565
309
help="Set delay before client approve/deny")
566
310
parser.add_argument("--approval-duration",
567
311
help="Set duration of one client approval")
568
312
parser.add_argument("-H", "--host", help="Set host for client")
569
parser.add_argument("-s", "--secret",
570
type=argparse.FileType(mode="rb"),
313
parser.add_argument("-s", "--secret", type=file,
571
314
help="Set password blob (file) for client")
572
approve_deny = parser.add_mutually_exclusive_group()
573
approve_deny.add_argument(
574
"-A", "--approve", action="store_true",
575
help="Approve any current client request")
576
approve_deny.add_argument("-D", "--deny", action="store_true",
577
help="Deny any current client request")
315
parser.add_argument("-A", "--approve", action="store_true",
316
help="Approve any current client request")
317
parser.add_argument("-D", "--deny", action="store_true",
318
help="Deny any current client request")
578
319
parser.add_argument("--check", action="store_true",
579
320
help="Run self-test")
580
321
parser.add_argument("client", nargs="*", help="Client name")
583
def commands_from_options(options):
587
if options.dump_json:
588
commands.append(DumpJSONCmd())
591
commands.append(EnableCmd())
594
commands.append(DisableCmd())
596
if options.bump_timeout:
597
commands.append(BumpTimeoutCmd())
599
if options.start_checker:
600
commands.append(StartCheckerCmd())
602
if options.stop_checker:
603
commands.append(StopCheckerCmd())
605
if options.is_enabled:
606
commands.append(IsEnabledCmd())
609
commands.append(RemoveCmd())
611
if options.checker is not None:
612
commands.append(SetCheckerCmd())
614
if options.timeout is not None:
615
commands.append(SetTimeoutCmd(options.timeout))
617
if options.extended_timeout:
619
SetExtendedTimeoutCmd(options.extended_timeout))
621
if options.interval is not None:
622
command.append(SetIntervalCmd(options.interval))
624
if options.approved_by_default is not None:
625
if options.approved_by_default:
626
command.append(ApproveByDefaultCmd())
628
command.append(DenyByDefaultCmd())
630
if options.approval_delay is not None:
631
command.append(SetApprovalDelayCmd(options.approval_delay))
633
if options.approval_duration is not None:
635
SetApprovalDurationCmd(options.approval_duration))
637
if options.host is not None:
638
command.append(SetHostCmd(options.host))
640
if options.secret is not None:
641
command.append(SetSecretCmd(options.secret))
644
commands.append(ApproveCmd())
647
commands.append(DenyCmd())
649
# If no command option has been given, show table of clients,
650
# optionally verbosely
652
commands.append(PrintTableCmd(verbose=options.verbose))
658
parser = argparse.ArgumentParser()
660
add_command_line_options(parser)
662
322
options = parser.parse_args()
664
324
if has_actions(options) and not (options.client or options.all):
665
325
parser.error("Options require clients names or --all.")
666
326
if options.verbose and has_actions(options):
667
parser.error("--verbose can only be used alone.")
668
if options.dump_json and (options.verbose
669
or has_actions(options)):
670
parser.error("--dump-json can only be used alone.")
327
parser.error("--verbose can only be used alone or with"
671
329
if options.all and not has_actions(options):
672
330
parser.error("--all requires an action.")
673
if options.is_enabled and len(options.client) > 1:
674
parser.error("--is-enabled requires exactly one client")
676
clientnames = options.client
333
fail_count, test_count = doctest.testmod()
334
sys.exit(0 if fail_count == 0 else 1)
679
337
bus = dbus.SystemBus()
680
338
mandos_dbus_objc = bus.get_object(busname, server_path)
681
339
except dbus.exceptions.DBusException:
682
log.critical("Could not connect to Mandos server")
340
print("Could not connect to Mandos server",
685
344
mandos_serv = dbus.Interface(mandos_dbus_objc,
686
dbus_interface=server_interface)
687
mandos_serv_object_manager = dbus.Interface(
688
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
690
# Filter out log message from dbus module
691
dbus_logger = logging.getLogger("dbus.proxies")
692
class NullFilter(logging.Filter):
693
def filter(self, record):
695
dbus_filter = NullFilter()
345
dbus_interface = server_interface)
347
#block stderr since dbus library prints to stderr
348
null = os.open(os.path.devnull, os.O_RDWR)
349
stderrcopy = os.dup(sys.stderr.fileno())
350
os.dup2(null, sys.stderr.fileno())
697
dbus_logger.addFilter(dbus_filter)
698
mandos_clients = {path: ifs_and_props[client_interface]
699
for path, ifs_and_props in
700
mandos_serv_object_manager
701
.GetManagedObjects().items()
702
if client_interface in ifs_and_props}
703
except dbus.exceptions.DBusException as e:
704
log.critical("Failed to access Mandos server through D-Bus:"
354
mandos_clients = mandos_serv.GetAllClientsWithProperties()
357
os.dup2(stderrcopy, sys.stderr.fileno())
359
except dbus.exceptions.DBusException:
360
print("Access denied: Accessing mandos server through dbus.",
708
# restore dbus logger
709
dbus_logger.removeFilter(dbus_filter)
711
364
# Compile dict of (clients: properties) to process
715
clients = {bus.get_object(busname, path): properties
716
for path, properties in mandos_clients.items()}
367
if options.all or not options.client:
368
clients = dict((bus.get_object(busname, path), properties)
369
for path, properties in
370
mandos_clients.iteritems())
718
for name in clientnames:
719
for path, client in mandos_clients.items():
372
for name in options.client:
373
for path, client in mandos_clients.iteritems():
720
374
if client["Name"] == name:
721
375
client_objc = bus.get_object(busname, path)
722
376
clients[client_objc] = client
725
log.critical("Client not found on server: %r", name)
379
print("Client not found on server: {0!r}"
380
.format(name), file=sys.stderr)
728
# Run all commands on clients
729
commands = commands_from_options(options)
730
for command in commands:
731
command.run(mandos_serv, clients)
734
class Test_milliseconds_to_string(unittest.TestCase):
736
self.assertEqual(milliseconds_to_string(93785000),
738
def test_no_days(self):
739
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
740
def test_all_zero(self):
741
self.assertEqual(milliseconds_to_string(0), "00:00:00")
742
def test_no_fractional_seconds(self):
743
self.assertEqual(milliseconds_to_string(400), "00:00:00")
744
self.assertEqual(milliseconds_to_string(900), "00:00:00")
745
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
747
class Test_string_to_delta(unittest.TestCase):
748
def test_handles_basic_rfc3339(self):
749
self.assertEqual(string_to_delta("PT0S"),
750
datetime.timedelta())
751
self.assertEqual(string_to_delta("P0D"),
752
datetime.timedelta())
753
self.assertEqual(string_to_delta("PT1S"),
754
datetime.timedelta(0, 1))
755
self.assertEqual(string_to_delta("PT2H"),
756
datetime.timedelta(0, 7200))
757
def test_falls_back_to_pre_1_6_1_with_warning(self):
758
# assertLogs only exists in Python 3.4
759
if hasattr(self, "assertLogs"):
760
with self.assertLogs(log, logging.WARNING):
761
value = string_to_delta("2h")
763
class WarningFilter(logging.Filter):
764
"""Don't show, but record the presence of, warnings"""
765
def filter(self, record):
766
is_warning = record.levelno >= logging.WARNING
767
self.found = is_warning or getattr(self, "found",
769
return not is_warning
770
warning_filter = WarningFilter()
771
log.addFilter(warning_filter)
773
value = string_to_delta("2h")
775
log.removeFilter(warning_filter)
776
self.assertTrue(getattr(warning_filter, "found", False))
777
self.assertEqual(value, datetime.timedelta(0, 7200))
780
class TestCmd(unittest.TestCase):
781
"""Abstract class for tests of command classes"""
784
class MockClient(object):
785
def __init__(self, name, **attributes):
786
self.__dbus_object_path__ = "objpath_{}".format(name)
787
self.attributes = attributes
788
self.attributes["Name"] = name
790
def Set(self, interface, property, value, dbus_interface):
791
testcase.assertEqual(interface, client_interface)
792
testcase.assertEqual(dbus_interface,
793
dbus.PROPERTIES_IFACE)
794
self.attributes[property] = value
795
def Get(self, interface, property, dbus_interface):
796
testcase.assertEqual(interface, client_interface)
797
testcase.assertEqual(dbus_interface,
798
dbus.PROPERTIES_IFACE)
799
return self.attributes[property]
800
def Approve(self, approve, dbus_interface):
801
testcase.assertEqual(dbus_interface, client_interface)
802
self.calls.append(("Approve", (approve,
804
self.client = MockClient(
806
KeyID=("92ed150794387c03ce684574b1139a65"
807
"94a34f895daaaf09fd8ea90a27cddb12"),
809
Host="foo.example.org",
810
Enabled=dbus.Boolean(True),
812
LastCheckedOK="2019-02-03T00:00:00",
813
Created="2019-01-02T00:00:00",
815
Fingerprint=("778827225BA7DE539C5A"
816
"7CFA59CFF7CDBD9A5920"),
817
CheckerRunning=dbus.Boolean(False),
818
LastEnabled="2019-01-03T00:00:00",
819
ApprovalPending=dbus.Boolean(False),
820
ApprovedByDefault=dbus.Boolean(True),
821
LastApprovalRequest="",
823
ApprovalDuration=1000,
824
Checker="fping -q -- %(host)s",
825
ExtendedTimeout=900000,
826
Expires="2019-02-04T00:00:00",
828
self.other_client = MockClient(
830
KeyID=("0558568eedd67d622f5c83b35a115f79"
831
"6ab612cff5ad227247e46c2b020f441c"),
834
Enabled=dbus.Boolean(True),
836
LastCheckedOK="2019-02-04T00:00:00",
837
Created="2019-01-03T00:00:00",
839
Fingerprint=("3E393AEAEFB84C7E89E2"
840
"F547B3A107558FCA3A27"),
841
CheckerRunning=dbus.Boolean(True),
842
LastEnabled="2019-01-04T00:00:00",
843
ApprovalPending=dbus.Boolean(False),
844
ApprovedByDefault=dbus.Boolean(False),
845
LastApprovalRequest="2019-01-03T00:00:00",
847
ApprovalDuration=1000,
849
ExtendedTimeout=900000,
850
Expires="2019-02-05T00:00:00",
851
LastCheckerStatus=-2)
852
self.clients = collections.OrderedDict(
854
(self.client, self.client.attributes),
855
(self.other_client, self.other_client.attributes),
857
self.one_client = {self.client: self.client.attributes}
859
class TestPrintTableCmd(TestCmd):
860
def test_normal(self):
861
output = PrintTableCmd().output(self.clients)
862
expected_output = """
863
Name Enabled Timeout Last Successful Check
864
foo Yes 00:05:00 2019-02-03T00:00:00
865
barbar Yes 00:05:00 2019-02-04T00:00:00
867
self.assertEqual(output, expected_output)
868
def test_verbose(self):
869
output = PrintTableCmd(verbose=True).output(self.clients)
870
expected_output = """
871
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
872
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
873
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
875
self.assertEqual(output, expected_output)
876
def test_one_client(self):
877
output = PrintTableCmd().output(self.one_client)
878
expected_output = """
879
Name Enabled Timeout Last Successful Check
880
foo Yes 00:05:00 2019-02-03T00:00:00
882
self.assertEqual(output, expected_output)
884
class TestDumpJSONCmd(TestCmd):
886
self.expected_json = {
889
"KeyID": ("92ed150794387c03ce684574b1139a65"
890
"94a34f895daaaf09fd8ea90a27cddb12"),
891
"Host": "foo.example.org",
894
"LastCheckedOK": "2019-02-03T00:00:00",
895
"Created": "2019-01-02T00:00:00",
897
"Fingerprint": ("778827225BA7DE539C5A"
898
"7CFA59CFF7CDBD9A5920"),
899
"CheckerRunning": False,
900
"LastEnabled": "2019-01-03T00:00:00",
901
"ApprovalPending": False,
902
"ApprovedByDefault": True,
903
"LastApprovalRequest": "",
905
"ApprovalDuration": 1000,
906
"Checker": "fping -q -- %(host)s",
907
"ExtendedTimeout": 900000,
908
"Expires": "2019-02-04T00:00:00",
909
"LastCheckerStatus": 0,
913
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
914
"6ab612cff5ad227247e46c2b020f441c"),
918
"LastCheckedOK": "2019-02-04T00:00:00",
919
"Created": "2019-01-03T00:00:00",
921
"Fingerprint": ("3E393AEAEFB84C7E89E2"
922
"F547B3A107558FCA3A27"),
923
"CheckerRunning": True,
924
"LastEnabled": "2019-01-04T00:00:00",
925
"ApprovalPending": False,
926
"ApprovedByDefault": False,
927
"LastApprovalRequest": "2019-01-03T00:00:00",
928
"ApprovalDelay": 30000,
929
"ApprovalDuration": 1000,
931
"ExtendedTimeout": 900000,
932
"Expires": "2019-02-05T00:00:00",
933
"LastCheckerStatus": -2,
936
return super(TestDumpJSONCmd, self).setUp()
937
def test_normal(self):
938
json_data = json.loads(DumpJSONCmd().output(self.clients))
939
self.assertDictEqual(json_data, self.expected_json)
940
def test_one_client(self):
941
clients = self.one_client
942
json_data = json.loads(DumpJSONCmd().output(clients))
943
expected_json = {"foo": self.expected_json["foo"]}
944
self.assertDictEqual(json_data, expected_json)
946
class TestIsEnabledCmd(TestCmd):
947
def test_is_enabled(self):
948
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
949
for client, properties in self.clients.items()))
950
def test_is_enabled_run_exits_successfully(self):
951
with self.assertRaises(SystemExit) as e:
952
IsEnabledCmd().run(None, self.one_client)
953
if e.exception.code is not None:
954
self.assertEqual(e.exception.code, 0)
956
self.assertIsNone(e.exception.code)
957
def test_is_enabled_run_exits_with_failure(self):
958
self.client.attributes["Enabled"] = dbus.Boolean(False)
959
with self.assertRaises(SystemExit) as e:
960
IsEnabledCmd().run(None, self.one_client)
961
if isinstance(e.exception.code, int):
962
self.assertNotEqual(e.exception.code, 0)
964
self.assertIsNotNone(e.exception.code)
966
class TestRemoveCmd(TestCmd):
967
def test_remove(self):
968
class MockMandos(object):
971
def RemoveClient(self, dbus_path):
972
self.calls.append(("RemoveClient", (dbus_path,)))
973
mandos = MockMandos()
974
super(TestRemoveCmd, self).setUp()
975
RemoveCmd().run(mandos, self.clients)
976
self.assertEqual(len(mandos.calls), 2)
977
for client in self.clients:
978
self.assertIn(("RemoveClient",
979
(client.__dbus_object_path__,)),
982
class TestApproveCmd(TestCmd):
983
def test_approve(self):
984
ApproveCmd().run(None, self.clients)
985
for client in self.clients:
986
self.assertIn(("Approve", (True, client_interface)),
989
class TestDenyCmd(TestCmd):
991
DenyCmd().run(None, self.clients)
992
for client in self.clients:
993
self.assertIn(("Approve", (False, client_interface)),
996
class TestEnableCmd(TestCmd):
997
def test_enable(self):
998
for client in self.clients:
999
client.attributes["Enabled"] = False
1001
EnableCmd().run(None, self.clients)
1003
for client in self.clients:
1004
self.assertTrue(client.attributes["Enabled"])
1006
class TestDisableCmd(TestCmd):
1007
def test_disable(self):
1008
DisableCmd().run(None, self.clients)
1010
for client in self.clients:
1011
self.assertFalse(client.attributes["Enabled"])
1013
class Unique(object):
1014
"""Class for objects which exist only to be unique objects, since
1015
unittest.mock.sentinel only exists in Python 3.3"""
1017
class TestPropertyCmd(TestCmd):
1018
"""Abstract class for tests of PropertyCmd classes"""
1020
if not hasattr(self, "command"):
1022
values_to_get = getattr(self, "values_to_get",
1024
for value_to_set, value_to_get in zip(self.values_to_set,
1026
for client in self.clients:
1027
old_value = client.attributes[self.property]
1028
self.assertNotIsInstance(old_value, Unique)
1029
client.attributes[self.property] = Unique()
1030
self.run_command(value_to_set, self.clients)
1031
for client in self.clients:
1032
value = client.attributes[self.property]
1033
self.assertNotIsInstance(value, Unique)
1034
self.assertEqual(value, value_to_get)
1035
def run_command(self, value, clients):
1036
self.command().run(None, clients)
1038
class TestBumpTimeoutCmd(TestPropertyCmd):
1039
command = BumpTimeoutCmd
1040
property = "LastCheckedOK"
1041
values_to_set = [""]
1043
class TestStartCheckerCmd(TestPropertyCmd):
1044
command = StartCheckerCmd
1045
property = "CheckerRunning"
1046
values_to_set = [dbus.Boolean(True)]
1048
class TestStopCheckerCmd(TestPropertyCmd):
1049
command = StopCheckerCmd
1050
property = "CheckerRunning"
1051
values_to_set = [dbus.Boolean(False)]
1053
class TestApproveByDefaultCmd(TestPropertyCmd):
1054
command = ApproveByDefaultCmd
1055
property = "ApprovedByDefault"
1056
values_to_set = [dbus.Boolean(True)]
1058
class TestDenyByDefaultCmd(TestPropertyCmd):
1059
command = DenyByDefaultCmd
1060
property = "ApprovedByDefault"
1061
values_to_set = [dbus.Boolean(False)]
1063
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1064
"""Abstract class for tests of PropertyCmd classes using the
1065
ValueArgumentMixIn"""
1067
if type(self) is TestValueArgumentPropertyCmd:
1069
return super(TestValueArgumentPropertyCmd, self).runTest()
1070
def run_command(self, value, clients):
1071
self.command(value).run(None, clients)
1073
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1074
command = SetCheckerCmd
1075
property = "Checker"
1076
values_to_set = ["", ":", "fping -q -- %s"]
1078
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1079
command = SetHostCmd
1081
values_to_set = ["192.0.2.3", "foo.example.org"]
1083
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1084
command = SetSecretCmd
1086
values_to_set = [b"", b"secret"]
1088
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1089
command = SetTimeoutCmd
1090
property = "Timeout"
1091
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1092
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1094
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1095
command = SetExtendedTimeoutCmd
1096
property = "ExtendedTimeout"
1097
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1098
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1100
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1101
command = SetIntervalCmd
1102
property = "Interval"
1103
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1106
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1107
command = SetApprovalDelayCmd
1108
property = "ApprovalDelay"
1109
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1112
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1113
command = SetApprovalDurationCmd
1114
property = "ApprovalDuration"
1115
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1118
class TestOptions(unittest.TestCase):
1120
self.parser = argparse.ArgumentParser()
1121
add_command_line_options(self.parser)
1122
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1123
"""Assert that parsing ARGS should result in an instance of
1124
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1125
options = self.parser.parse_args(args)
1126
commands = commands_from_options(options)
1127
self.assertEqual(len(commands), 1)
1128
command = commands[0]
1129
self.assertIsInstance(command, command_cls)
1130
for key, value in cmd_attrs.items():
1131
self.assertEqual(getattr(command, key), value)
1132
def test_default_is_show_table(self):
1133
self.assert_command_from_args([], PrintTableCmd,
1135
def test_show_table_verbose(self):
1136
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1138
def test_enable(self):
1139
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1140
def test_disable(self):
1141
self.assert_command_from_args(["--disable", "foo"],
1146
def should_only_run_tests():
1147
parser = argparse.ArgumentParser(add_help=False)
1148
parser.add_argument("--check", action='store_true')
1149
args, unknown_args = parser.parse_known_args()
1150
run_tests = args.check
1152
# Remove --check argument from sys.argv
1153
sys.argv[1:] = unknown_args
1156
# Add all tests from doctest strings
1157
def load_tests(loader, tests, none):
1159
tests.addTests(doctest.DocTestSuite())
383
if not has_actions(options) and clients:
385
keywords = ("Name", "Enabled", "Timeout",
386
"LastCheckedOK", "Created", "Interval",
387
"Host", "Fingerprint", "CheckerRunning",
388
"LastEnabled", "ApprovalPending",
390
"LastApprovalRequest", "ApprovalDelay",
391
"ApprovalDuration", "Checker",
394
keywords = defaultkeywords
396
print_clients(clients.values(), keywords)
398
# Process each client in the list by all selected options
399
for client in clients:
400
def set_client_prop(prop, value):
401
"""Set a Client D-Bus property"""
402
client.Set(client_interface, prop, value,
403
dbus_interface=dbus.PROPERTIES_IFACE)
404
def set_client_prop_ms(prop, value):
405
"""Set a Client D-Bus property, converted
406
from a string to milliseconds."""
407
set_client_prop(prop,
408
timedelta_to_milliseconds
409
(string_to_delta(value)))
411
mandos_serv.RemoveClient(client.__dbus_object_path__)
413
set_client_prop("Enabled", dbus.Boolean(True))
415
set_client_prop("Enabled", dbus.Boolean(False))
416
if options.bump_timeout:
417
set_client_prop("LastCheckedOK", "")
418
if options.start_checker:
419
set_client_prop("CheckerRunning", dbus.Boolean(True))
420
if options.stop_checker:
421
set_client_prop("CheckerRunning", dbus.Boolean(False))
422
if options.is_enabled:
423
sys.exit(0 if client.Get(client_interface,
426
dbus.PROPERTIES_IFACE)
428
if options.checker is not None:
429
set_client_prop("Checker", options.checker)
430
if options.host is not None:
431
set_client_prop("Host", options.host)
432
if options.interval is not None:
433
set_client_prop_ms("Interval", options.interval)
434
if options.approval_delay is not None:
435
set_client_prop_ms("ApprovalDelay",
436
options.approval_delay)
437
if options.approval_duration is not None:
438
set_client_prop_ms("ApprovalDuration",
439
options.approval_duration)
440
if options.timeout is not None:
441
set_client_prop_ms("Timeout", options.timeout)
442
if options.extended_timeout is not None:
443
set_client_prop_ms("ExtendedTimeout",
444
options.extended_timeout)
445
if options.secret is not None:
446
set_client_prop("Secret",
447
dbus.ByteArray(options.secret.read()))
448
if options.approved_by_default is not None:
449
set_client_prop("ApprovedByDefault",
451
.approved_by_default))
453
client.Approve(dbus.Boolean(True),
454
dbus_interface=client_interface)
456
client.Approve(dbus.Boolean(False),
457
dbus_interface=client_interface)
1162
459
if __name__ == "__main__":
1163
if should_only_run_tests():
1164
# Call using ./tdd-python-script --check [--verbose]