425
415
parser.error("--dump-json can only be used alone.")
426
416
if options.all and not has_actions(options):
427
417
parser.error("--all requires an action.")
430
bus = dbus.SystemBus()
431
mandos_dbus_objc = bus.get_object(busname, server_path)
432
except dbus.exceptions.DBusException:
433
log.critical("Could not connect to Mandos server")
436
mandos_serv = dbus.Interface(mandos_dbus_objc,
437
dbus_interface=server_interface)
438
mandos_serv_object_manager = dbus.Interface(
439
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
441
# block stderr since dbus library prints to stderr
442
null = os.open(os.path.devnull, os.O_RDWR)
443
stderrcopy = os.dup(sys.stderr.fileno())
444
os.dup2(null, sys.stderr.fileno())
448
mandos_clients = {path: ifs_and_props[client_interface]
449
for path, ifs_and_props in
450
mandos_serv_object_manager
451
.GetManagedObjects().items()
452
if client_interface in ifs_and_props}
455
os.dup2(stderrcopy, sys.stderr.fileno())
418
if options.is_enabled and len(options.client) > 1:
419
parser.error("--is-enabled requires exactly one client")
421
options.remove = False
422
if has_actions(options) and not options.deny:
423
parser.error("--remove can only be combined with --deny")
424
options.remove = True
427
def get_mandos_dbus_object(bus):
428
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
429
dbus_busname, server_dbus_path)
430
with if_dbus_exception_log_with_exception_and_exit(
431
"Could not connect to Mandos server: %s"):
432
mandos_dbus_object = bus.get_object(dbus_busname,
434
return mandos_dbus_object
437
@contextlib.contextmanager
438
def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs):
457
441
except dbus.exceptions.DBusException as e:
458
log.critical("Failed to access Mandos server through D-Bus:"
462
# Compile dict of (clients: properties) to process
465
if options.all or not options.client:
466
clients = {bus.get_object(busname, path): properties
467
for path, properties in mandos_clients.items()}
469
for name in options.client:
470
for path, client in mandos_clients.items():
471
if client["Name"] == name:
472
client_objc = bus.get_object(busname, path)
473
clients[client_objc] = client
476
log.critical("Client not found on server: %r", name)
479
if not has_actions(options) and clients:
480
if options.verbose or options.dump_json:
481
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
482
"Created", "Interval", "Host", "KeyID",
483
"Fingerprint", "CheckerRunning",
484
"LastEnabled", "ApprovalPending",
485
"ApprovedByDefault", "LastApprovalRequest",
486
"ApprovalDelay", "ApprovalDuration",
487
"Checker", "ExtendedTimeout", "Expires",
490
keywords = defaultkeywords
492
if options.dump_json:
493
json.dump({client["Name"]: {key:
495
if isinstance(client[key],
499
for client in clients.values()},
500
fp=sys.stdout, indent=4,
501
separators=(',', ': '))
504
print_clients(clients.values(), keywords)
506
# Process each client in the list by all selected options
507
for client in clients:
509
def set_client_prop(prop, value):
510
"""Set a Client D-Bus property"""
511
client.Set(client_interface, prop, value,
512
dbus_interface=dbus.PROPERTIES_IFACE)
514
def set_client_prop_ms(prop, value):
515
"""Set a Client D-Bus property, converted
516
from a string to milliseconds."""
517
set_client_prop(prop,
518
string_to_delta(value).total_seconds()
522
mandos_serv.RemoveClient(client.__dbus_object_path__)
524
set_client_prop("Enabled", dbus.Boolean(True))
526
set_client_prop("Enabled", dbus.Boolean(False))
527
if options.bump_timeout:
528
set_client_prop("LastCheckedOK", "")
529
if options.start_checker:
530
set_client_prop("CheckerRunning", dbus.Boolean(True))
531
if options.stop_checker:
532
set_client_prop("CheckerRunning", dbus.Boolean(False))
533
if options.is_enabled:
534
if client.Get(client_interface, "Enabled",
535
dbus_interface=dbus.PROPERTIES_IFACE):
539
if options.checker is not None:
540
set_client_prop("Checker", options.checker)
541
if options.host is not None:
542
set_client_prop("Host", options.host)
543
if options.interval is not None:
544
set_client_prop_ms("Interval", options.interval)
545
if options.approval_delay is not None:
546
set_client_prop_ms("ApprovalDelay",
547
options.approval_delay)
548
if options.approval_duration is not None:
549
set_client_prop_ms("ApprovalDuration",
550
options.approval_duration)
551
if options.timeout is not None:
552
set_client_prop_ms("Timeout", options.timeout)
553
if options.extended_timeout is not None:
554
set_client_prop_ms("ExtendedTimeout",
555
options.extended_timeout)
556
if options.secret is not None:
557
set_client_prop("Secret",
558
dbus.ByteArray(options.secret.read()))
559
if options.approved_by_default is not None:
560
set_client_prop("ApprovedByDefault",
562
.approved_by_default))
564
client.Approve(dbus.Boolean(True),
565
dbus_interface=client_interface)
567
client.Approve(dbus.Boolean(False),
568
dbus_interface=client_interface)
442
log.critical(*(args + (e,)), **kwargs)
446
def get_managed_objects(object_manager):
447
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
448
server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
449
with if_dbus_exception_log_with_exception_and_exit(
450
"Failed to access Mandos server through D-Bus:\n%s"):
451
with SilenceLogger("dbus.proxies"):
452
managed_objects = object_manager.GetManagedObjects()
453
return managed_objects
456
class SilenceLogger(object):
457
"Simple context manager to silence a particular logger"
458
def __init__(self, loggername):
459
self.logger = logging.getLogger(loggername)
462
self.logger.addFilter(self.nullfilter)
465
class NullFilter(logging.Filter):
466
def filter(self, record):
469
nullfilter = NullFilter()
471
def __exit__(self, exc_type, exc_val, exc_tb):
472
self.logger.removeFilter(self.nullfilter)
475
def commands_from_options(options):
479
if options.is_enabled:
480
commands.append(IsEnabledCmd())
483
commands.append(ApproveCmd())
486
commands.append(DenyCmd())
489
commands.append(RemoveCmd())
491
if options.dump_json:
492
commands.append(DumpJSONCmd())
495
commands.append(EnableCmd())
498
commands.append(DisableCmd())
500
if options.bump_timeout:
501
commands.append(BumpTimeoutCmd())
503
if options.start_checker:
504
commands.append(StartCheckerCmd())
506
if options.stop_checker:
507
commands.append(StopCheckerCmd())
509
if options.approved_by_default is not None:
510
if options.approved_by_default:
511
commands.append(ApproveByDefaultCmd())
513
commands.append(DenyByDefaultCmd())
515
if options.checker is not None:
516
commands.append(SetCheckerCmd(options.checker))
518
if options.host is not None:
519
commands.append(SetHostCmd(options.host))
521
if options.secret is not None:
522
commands.append(SetSecretCmd(options.secret))
524
if options.timeout is not None:
525
commands.append(SetTimeoutCmd(options.timeout))
527
if options.extended_timeout:
529
SetExtendedTimeoutCmd(options.extended_timeout))
531
if options.interval is not None:
532
commands.append(SetIntervalCmd(options.interval))
534
if options.approval_delay is not None:
535
commands.append(SetApprovalDelayCmd(options.approval_delay))
537
if options.approval_duration is not None:
539
SetApprovalDurationCmd(options.approval_duration))
541
# If no command option has been given, show table of clients,
542
# optionally verbosely
544
commands.append(PrintTableCmd(verbose=options.verbose))
549
class Command(object):
550
"""Abstract class for commands"""
551
def run(self, clients, bus=None, mandos=None):
552
"""Normal commands should implement run_on_one_client(), but
553
commands which want to operate on all clients at the same time
554
can override this run() method instead."""
556
for clientpath, properties in clients.items():
557
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
558
dbus_busname, str(clientpath))
559
client = bus.get_object(dbus_busname, clientpath)
560
self.run_on_one_client(client, properties)
563
class IsEnabledCmd(Command):
564
def run(self, clients, bus=None, mandos=None):
565
client, properties = next(iter(clients.items()))
566
if self.is_enabled(client, properties):
569
def is_enabled(self, client, properties):
570
return properties["Enabled"]
573
class ApproveCmd(Command):
574
def run_on_one_client(self, client, properties):
575
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
576
client.__dbus_object_path__, client_dbus_interface)
577
client.Approve(dbus.Boolean(True),
578
dbus_interface=client_dbus_interface)
581
class DenyCmd(Command):
582
def run_on_one_client(self, client, properties):
583
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
584
client.__dbus_object_path__, client_dbus_interface)
585
client.Approve(dbus.Boolean(False),
586
dbus_interface=client_dbus_interface)
589
class RemoveCmd(Command):
590
def run_on_one_client(self, client, properties):
591
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
592
server_dbus_path, server_dbus_interface,
593
str(client.__dbus_object_path__))
594
self.mandos.RemoveClient(client.__dbus_object_path__)
597
class OutputCmd(Command):
598
"""Abstract class for commands outputting client details"""
599
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
600
"Created", "Interval", "Host", "KeyID",
601
"Fingerprint", "CheckerRunning", "LastEnabled",
602
"ApprovalPending", "ApprovedByDefault",
603
"LastApprovalRequest", "ApprovalDelay",
604
"ApprovalDuration", "Checker", "ExtendedTimeout",
605
"Expires", "LastCheckerStatus")
607
def run(self, clients, bus=None, mandos=None):
608
print(self.output(clients.values()))
610
def output(self, clients):
611
raise NotImplementedError()
614
class DumpJSONCmd(OutputCmd):
615
def output(self, clients):
616
data = {client["Name"]:
617
{key: self.dbus_boolean_to_bool(client[key])
618
for key in self.all_keywords}
619
for client in clients}
620
return json.dumps(data, indent=4, separators=(',', ': '))
623
def dbus_boolean_to_bool(value):
624
if isinstance(value, dbus.Boolean):
629
class PrintTableCmd(OutputCmd):
630
def __init__(self, verbose=False):
631
self.verbose = verbose
633
def output(self, clients):
634
default_keywords = ("Name", "Enabled", "Timeout",
636
keywords = default_keywords
638
keywords = self.all_keywords
639
return str(self.TableOfClients(clients, keywords))
641
class TableOfClients(object):
644
"Enabled": "Enabled",
645
"Timeout": "Timeout",
646
"LastCheckedOK": "Last Successful Check",
647
"LastApprovalRequest": "Last Approval Request",
648
"Created": "Created",
649
"Interval": "Interval",
651
"Fingerprint": "Fingerprint",
653
"CheckerRunning": "Check Is Running",
654
"LastEnabled": "Last Enabled",
655
"ApprovalPending": "Approval Is Pending",
656
"ApprovedByDefault": "Approved By Default",
657
"ApprovalDelay": "Approval Delay",
658
"ApprovalDuration": "Approval Duration",
659
"Checker": "Checker",
660
"ExtendedTimeout": "Extended Timeout",
661
"Expires": "Expires",
662
"LastCheckerStatus": "Last Checker Status",
665
def __init__(self, clients, keywords):
666
self.clients = clients
667
self.keywords = keywords
670
return "\n".join(self.rows())
672
if sys.version_info.major == 2:
673
__unicode__ = __str__
675
return str(self).encode(locale.getpreferredencoding())
678
format_string = self.row_formatting_string()
679
rows = [self.header_line(format_string)]
680
rows.extend(self.client_line(client, format_string)
681
for client in self.clients)
684
def row_formatting_string(self):
685
"Format string used to format table rows"
686
return " ".join("{{{key}:{width}}}".format(
687
width=max(len(self.tableheaders[key]),
688
*(len(self.string_from_client(client, key))
689
for client in self.clients)),
691
for key in self.keywords)
693
def string_from_client(self, client, key):
694
return self.valuetostring(client[key], key)
697
def valuetostring(cls, value, keyword):
698
if isinstance(value, dbus.Boolean):
699
return "Yes" if value else "No"
700
if keyword in ("Timeout", "Interval", "ApprovalDelay",
701
"ApprovalDuration", "ExtendedTimeout"):
702
return cls.milliseconds_to_string(value)
705
def header_line(self, format_string):
706
return format_string.format(**self.tableheaders)
708
def client_line(self, client, format_string):
709
return format_string.format(
710
**{key: self.string_from_client(client, key)
711
for key in self.keywords})
714
def milliseconds_to_string(ms):
715
td = datetime.timedelta(0, 0, 0, ms)
716
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
717
.format(days="{}T".format(td.days)
719
hours=td.seconds // 3600,
720
minutes=(td.seconds % 3600) // 60,
721
seconds=td.seconds % 60))
724
class PropertyCmd(Command):
725
"""Abstract class for Actions for setting one client property"""
727
def run_on_one_client(self, client, properties):
728
"""Set the Client's D-Bus property"""
729
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
730
client.__dbus_object_path__,
731
dbus.PROPERTIES_IFACE, client_dbus_interface,
732
self.propname, self.value_to_set
733
if not isinstance(self.value_to_set, dbus.Boolean)
734
else bool(self.value_to_set))
735
client.Set(client_dbus_interface, self.propname,
737
dbus_interface=dbus.PROPERTIES_IFACE)
741
raise NotImplementedError()
744
class EnableCmd(PropertyCmd):
746
value_to_set = dbus.Boolean(True)
749
class DisableCmd(PropertyCmd):
751
value_to_set = dbus.Boolean(False)
754
class BumpTimeoutCmd(PropertyCmd):
755
propname = "LastCheckedOK"
759
class StartCheckerCmd(PropertyCmd):
760
propname = "CheckerRunning"
761
value_to_set = dbus.Boolean(True)
764
class StopCheckerCmd(PropertyCmd):
765
propname = "CheckerRunning"
766
value_to_set = dbus.Boolean(False)
769
class ApproveByDefaultCmd(PropertyCmd):
770
propname = "ApprovedByDefault"
771
value_to_set = dbus.Boolean(True)
774
class DenyByDefaultCmd(PropertyCmd):
775
propname = "ApprovedByDefault"
776
value_to_set = dbus.Boolean(False)
779
class PropertyValueCmd(PropertyCmd):
780
"""Abstract class for PropertyCmd recieving a value as argument"""
781
def __init__(self, value):
782
self.value_to_set = value
785
class SetCheckerCmd(PropertyValueCmd):
789
class SetHostCmd(PropertyValueCmd):
793
class SetSecretCmd(PropertyValueCmd):
797
def value_to_set(self):
801
def value_to_set(self, value):
802
"""When setting, read data from supplied file object"""
803
self._vts = value.read()
807
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
808
"""Abstract class for PropertyValueCmd taking a value argument as
809
a datetime.timedelta() but should store it as milliseconds."""
812
def value_to_set(self):
816
def value_to_set(self, value):
817
"""When setting, convert value from a datetime.timedelta"""
818
self._vts = int(round(value.total_seconds() * 1000))
821
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
825
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
826
propname = "ExtendedTimeout"
829
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
830
propname = "Interval"
833
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
834
propname = "ApprovalDelay"
837
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
838
propname = "ApprovalDuration"
571
class Test_milliseconds_to_string(unittest.TestCase):
573
self.assertEqual(milliseconds_to_string(93785000),
575
def test_no_days(self):
576
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
577
def test_all_zero(self):
578
self.assertEqual(milliseconds_to_string(0), "00:00:00")
579
def test_no_fractional_seconds(self):
580
self.assertEqual(milliseconds_to_string(400), "00:00:00")
581
self.assertEqual(milliseconds_to_string(900), "00:00:00")
582
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
584
842
class Test_string_to_delta(unittest.TestCase):
585
843
def test_handles_basic_rfc3339(self):
844
self.assertEqual(string_to_delta("PT0S"),
845
datetime.timedelta())
846
self.assertEqual(string_to_delta("P0D"),
847
datetime.timedelta())
848
self.assertEqual(string_to_delta("PT1S"),
849
datetime.timedelta(0, 1))
586
850
self.assertEqual(string_to_delta("PT2H"),
587
851
datetime.timedelta(0, 7200))
588
853
def test_falls_back_to_pre_1_6_1_with_warning(self):
589
854
# assertLogs only exists in Python 3.4
590
855
if hasattr(self, "assertLogs"):
591
856
with self.assertLogs(log, logging.WARNING):
592
857
value = string_to_delta("2h")
594
value = string_to_delta("2h")
859
class WarningFilter(logging.Filter):
860
"""Don't show, but record the presence of, warnings"""
861
def filter(self, record):
862
is_warning = record.levelno >= logging.WARNING
863
self.found = is_warning or getattr(self, "found",
865
return not is_warning
866
warning_filter = WarningFilter()
867
log.addFilter(warning_filter)
869
value = string_to_delta("2h")
871
log.removeFilter(warning_filter)
872
self.assertTrue(getattr(warning_filter, "found", False))
595
873
self.assertEqual(value, datetime.timedelta(0, 7200))
597
class Test_TableOfClients(unittest.TestCase):
603
"Bool": "A D-BUS Boolean",
604
"NonDbusBoolean": "A Non-D-BUS Boolean",
605
"Integer": "An Integer",
606
"Timeout": "Timedelta 1",
607
"Interval": "Timedelta 2",
608
"ApprovalDelay": "Timedelta 3",
609
"ApprovalDuration": "Timedelta 4",
610
"ExtendedTimeout": "Timedelta 5",
611
"String": "A String",
876
class Test_check_option_syntax(unittest.TestCase):
878
self.parser = argparse.ArgumentParser()
879
add_command_line_options(self.parser)
881
def test_actions_requires_client_or_all(self):
882
for action, value in self.actions.items():
883
options = self.parser.parse_args()
884
setattr(options, action, value)
885
with self.assertParseError():
886
self.check_option_syntax(options)
888
# This mostly corresponds to the definition from has_actions() in
889
# check_option_syntax()
891
# The actual values set here are not that important, but we do
892
# at least stick to the correct types, even though they are
896
"bump_timeout": True,
897
"start_checker": True,
898
"stop_checker": True,
902
"timeout": datetime.timedelta(),
903
"extended_timeout": datetime.timedelta(),
904
"interval": datetime.timedelta(),
905
"approved_by_default": True,
906
"approval_delay": datetime.timedelta(),
907
"approval_duration": datetime.timedelta(),
909
"secret": io.BytesIO(b"x"),
914
@contextlib.contextmanager
915
def assertParseError(self):
916
with self.assertRaises(SystemExit) as e:
917
with self.temporarily_suppress_stderr():
919
# Exit code from argparse is guaranteed to be "2". Reference:
920
# https://docs.python.org/3/library
921
# /argparse.html#exiting-methods
922
self.assertEqual(e.exception.code, 2)
925
@contextlib.contextmanager
926
def temporarily_suppress_stderr():
927
null = os.open(os.path.devnull, os.O_RDWR)
928
stderrcopy = os.dup(sys.stderr.fileno())
929
os.dup2(null, sys.stderr.fileno())
935
os.dup2(stderrcopy, sys.stderr.fileno())
938
def check_option_syntax(self, options):
939
check_option_syntax(self.parser, options)
941
def test_actions_conflicts_with_verbose(self):
942
for action, value in self.actions.items():
943
options = self.parser.parse_args()
944
setattr(options, action, value)
945
options.verbose = True
946
with self.assertParseError():
947
self.check_option_syntax(options)
949
def test_dump_json_conflicts_with_verbose(self):
950
options = self.parser.parse_args()
951
options.dump_json = True
952
options.verbose = True
953
with self.assertParseError():
954
self.check_option_syntax(options)
956
def test_dump_json_conflicts_with_action(self):
957
for action, value in self.actions.items():
958
options = self.parser.parse_args()
959
setattr(options, action, value)
960
options.dump_json = True
961
with self.assertParseError():
962
self.check_option_syntax(options)
964
def test_all_can_not_be_alone(self):
965
options = self.parser.parse_args()
967
with self.assertParseError():
968
self.check_option_syntax(options)
970
def test_all_is_ok_with_any_action(self):
971
for action, value in self.actions.items():
972
options = self.parser.parse_args()
973
setattr(options, action, value)
975
self.check_option_syntax(options)
977
def test_is_enabled_fails_without_client(self):
978
options = self.parser.parse_args()
979
options.is_enabled = True
980
with self.assertParseError():
981
self.check_option_syntax(options)
983
def test_is_enabled_works_with_one_client(self):
984
options = self.parser.parse_args()
985
options.is_enabled = True
986
options.client = ["foo"]
987
self.check_option_syntax(options)
989
def test_is_enabled_fails_with_two_clients(self):
990
options = self.parser.parse_args()
991
options.is_enabled = True
992
options.client = ["foo", "barbar"]
993
with self.assertParseError():
994
self.check_option_syntax(options)
996
def test_remove_can_only_be_combined_with_action_deny(self):
997
for action, value in self.actions.items():
998
if action in {"remove", "deny"}:
1000
options = self.parser.parse_args()
1001
setattr(options, action, value)
1003
options.remove = True
1004
with self.assertParseError():
1005
self.check_option_syntax(options)
1008
class Test_get_mandos_dbus_object(unittest.TestCase):
1009
def test_calls_and_returns_get_object_on_bus(self):
1010
class MockBus(object):
1012
def get_object(mockbus_self, busname, dbus_path):
1013
# Note that "self" is still the testcase instance,
1014
# this MockBus instance is in "mockbus_self".
1015
self.assertEqual(busname, dbus_busname)
1016
self.assertEqual(dbus_path, server_dbus_path)
1017
mockbus_self.called = True
1020
mockbus = get_mandos_dbus_object(bus=MockBus())
1021
self.assertIsInstance(mockbus, MockBus)
1022
self.assertTrue(mockbus.called)
1024
def test_logs_and_exits_on_dbus_error(self):
1025
class MockBusFailing(object):
1026
def get_object(self, busname, dbus_path):
1027
raise dbus.exceptions.DBusException("Test")
1029
# assertLogs only exists in Python 3.4
1030
if hasattr(self, "assertLogs"):
1031
with self.assertLogs(log, logging.CRITICAL):
1032
with self.assertRaises(SystemExit) as e:
1033
bus = get_mandos_dbus_object(bus=MockBus())
1035
critical_filter = self.CriticalFilter()
1036
log.addFilter(critical_filter)
1038
with self.assertRaises(SystemExit) as e:
1039
get_mandos_dbus_object(bus=MockBusFailing())
1041
log.removeFilter(critical_filter)
1042
self.assertTrue(critical_filter.found)
1043
if isinstance(e.exception.code, int):
1044
self.assertNotEqual(e.exception.code, 0)
1046
self.assertIsNotNone(e.exception.code)
1048
class CriticalFilter(logging.Filter):
1049
"""Don't show, but register, critical messages"""
1051
def filter(self, record):
1052
is_critical = record.levelno >= logging.CRITICAL
1053
self.found = is_critical or self.found
1054
return not is_critical
1057
class Test_get_managed_objects(unittest.TestCase):
1058
def test_calls_and_returns_GetManagedObjects(self):
1059
managed_objects = {"/clients/foo": { "Name": "foo"}}
1060
class MockObjectManager(object):
1062
def GetManagedObjects():
1063
return managed_objects
1064
retval = get_managed_objects(MockObjectManager())
1065
self.assertDictEqual(managed_objects, retval)
1067
def test_logs_and_exits_on_dbus_error(self):
1068
class MockObjectManagerFailing(object):
1070
def GetManagedObjects():
1071
raise dbus.exceptions.DBusException("Test")
1073
if hasattr(self, "assertLogs"):
1074
with self.assertLogs(log, logging.CRITICAL):
1075
with self.assertRaises(SystemExit):
1076
get_managed_objects(MockObjectManagerFailing())
1078
critical_filter = self.CriticalFilter()
1079
log.addFilter(critical_filter)
1081
with self.assertRaises(SystemExit) as e:
1082
get_managed_objects(MockObjectManagerFailing())
1084
log.removeFilter(critical_filter)
1085
self.assertTrue(critical_filter.found)
1086
if isinstance(e.exception.code, int):
1087
self.assertNotEqual(e.exception.code, 0)
1089
self.assertIsNotNone(e.exception.code)
1091
class CriticalFilter(logging.Filter):
1092
"""Don't show, but register, critical messages"""
1094
def filter(self, record):
1095
is_critical = record.levelno >= logging.CRITICAL
1096
self.found = is_critical or self.found
1097
return not is_critical
1100
class Test_SilenceLogger(unittest.TestCase):
1101
loggername = "mandos-ctl.Test_SilenceLogger"
1102
log = logging.getLogger(loggername)
1103
log.propagate = False
1104
log.addHandler(logging.NullHandler())
1107
self.counting_filter = self.CountingFilter()
1109
class CountingFilter(logging.Filter):
1110
"Count number of records"
1112
def filter(self, record):
1116
def test_should_filter_records_only_when_active(self):
1118
with SilenceLogger(self.loggername):
1119
self.log.addFilter(self.counting_filter)
1120
self.log.info("Filtered log message 1")
1121
self.log.info("Non-filtered message 2")
1122
self.log.info("Non-filtered message 3")
1124
self.log.removeFilter(self.counting_filter)
1125
self.assertEqual(self.counting_filter.count, 2)
1128
class Test_commands_from_options(unittest.TestCase):
1130
self.parser = argparse.ArgumentParser()
1131
add_command_line_options(self.parser)
1133
def test_is_enabled(self):
1134
self.assert_command_from_args(["--is-enabled", "foo"],
1137
def assert_command_from_args(self, args, command_cls,
1139
"""Assert that parsing ARGS should result in an instance of
1140
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1141
options = self.parser.parse_args(args)
1142
check_option_syntax(self.parser, options)
1143
commands = commands_from_options(options)
1144
self.assertEqual(len(commands), 1)
1145
command = commands[0]
1146
self.assertIsInstance(command, command_cls)
1147
for key, value in cmd_attrs.items():
1148
self.assertEqual(getattr(command, key), value)
1150
def test_is_enabled_short(self):
1151
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1153
def test_approve(self):
1154
self.assert_command_from_args(["--approve", "foo"],
1157
def test_approve_short(self):
1158
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1160
def test_deny(self):
1161
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1163
def test_deny_short(self):
1164
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1166
def test_remove(self):
1167
self.assert_command_from_args(["--remove", "foo"],
1170
def test_deny_before_remove(self):
1171
options = self.parser.parse_args(["--deny", "--remove",
1173
check_option_syntax(self.parser, options)
1174
commands = commands_from_options(options)
1175
self.assertEqual(len(commands), 2)
1176
self.assertIsInstance(commands[0], DenyCmd)
1177
self.assertIsInstance(commands[1], RemoveCmd)
1179
def test_deny_before_remove_reversed(self):
1180
options = self.parser.parse_args(["--remove", "--deny",
1182
check_option_syntax(self.parser, options)
1183
commands = commands_from_options(options)
1184
self.assertEqual(len(commands), 2)
1185
self.assertIsInstance(commands[0], DenyCmd)
1186
self.assertIsInstance(commands[1], RemoveCmd)
1188
def test_remove_short(self):
1189
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1191
def test_dump_json(self):
1192
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1194
def test_enable(self):
1195
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1197
def test_enable_short(self):
1198
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1200
def test_disable(self):
1201
self.assert_command_from_args(["--disable", "foo"],
1204
def test_disable_short(self):
1205
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1207
def test_bump_timeout(self):
1208
self.assert_command_from_args(["--bump-timeout", "foo"],
1211
def test_bump_timeout_short(self):
1212
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1214
def test_start_checker(self):
1215
self.assert_command_from_args(["--start-checker", "foo"],
1218
def test_stop_checker(self):
1219
self.assert_command_from_args(["--stop-checker", "foo"],
1222
def test_approve_by_default(self):
1223
self.assert_command_from_args(["--approve-by-default", "foo"],
1224
ApproveByDefaultCmd)
1226
def test_deny_by_default(self):
1227
self.assert_command_from_args(["--deny-by-default", "foo"],
1230
def test_checker(self):
1231
self.assert_command_from_args(["--checker", ":", "foo"],
1232
SetCheckerCmd, value_to_set=":")
1234
def test_checker_empty(self):
1235
self.assert_command_from_args(["--checker", "", "foo"],
1236
SetCheckerCmd, value_to_set="")
1238
def test_checker_short(self):
1239
self.assert_command_from_args(["-c", ":", "foo"],
1240
SetCheckerCmd, value_to_set=":")
1242
def test_host(self):
1243
self.assert_command_from_args(["--host", "foo.example.org",
1245
value_to_set="foo.example.org")
1247
def test_host_short(self):
1248
self.assert_command_from_args(["-H", "foo.example.org",
1250
value_to_set="foo.example.org")
1252
def test_secret_devnull(self):
1253
self.assert_command_from_args(["--secret", os.path.devnull,
1254
"foo"], SetSecretCmd,
1257
def test_secret_tempfile(self):
1258
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1259
value = b"secret\0xyzzy\nbar"
1262
self.assert_command_from_args(["--secret", f.name,
1263
"foo"], SetSecretCmd,
1266
def test_secret_devnull_short(self):
1267
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1268
SetSecretCmd, value_to_set=b"")
1270
def test_secret_tempfile_short(self):
1271
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1272
value = b"secret\0xyzzy\nbar"
1275
self.assert_command_from_args(["-s", f.name, "foo"],
1279
def test_timeout(self):
1280
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1282
value_to_set=300000)
1284
def test_timeout_short(self):
1285
self.assert_command_from_args(["-t", "PT5M", "foo"],
1287
value_to_set=300000)
1289
def test_extended_timeout(self):
1290
self.assert_command_from_args(["--extended-timeout", "PT15M",
1292
SetExtendedTimeoutCmd,
1293
value_to_set=900000)
1295
def test_interval(self):
1296
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1298
value_to_set=120000)
1300
def test_interval_short(self):
1301
self.assert_command_from_args(["-i", "PT2M", "foo"],
1303
value_to_set=120000)
1305
def test_approval_delay(self):
1306
self.assert_command_from_args(["--approval-delay", "PT30S",
1307
"foo"], SetApprovalDelayCmd,
1310
def test_approval_duration(self):
1311
self.assert_command_from_args(["--approval-duration", "PT1S",
1312
"foo"], SetApprovalDurationCmd,
1315
def test_print_table(self):
1316
self.assert_command_from_args([], PrintTableCmd,
1319
def test_print_table_verbose(self):
1320
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1323
def test_print_table_verbose_short(self):
1324
self.assert_command_from_args(["-v"], PrintTableCmd,
1328
class TestCmd(unittest.TestCase):
1329
"""Abstract class for tests of command classes"""
1333
class MockClient(object):
1334
def __init__(self, name, **attributes):
1335
self.__dbus_object_path__ = "/clients/{}".format(name)
1336
self.attributes = attributes
1337
self.attributes["Name"] = name
1339
def Set(self, interface, propname, value, dbus_interface):
1340
testcase.assertEqual(interface, client_dbus_interface)
1341
testcase.assertEqual(dbus_interface,
1342
dbus.PROPERTIES_IFACE)
1343
self.attributes[propname] = value
1344
def Get(self, interface, propname, dbus_interface):
1345
testcase.assertEqual(interface, client_dbus_interface)
1346
testcase.assertEqual(dbus_interface,
1347
dbus.PROPERTIES_IFACE)
1348
return self.attributes[propname]
1349
def Approve(self, approve, dbus_interface):
1350
testcase.assertEqual(dbus_interface,
1351
client_dbus_interface)
1352
self.calls.append(("Approve", (approve,
1354
self.client = MockClient(
1356
KeyID=("92ed150794387c03ce684574b1139a65"
1357
"94a34f895daaaf09fd8ea90a27cddb12"),
1359
Host="foo.example.org",
1360
Enabled=dbus.Boolean(True),
1362
LastCheckedOK="2019-02-03T00:00:00",
1363
Created="2019-01-02T00:00:00",
1365
Fingerprint=("778827225BA7DE539C5A"
1366
"7CFA59CFF7CDBD9A5920"),
1367
CheckerRunning=dbus.Boolean(False),
1368
LastEnabled="2019-01-03T00:00:00",
1369
ApprovalPending=dbus.Boolean(False),
1370
ApprovedByDefault=dbus.Boolean(True),
1371
LastApprovalRequest="",
1373
ApprovalDuration=1000,
1374
Checker="fping -q -- %(host)s",
1375
ExtendedTimeout=900000,
1376
Expires="2019-02-04T00:00:00",
1377
LastCheckerStatus=0)
1378
self.other_client = MockClient(
1380
KeyID=("0558568eedd67d622f5c83b35a115f79"
1381
"6ab612cff5ad227247e46c2b020f441c"),
1382
Secret=b"secretbar",
1384
Enabled=dbus.Boolean(True),
1386
LastCheckedOK="2019-02-04T00:00:00",
1387
Created="2019-01-03T00:00:00",
1389
Fingerprint=("3E393AEAEFB84C7E89E2"
1390
"F547B3A107558FCA3A27"),
1391
CheckerRunning=dbus.Boolean(True),
1392
LastEnabled="2019-01-04T00:00:00",
1393
ApprovalPending=dbus.Boolean(False),
1394
ApprovedByDefault=dbus.Boolean(False),
1395
LastApprovalRequest="2019-01-03T00:00:00",
1396
ApprovalDelay=30000,
1397
ApprovalDuration=93785000,
1399
ExtendedTimeout=900000,
1400
Expires="2019-02-05T00:00:00",
1401
LastCheckerStatus=-2)
1402
self.clients = collections.OrderedDict(
1404
("/clients/foo", self.client.attributes),
1405
("/clients/barbar", self.other_client.attributes),
1407
self.one_client = {"/clients/foo": self.client.attributes}
1413
def get_object(client_bus_name, path):
1414
self.assertEqual(client_bus_name, dbus_busname)
1416
# Note: "self" here is the TestCmd instance, not
1417
# the Bus instance, since this is a static method!
1418
"/clients/foo": self.client,
1419
"/clients/barbar": self.other_client,
1424
class TestIsEnabledCmd(TestCmd):
1425
def test_is_enabled(self):
1426
self.assertTrue(all(IsEnabledCmd().is_enabled(client,
1428
for client, properties
1429
in self.clients.items()))
1431
def test_is_enabled_run_exits_successfully(self):
1432
with self.assertRaises(SystemExit) as e:
1433
IsEnabledCmd().run(self.one_client)
1434
if e.exception.code is not None:
1435
self.assertEqual(e.exception.code, 0)
1437
self.assertIsNone(e.exception.code)
1439
def test_is_enabled_run_exits_with_failure(self):
1440
self.client.attributes["Enabled"] = dbus.Boolean(False)
1441
with self.assertRaises(SystemExit) as e:
1442
IsEnabledCmd().run(self.one_client)
1443
if isinstance(e.exception.code, int):
1444
self.assertNotEqual(e.exception.code, 0)
1446
self.assertIsNotNone(e.exception.code)
1449
class TestApproveCmd(TestCmd):
1450
def test_approve(self):
1451
ApproveCmd().run(self.clients, self.bus)
1452
for clientpath in self.clients:
1453
client = self.bus.get_object(dbus_busname, clientpath)
1454
self.assertIn(("Approve", (True, client_dbus_interface)),
1458
class TestDenyCmd(TestCmd):
1459
def test_deny(self):
1460
DenyCmd().run(self.clients, self.bus)
1461
for clientpath in self.clients:
1462
client = self.bus.get_object(dbus_busname, clientpath)
1463
self.assertIn(("Approve", (False, client_dbus_interface)),
1467
class TestRemoveCmd(TestCmd):
1468
def test_remove(self):
1469
class MockMandos(object):
1472
def RemoveClient(self, dbus_path):
1473
self.calls.append(("RemoveClient", (dbus_path,)))
1474
mandos = MockMandos()
1475
super(TestRemoveCmd, self).setUp()
1476
RemoveCmd().run(self.clients, self.bus, mandos)
1477
self.assertEqual(len(mandos.calls), 2)
1478
for clientpath in self.clients:
1479
self.assertIn(("RemoveClient", (clientpath,)),
1483
class TestDumpJSONCmd(TestCmd):
1485
self.expected_json = {
1488
"KeyID": ("92ed150794387c03ce684574b1139a65"
1489
"94a34f895daaaf09fd8ea90a27cddb12"),
1490
"Host": "foo.example.org",
1493
"LastCheckedOK": "2019-02-03T00:00:00",
1494
"Created": "2019-01-02T00:00:00",
1496
"Fingerprint": ("778827225BA7DE539C5A"
1497
"7CFA59CFF7CDBD9A5920"),
1498
"CheckerRunning": False,
1499
"LastEnabled": "2019-01-03T00:00:00",
1500
"ApprovalPending": False,
1501
"ApprovedByDefault": True,
1502
"LastApprovalRequest": "",
1504
"ApprovalDuration": 1000,
1505
"Checker": "fping -q -- %(host)s",
1506
"ExtendedTimeout": 900000,
1507
"Expires": "2019-02-04T00:00:00",
1508
"LastCheckerStatus": 0,
1512
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
1513
"6ab612cff5ad227247e46c2b020f441c"),
1514
"Host": "192.0.2.3",
1517
"LastCheckedOK": "2019-02-04T00:00:00",
1518
"Created": "2019-01-03T00:00:00",
1520
"Fingerprint": ("3E393AEAEFB84C7E89E2"
1521
"F547B3A107558FCA3A27"),
1522
"CheckerRunning": True,
1523
"LastEnabled": "2019-01-04T00:00:00",
1524
"ApprovalPending": False,
1525
"ApprovedByDefault": False,
1526
"LastApprovalRequest": "2019-01-03T00:00:00",
1527
"ApprovalDelay": 30000,
1528
"ApprovalDuration": 93785000,
1530
"ExtendedTimeout": 900000,
1531
"Expires": "2019-02-05T00:00:00",
1532
"LastCheckerStatus": -2,
613
self.keywords = ["Attr1", "AttrTwo"]
619
"Bool": dbus.Boolean(False),
620
"NonDbusBoolean": False,
624
"ApprovalDelay": 2000,
625
"ApprovalDuration": 3000,
626
"ExtendedTimeout": 4000,
633
"Bool": dbus.Boolean(True),
634
"NonDbusBoolean": True,
637
"Interval": 93786000,
638
"ApprovalDelay": 93787000,
639
"ApprovalDuration": 93788000,
640
"ExtendedTimeout": 93789000,
641
"String": "A huge string which will not fit," * 10,
644
def test_short_header(self):
645
rows = TableOfClients(self.clients, self.keywords,
646
self.tablewords).rows()
651
self.assertEqual(rows, expected_rows)
652
def test_booleans(self):
653
keywords = ["Bool", "NonDbusBoolean"]
654
rows = TableOfClients(self.clients, keywords,
655
self.tablewords).rows()
657
"A D-BUS Boolean A Non-D-BUS Boolean",
661
self.assertEqual(rows, expected_rows)
662
def test_milliseconds_detection(self):
663
keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
664
"ApprovalDuration", "ExtendedTimeout"]
665
rows = TableOfClients(self.clients, keywords,
666
self.tablewords).rows()
668
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
669
0 00:00:00 00:00:01 00:00:02 00:00:03 00:00:04
670
1 1T02:03:05 1T02:03:06 1T02:03:07 1T02:03:08 1T02:03:09
673
self.assertEqual(rows, expected_rows)
674
def test_empty_and_long_string_values(self):
675
keywords = ["String"]
676
rows = TableOfClients(self.clients, keywords,
677
self.tablewords).rows()
681
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
684
self.assertEqual(rows, expected_rows)
1535
return super(TestDumpJSONCmd, self).setUp()
1537
def test_normal(self):
1538
output = DumpJSONCmd().output(self.clients.values())
1539
json_data = json.loads(output)
1540
self.assertDictEqual(json_data, self.expected_json)
1542
def test_one_client(self):
1543
output = DumpJSONCmd().output(self.one_client.values())
1544
json_data = json.loads(output)
1545
expected_json = {"foo": self.expected_json["foo"]}
1546
self.assertDictEqual(json_data, expected_json)
1549
class TestPrintTableCmd(TestCmd):
1550
def test_normal(self):
1551
output = PrintTableCmd().output(self.clients.values())
1552
expected_output = "\n".join((
1553
"Name Enabled Timeout Last Successful Check",
1554
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1555
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1557
self.assertEqual(output, expected_output)
1559
def test_verbose(self):
1560
output = PrintTableCmd(verbose=True).output(
1561
self.clients.values())
1576
"Last Successful Check ",
1577
"2019-02-03T00:00:00 ",
1578
"2019-02-04T00:00:00 ",
1581
"2019-01-02T00:00:00 ",
1582
"2019-01-03T00:00:00 ",
1594
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1596
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1600
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1601
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1603
"Check Is Running ",
1608
"2019-01-03T00:00:00 ",
1609
"2019-01-04T00:00:00 ",
1611
"Approval Is Pending ",
1615
"Approved By Default ",
1619
"Last Approval Request ",
1621
"2019-01-03T00:00:00 ",
1627
"Approval Duration ",
1632
"fping -q -- %(host)s ",
1635
"Extended Timeout ",
1640
"2019-02-04T00:00:00 ",
1641
"2019-02-05T00:00:00 ",
1643
"Last Checker Status",
1648
num_lines = max(len(rows) for rows in columns)
1649
expected_output = "\n".join("".join(rows[line]
1650
for rows in columns)
1651
for line in range(num_lines))
1652
self.assertEqual(output, expected_output)
1654
def test_one_client(self):
1655
output = PrintTableCmd().output(self.one_client.values())
1656
expected_output = "\n".join((
1657
"Name Enabled Timeout Last Successful Check",
1658
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1660
self.assertEqual(output, expected_output)
1663
class TestPropertyCmd(TestCmd):
1664
"""Abstract class for tests of PropertyCmd classes"""
1666
if not hasattr(self, "command"):
1668
values_to_get = getattr(self, "values_to_get",
1670
for value_to_set, value_to_get in zip(self.values_to_set,
1672
for clientpath in self.clients:
1673
client = self.bus.get_object(dbus_busname, clientpath)
1674
old_value = client.attributes[self.propname]
1675
self.assertNotIsInstance(old_value, self.Unique)
1676
client.attributes[self.propname] = self.Unique()
1677
self.run_command(value_to_set, self.clients)
1678
for clientpath in self.clients:
1679
client = self.bus.get_object(dbus_busname, clientpath)
1680
value = client.attributes[self.propname]
1681
self.assertNotIsInstance(value, self.Unique)
1682
self.assertEqual(value, value_to_get)
1684
class Unique(object):
1685
"""Class for objects which exist only to be unique objects,
1686
since unittest.mock.sentinel only exists in Python 3.3"""
1688
def run_command(self, value, clients):
1689
self.command().run(clients, self.bus)
1692
class TestEnableCmd(TestPropertyCmd):
1694
propname = "Enabled"
1695
values_to_set = [dbus.Boolean(True)]
1698
class TestDisableCmd(TestPropertyCmd):
1699
command = DisableCmd
1700
propname = "Enabled"
1701
values_to_set = [dbus.Boolean(False)]
1704
class TestBumpTimeoutCmd(TestPropertyCmd):
1705
command = BumpTimeoutCmd
1706
propname = "LastCheckedOK"
1707
values_to_set = [""]
1710
class TestStartCheckerCmd(TestPropertyCmd):
1711
command = StartCheckerCmd
1712
propname = "CheckerRunning"
1713
values_to_set = [dbus.Boolean(True)]
1716
class TestStopCheckerCmd(TestPropertyCmd):
1717
command = StopCheckerCmd
1718
propname = "CheckerRunning"
1719
values_to_set = [dbus.Boolean(False)]
1722
class TestApproveByDefaultCmd(TestPropertyCmd):
1723
command = ApproveByDefaultCmd
1724
propname = "ApprovedByDefault"
1725
values_to_set = [dbus.Boolean(True)]
1728
class TestDenyByDefaultCmd(TestPropertyCmd):
1729
command = DenyByDefaultCmd
1730
propname = "ApprovedByDefault"
1731
values_to_set = [dbus.Boolean(False)]
1734
class TestPropertyValueCmd(TestPropertyCmd):
1735
"""Abstract class for tests of PropertyValueCmd classes"""
1738
if type(self) is TestPropertyValueCmd:
1740
return super(TestPropertyValueCmd, self).runTest()
1742
def run_command(self, value, clients):
1743
self.command(value).run(clients, self.bus)
1746
class TestSetCheckerCmd(TestPropertyValueCmd):
1747
command = SetCheckerCmd
1748
propname = "Checker"
1749
values_to_set = ["", ":", "fping -q -- %s"]
1752
class TestSetHostCmd(TestPropertyValueCmd):
1753
command = SetHostCmd
1755
values_to_set = ["192.0.2.3", "foo.example.org"]
1758
class TestSetSecretCmd(TestPropertyValueCmd):
1759
command = SetSecretCmd
1761
values_to_set = [io.BytesIO(b""),
1762
io.BytesIO(b"secret\0xyzzy\nbar")]
1763
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1766
class TestSetTimeoutCmd(TestPropertyValueCmd):
1767
command = SetTimeoutCmd
1768
propname = "Timeout"
1769
values_to_set = [datetime.timedelta(),
1770
datetime.timedelta(minutes=5),
1771
datetime.timedelta(seconds=1),
1772
datetime.timedelta(weeks=1),
1773
datetime.timedelta(weeks=52)]
1774
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1777
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1778
command = SetExtendedTimeoutCmd
1779
propname = "ExtendedTimeout"
1780
values_to_set = [datetime.timedelta(),
1781
datetime.timedelta(minutes=5),
1782
datetime.timedelta(seconds=1),
1783
datetime.timedelta(weeks=1),
1784
datetime.timedelta(weeks=52)]
1785
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1788
class TestSetIntervalCmd(TestPropertyValueCmd):
1789
command = SetIntervalCmd
1790
propname = "Interval"
1791
values_to_set = [datetime.timedelta(),
1792
datetime.timedelta(minutes=5),
1793
datetime.timedelta(seconds=1),
1794
datetime.timedelta(weeks=1),
1795
datetime.timedelta(weeks=52)]
1796
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1799
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1800
command = SetApprovalDelayCmd
1801
propname = "ApprovalDelay"
1802
values_to_set = [datetime.timedelta(),
1803
datetime.timedelta(minutes=5),
1804
datetime.timedelta(seconds=1),
1805
datetime.timedelta(weeks=1),
1806
datetime.timedelta(weeks=52)]
1807
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1810
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1811
command = SetApprovalDurationCmd
1812
propname = "ApprovalDuration"
1813
values_to_set = [datetime.timedelta(),
1814
datetime.timedelta(minutes=5),
1815
datetime.timedelta(seconds=1),
1816
datetime.timedelta(weeks=1),
1817
datetime.timedelta(weeks=52)]
1818
values_to_get = [0, 300000, 1000, 604800000, 31449600000]