491
610
"ApprovalDelay", "ApprovalDuration",
492
611
"Checker", "ExtendedTimeout", "Expires",
493
612
"LastCheckerStatus")
495
keywords = defaultkeywords
497
if options.dump_json:
498
json.dump({client["Name"]: {key:
500
if isinstance(client[key],
504
for client in clients.values()},
505
fp=sys.stdout, indent=4,
506
separators=(',', ': '))
509
print(TableOfClients(clients.values(), keywords))
511
# Process each client in the list by all selected options
512
for client in clients:
514
def set_client_prop(prop, value):
515
"""Set a Client D-Bus property"""
516
client.Set(client_interface, prop, value,
517
dbus_interface=dbus.PROPERTIES_IFACE)
519
def set_client_prop_ms(prop, value):
520
"""Set a Client D-Bus property, converted
521
from a string to milliseconds."""
522
set_client_prop(prop,
523
string_to_delta(value).total_seconds()
527
mandos_serv.RemoveClient(client.__dbus_object_path__)
529
set_client_prop("Enabled", dbus.Boolean(True))
531
set_client_prop("Enabled", dbus.Boolean(False))
532
if options.bump_timeout:
533
set_client_prop("LastCheckedOK", "")
534
if options.start_checker:
535
set_client_prop("CheckerRunning", dbus.Boolean(True))
536
if options.stop_checker:
537
set_client_prop("CheckerRunning", dbus.Boolean(False))
538
if options.is_enabled:
539
if client.Get(client_interface, "Enabled",
540
dbus_interface=dbus.PROPERTIES_IFACE):
544
if options.checker is not None:
545
set_client_prop("Checker", options.checker)
546
if options.host is not None:
547
set_client_prop("Host", options.host)
548
if options.interval is not None:
549
set_client_prop_ms("Interval", options.interval)
550
if options.approval_delay is not None:
551
set_client_prop_ms("ApprovalDelay",
552
options.approval_delay)
553
if options.approval_duration is not None:
554
set_client_prop_ms("ApprovalDuration",
555
options.approval_duration)
556
if options.timeout is not None:
557
set_client_prop_ms("Timeout", options.timeout)
558
if options.extended_timeout is not None:
559
set_client_prop_ms("ExtendedTimeout",
560
options.extended_timeout)
561
if options.secret is not None:
562
set_client_prop("Secret",
563
dbus.ByteArray(options.secret.read()))
564
if options.approved_by_default is not None:
565
set_client_prop("ApprovedByDefault",
567
.approved_by_default))
569
client.Approve(dbus.Boolean(True),
570
dbus_interface=client_interface)
572
client.Approve(dbus.Boolean(False),
573
dbus_interface=client_interface)
615
class DumpJSON(Output):
616
def run(self, clients, bus=None, mandos=None):
617
data = {client["Name"]:
618
{key: self.dbus_boolean_to_bool(client[key])
619
for key in self.all_keywords}
620
for client in clients.values()}
621
print(json.dumps(data, indent=4, separators=(',', ': ')))
624
def dbus_boolean_to_bool(value):
625
if isinstance(value, dbus.Boolean):
630
class PrintTable(Output):
631
def __init__(self, verbose=False):
632
self.verbose = verbose
634
def run(self, clients, bus=None, mandos=None):
635
default_keywords = ("Name", "Enabled", "Timeout",
637
keywords = default_keywords
639
keywords = self.all_keywords
640
print(self.TableOfClients(clients.values(), keywords))
642
class TableOfClients(object):
645
"Enabled": "Enabled",
646
"Timeout": "Timeout",
647
"LastCheckedOK": "Last Successful Check",
648
"LastApprovalRequest": "Last Approval Request",
649
"Created": "Created",
650
"Interval": "Interval",
652
"Fingerprint": "Fingerprint",
654
"CheckerRunning": "Check Is Running",
655
"LastEnabled": "Last Enabled",
656
"ApprovalPending": "Approval Is Pending",
657
"ApprovedByDefault": "Approved By Default",
658
"ApprovalDelay": "Approval Delay",
659
"ApprovalDuration": "Approval Duration",
660
"Checker": "Checker",
661
"ExtendedTimeout": "Extended Timeout",
662
"Expires": "Expires",
663
"LastCheckerStatus": "Last Checker Status",
666
def __init__(self, clients, keywords):
667
self.clients = clients
668
self.keywords = keywords
671
return "\n".join(self.rows())
673
if sys.version_info.major == 2:
674
__unicode__ = __str__
676
return str(self).encode(
677
locale.getpreferredencoding())
680
format_string = self.row_formatting_string()
681
rows = [self.header_line(format_string)]
682
rows.extend(self.client_line(client, format_string)
683
for client in self.clients)
686
def row_formatting_string(self):
687
"Format string used to format table rows"
688
return " ".join("{{{key}:{width}}}".format(
689
width=max(len(self.tableheaders[key]),
690
*(len(self.string_from_client(client,
692
for client in self.clients)),
694
for key in self.keywords)
696
def string_from_client(self, client, key):
697
return self.valuetostring(client[key], key)
700
def valuetostring(cls, value, keyword):
701
if isinstance(value, dbus.Boolean):
702
return "Yes" if value else "No"
703
if keyword in ("Timeout", "Interval", "ApprovalDelay",
704
"ApprovalDuration", "ExtendedTimeout"):
705
return cls.milliseconds_to_string(value)
708
def header_line(self, format_string):
709
return format_string.format(**self.tableheaders)
711
def client_line(self, client, format_string):
712
return format_string.format(
713
**{key: self.string_from_client(client, key)
714
for key in self.keywords})
717
def milliseconds_to_string(ms):
718
td = datetime.timedelta(0, 0, 0, ms)
719
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
720
.format(days="{}T".format(td.days)
722
hours=td.seconds // 3600,
723
minutes=(td.seconds % 3600) // 60,
724
seconds=td.seconds % 60))
727
class PropertySetter(Base):
728
"Abstract class for Actions for setting one client property"
730
def run_on_one_client(self, client, properties):
731
"""Set the Client's D-Bus property"""
732
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
733
client.__dbus_object_path__,
734
dbus.PROPERTIES_IFACE, client_dbus_interface,
735
self.propname, self.value_to_set
736
if not isinstance(self.value_to_set,
738
else bool(self.value_to_set))
739
client.Set(client_dbus_interface, self.propname,
741
dbus_interface=dbus.PROPERTIES_IFACE)
745
raise NotImplementedError()
748
class Enable(PropertySetter):
750
value_to_set = dbus.Boolean(True)
753
class Disable(PropertySetter):
755
value_to_set = dbus.Boolean(False)
758
class BumpTimeout(PropertySetter):
759
propname = "LastCheckedOK"
763
class StartChecker(PropertySetter):
764
propname = "CheckerRunning"
765
value_to_set = dbus.Boolean(True)
768
class StopChecker(PropertySetter):
769
propname = "CheckerRunning"
770
value_to_set = dbus.Boolean(False)
773
class ApproveByDefault(PropertySetter):
774
propname = "ApprovedByDefault"
775
value_to_set = dbus.Boolean(True)
778
class DenyByDefault(PropertySetter):
779
propname = "ApprovedByDefault"
780
value_to_set = dbus.Boolean(False)
783
class PropertySetterValue(PropertySetter):
784
"""Abstract class for PropertySetter recieving a value as
785
constructor argument instead of a class attribute."""
786
def __init__(self, value):
787
self.value_to_set = value
790
class SetChecker(PropertySetterValue):
794
class SetHost(PropertySetterValue):
798
class SetSecret(PropertySetterValue):
802
def value_to_set(self):
806
def value_to_set(self, value):
807
"""When setting, read data from supplied file object"""
808
self._vts = value.read()
812
class PropertySetterValueMilliseconds(PropertySetterValue):
813
"""Abstract class for PropertySetterValue taking a value
814
argument as a datetime.timedelta() but should store it as
818
def value_to_set(self):
822
def value_to_set(self, value):
823
"When setting, convert value from a datetime.timedelta"
824
self._vts = int(round(value.total_seconds() * 1000))
827
class SetTimeout(PropertySetterValueMilliseconds):
831
class SetExtendedTimeout(PropertySetterValueMilliseconds):
832
propname = "ExtendedTimeout"
835
class SetInterval(PropertySetterValueMilliseconds):
836
propname = "Interval"
839
class SetApprovalDelay(PropertySetterValueMilliseconds):
840
propname = "ApprovalDelay"
843
class SetApprovalDuration(PropertySetterValueMilliseconds):
844
propname = "ApprovalDuration"
576
class Test_milliseconds_to_string(unittest.TestCase):
578
self.assertEqual(milliseconds_to_string(93785000),
580
def test_no_days(self):
581
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
582
def test_all_zero(self):
583
self.assertEqual(milliseconds_to_string(0), "00:00:00")
584
def test_no_fractional_seconds(self):
585
self.assertEqual(milliseconds_to_string(400), "00:00:00")
586
self.assertEqual(milliseconds_to_string(900), "00:00:00")
587
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
589
class Test_string_to_delta(unittest.TestCase):
590
def test_handles_basic_rfc3339(self):
591
self.assertEqual(string_to_delta("PT2H"),
592
datetime.timedelta(0, 7200))
848
class TestCaseWithAssertLogs(unittest.TestCase):
849
"""unittest.TestCase.assertLogs only exists in Python 3.4"""
851
if not hasattr(unittest.TestCase, "assertLogs"):
852
@contextlib.contextmanager
853
def assertLogs(self, logger, level=logging.INFO):
854
capturing_handler = self.CapturingLevelHandler(level)
855
old_level = logger.level
856
old_propagate = logger.propagate
857
logger.addHandler(capturing_handler)
858
logger.setLevel(level)
859
logger.propagate = False
861
yield capturing_handler.watcher
863
logger.propagate = old_propagate
864
logger.removeHandler(capturing_handler)
865
logger.setLevel(old_level)
866
self.assertGreater(len(capturing_handler.watcher.records),
869
class CapturingLevelHandler(logging.Handler):
870
def __init__(self, level, *args, **kwargs):
871
logging.Handler.__init__(self, *args, **kwargs)
872
self.watcher = self.LoggingWatcher([], [])
873
def emit(self, record):
874
self.watcher.records.append(record)
875
self.watcher.output.append(self.format(record))
877
LoggingWatcher = collections.namedtuple("LoggingWatcher",
882
class Test_string_to_delta(TestCaseWithAssertLogs):
883
# Just test basic RFC 3339 functionality here, the doc string for
884
# rfc3339_duration_to_delta() already has more comprehensive
885
# tests, which is run by doctest.
887
def test_rfc3339_zero_seconds(self):
888
self.assertEqual(datetime.timedelta(),
889
string_to_delta("PT0S"))
891
def test_rfc3339_zero_days(self):
892
self.assertEqual(datetime.timedelta(), string_to_delta("P0D"))
894
def test_rfc3339_one_second(self):
895
self.assertEqual(datetime.timedelta(0, 1),
896
string_to_delta("PT1S"))
898
def test_rfc3339_two_hours(self):
899
self.assertEqual(datetime.timedelta(0, 7200),
900
string_to_delta("PT2H"))
593
902
def test_falls_back_to_pre_1_6_1_with_warning(self):
594
# assertLogs only exists in Python 3.4
595
if hasattr(self, "assertLogs"):
596
with self.assertLogs(log, logging.WARNING):
597
value = string_to_delta("2h")
903
with self.assertLogs(log, logging.WARNING):
599
904
value = string_to_delta("2h")
600
self.assertEqual(value, datetime.timedelta(0, 7200))
602
class Test_TableOfClients(unittest.TestCase):
608
"Bool": "A D-BUS Boolean",
609
"NonDbusBoolean": "A Non-D-BUS Boolean",
610
"Integer": "An Integer",
611
"Timeout": "Timedelta 1",
612
"Interval": "Timedelta 2",
613
"ApprovalDelay": "Timedelta 3",
614
"ApprovalDuration": "Timedelta 4",
615
"ExtendedTimeout": "Timedelta 5",
616
"String": "A String",
618
self.keywords = ["Attr1", "AttrTwo"]
624
"Bool": dbus.Boolean(False),
625
"NonDbusBoolean": False,
629
"ApprovalDelay": 2000,
630
"ApprovalDuration": 3000,
631
"ExtendedTimeout": 4000,
638
"Bool": dbus.Boolean(True),
639
"NonDbusBoolean": True,
642
"Interval": 93786000,
643
"ApprovalDelay": 93787000,
644
"ApprovalDuration": 93788000,
645
"ExtendedTimeout": 93789000,
646
"String": "A huge string which will not fit," * 10,
649
def test_short_header(self):
650
text = str(TableOfClients(self.clients, self.keywords,
657
self.assertEqual(text, expected_text)
658
def test_booleans(self):
659
keywords = ["Bool", "NonDbusBoolean"]
660
text = str(TableOfClients(self.clients, keywords,
663
A D-BUS Boolean A Non-D-BUS Boolean
667
self.assertEqual(text, expected_text)
668
def test_milliseconds_detection(self):
669
keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
670
"ApprovalDuration", "ExtendedTimeout"]
671
text = str(TableOfClients(self.clients, keywords,
674
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
675
0 00:00:00 00:00:01 00:00:02 00:00:03 00:00:04
676
1 1T02:03:05 1T02:03:06 1T02:03:07 1T02:03:08 1T02:03:09
678
self.assertEqual(text, expected_text)
679
def test_empty_and_long_string_values(self):
680
keywords = ["String"]
681
text = str(TableOfClients(self.clients, keywords,
686
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
688
self.assertEqual(text, expected_text)
905
self.assertEqual(datetime.timedelta(0, 7200), value)
908
class Test_check_option_syntax(unittest.TestCase):
910
self.parser = argparse.ArgumentParser()
911
add_command_line_options(self.parser)
913
def test_actions_requires_client_or_all(self):
914
for action, value in self.actions.items():
915
options = self.parser.parse_args()
916
setattr(options, action, value)
917
with self.assertParseError():
918
self.check_option_syntax(options)
920
# This mostly corresponds to the definition from has_actions() in
921
# check_option_syntax()
923
# The actual values set here are not that important, but we do
924
# at least stick to the correct types, even though they are
928
"bump_timeout": True,
929
"start_checker": True,
930
"stop_checker": True,
934
"timeout": datetime.timedelta(),
935
"extended_timeout": datetime.timedelta(),
936
"interval": datetime.timedelta(),
937
"approved_by_default": True,
938
"approval_delay": datetime.timedelta(),
939
"approval_duration": datetime.timedelta(),
941
"secret": io.BytesIO(b"x"),
946
@contextlib.contextmanager
947
def assertParseError(self):
948
with self.assertRaises(SystemExit) as e:
949
with self.redirect_stderr_to_devnull():
951
# Exit code from argparse is guaranteed to be "2". Reference:
952
# https://docs.python.org/3/library
953
# /argparse.html#exiting-methods
954
self.assertEqual(2, e.exception.code)
957
@contextlib.contextmanager
958
def redirect_stderr_to_devnull():
959
old_stderr = sys.stderr
960
with contextlib.closing(open(os.devnull, "w")) as null:
965
sys.stderr = old_stderr
967
def check_option_syntax(self, options):
968
check_option_syntax(self.parser, options)
970
def test_actions_all_conflicts_with_verbose(self):
971
for action, value in self.actions.items():
972
options = self.parser.parse_args()
973
setattr(options, action, value)
975
options.verbose = True
976
with self.assertParseError():
977
self.check_option_syntax(options)
979
def test_actions_with_client_conflicts_with_verbose(self):
980
for action, value in self.actions.items():
981
options = self.parser.parse_args()
982
setattr(options, action, value)
983
options.verbose = True
984
options.client = ["client"]
985
with self.assertParseError():
986
self.check_option_syntax(options)
988
def test_dump_json_conflicts_with_verbose(self):
989
options = self.parser.parse_args()
990
options.dump_json = True
991
options.verbose = True
992
with self.assertParseError():
993
self.check_option_syntax(options)
995
def test_dump_json_conflicts_with_action(self):
996
for action, value in self.actions.items():
997
options = self.parser.parse_args()
998
setattr(options, action, value)
999
options.dump_json = True
1000
with self.assertParseError():
1001
self.check_option_syntax(options)
1003
def test_all_can_not_be_alone(self):
1004
options = self.parser.parse_args()
1006
with self.assertParseError():
1007
self.check_option_syntax(options)
1009
def test_all_is_ok_with_any_action(self):
1010
for action, value in self.actions.items():
1011
options = self.parser.parse_args()
1012
setattr(options, action, value)
1014
self.check_option_syntax(options)
1016
def test_any_action_is_ok_with_one_client(self):
1017
for action, value in self.actions.items():
1018
options = self.parser.parse_args()
1019
setattr(options, action, value)
1020
options.client = ["client"]
1021
self.check_option_syntax(options)
1023
def test_one_client_with_all_actions_except_is_enabled(self):
1024
options = self.parser.parse_args()
1025
for action, value in self.actions.items():
1026
if action == "is_enabled":
1028
setattr(options, action, value)
1029
options.client = ["client"]
1030
self.check_option_syntax(options)
1032
def test_two_clients_with_all_actions_except_is_enabled(self):
1033
options = self.parser.parse_args()
1034
for action, value in self.actions.items():
1035
if action == "is_enabled":
1037
setattr(options, action, value)
1038
options.client = ["client1", "client2"]
1039
self.check_option_syntax(options)
1041
def test_two_clients_are_ok_with_actions_except_is_enabled(self):
1042
for action, value in self.actions.items():
1043
if action == "is_enabled":
1045
options = self.parser.parse_args()
1046
setattr(options, action, value)
1047
options.client = ["client1", "client2"]
1048
self.check_option_syntax(options)
1050
def test_is_enabled_fails_without_client(self):
1051
options = self.parser.parse_args()
1052
options.is_enabled = True
1053
with self.assertParseError():
1054
self.check_option_syntax(options)
1056
def test_is_enabled_fails_with_two_clients(self):
1057
options = self.parser.parse_args()
1058
options.is_enabled = True
1059
options.client = ["client1", "client2"]
1060
with self.assertParseError():
1061
self.check_option_syntax(options)
1063
def test_remove_can_only_be_combined_with_action_deny(self):
1064
for action, value in self.actions.items():
1065
if action in {"remove", "deny"}:
1067
options = self.parser.parse_args()
1068
setattr(options, action, value)
1070
options.remove = True
1071
with self.assertParseError():
1072
self.check_option_syntax(options)
1075
class Test_get_mandos_dbus_object(TestCaseWithAssertLogs):
1076
def test_calls_and_returns_get_object_on_bus(self):
1077
class MockBus(object):
1079
def get_object(mockbus_self, busname, dbus_path):
1080
# Note that "self" is still the testcase instance,
1081
# this MockBus instance is in "mockbus_self".
1082
self.assertEqual(dbus_busname, busname)
1083
self.assertEqual(server_dbus_path, dbus_path)
1084
mockbus_self.called = True
1087
mockbus = get_mandos_dbus_object(bus=MockBus())
1088
self.assertIsInstance(mockbus, MockBus)
1089
self.assertTrue(mockbus.called)
1091
def test_logs_and_exits_on_dbus_error(self):
1092
class FailingBusStub(object):
1093
def get_object(self, busname, dbus_path):
1094
raise dbus.exceptions.DBusException("Test")
1096
with self.assertLogs(log, logging.CRITICAL):
1097
with self.assertRaises(SystemExit) as e:
1098
bus = get_mandos_dbus_object(bus=FailingBusStub())
1100
if isinstance(e.exception.code, int):
1101
self.assertNotEqual(0, e.exception.code)
1103
self.assertIsNotNone(e.exception.code)
1106
class Test_get_managed_objects(TestCaseWithAssertLogs):
1107
def test_calls_and_returns_GetManagedObjects(self):
1108
managed_objects = {"/clients/client": { "Name": "client"}}
1109
class ObjectManagerStub(object):
1110
def GetManagedObjects(self):
1111
return managed_objects
1112
retval = get_managed_objects(ObjectManagerStub())
1113
self.assertDictEqual(managed_objects, retval)
1115
def test_logs_and_exits_on_dbus_error(self):
1116
dbus_logger = logging.getLogger("dbus.proxies")
1118
class ObjectManagerFailingStub(object):
1119
def GetManagedObjects(self):
1120
dbus_logger.error("Test")
1121
raise dbus.exceptions.DBusException("Test")
1123
class CountingHandler(logging.Handler):
1125
def emit(self, record):
1128
counting_handler = CountingHandler()
1130
dbus_logger.addHandler(counting_handler)
1133
with self.assertLogs(log, logging.CRITICAL) as watcher:
1134
with self.assertRaises(SystemExit) as e:
1135
get_managed_objects(ObjectManagerFailingStub())
1137
dbus_logger.removeFilter(counting_handler)
1139
# Make sure the dbus logger was suppressed
1140
self.assertEqual(0, counting_handler.count)
1142
# Test that the dbus_logger still works
1143
with self.assertLogs(dbus_logger, logging.ERROR):
1144
dbus_logger.error("Test")
1146
if isinstance(e.exception.code, int):
1147
self.assertNotEqual(0, e.exception.code)
1149
self.assertIsNotNone(e.exception.code)
1152
class Test_commands_from_options(unittest.TestCase):
1154
self.parser = argparse.ArgumentParser()
1155
add_command_line_options(self.parser)
1157
def test_is_enabled(self):
1158
self.assert_command_from_args(["--is-enabled", "client"],
1161
def assert_command_from_args(self, args, command_cls,
1163
"""Assert that parsing ARGS should result in an instance of
1164
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1165
options = self.parser.parse_args(args)
1166
check_option_syntax(self.parser, options)
1167
commands = commands_from_options(options)
1168
self.assertEqual(1, len(commands))
1169
command = commands[0]
1170
self.assertIsInstance(command, command_cls)
1171
for key, value in cmd_attrs.items():
1172
self.assertEqual(value, getattr(command, key))
1174
def test_is_enabled_short(self):
1175
self.assert_command_from_args(["-V", "client"],
1178
def test_approve(self):
1179
self.assert_command_from_args(["--approve", "client"],
1182
def test_approve_short(self):
1183
self.assert_command_from_args(["-A", "client"],
1186
def test_deny(self):
1187
self.assert_command_from_args(["--deny", "client"],
1190
def test_deny_short(self):
1191
self.assert_command_from_args(["-D", "client"], command.Deny)
1193
def test_remove(self):
1194
self.assert_command_from_args(["--remove", "client"],
1197
def test_deny_before_remove(self):
1198
options = self.parser.parse_args(["--deny", "--remove",
1200
check_option_syntax(self.parser, options)
1201
commands = commands_from_options(options)
1202
self.assertEqual(2, len(commands))
1203
self.assertIsInstance(commands[0], command.Deny)
1204
self.assertIsInstance(commands[1], command.Remove)
1206
def test_deny_before_remove_reversed(self):
1207
options = self.parser.parse_args(["--remove", "--deny",
1209
check_option_syntax(self.parser, options)
1210
commands = commands_from_options(options)
1211
self.assertEqual(2, len(commands))
1212
self.assertIsInstance(commands[0], command.Deny)
1213
self.assertIsInstance(commands[1], command.Remove)
1215
def test_remove_short(self):
1216
self.assert_command_from_args(["-r", "client"],
1219
def test_dump_json(self):
1220
self.assert_command_from_args(["--dump-json"],
1223
def test_enable(self):
1224
self.assert_command_from_args(["--enable", "client"],
1227
def test_enable_short(self):
1228
self.assert_command_from_args(["-e", "client"],
1231
def test_disable(self):
1232
self.assert_command_from_args(["--disable", "client"],
1235
def test_disable_short(self):
1236
self.assert_command_from_args(["-d", "client"],
1239
def test_bump_timeout(self):
1240
self.assert_command_from_args(["--bump-timeout", "client"],
1241
command.BumpTimeout)
1243
def test_bump_timeout_short(self):
1244
self.assert_command_from_args(["-b", "client"],
1245
command.BumpTimeout)
1247
def test_start_checker(self):
1248
self.assert_command_from_args(["--start-checker", "client"],
1249
command.StartChecker)
1251
def test_stop_checker(self):
1252
self.assert_command_from_args(["--stop-checker", "client"],
1253
command.StopChecker)
1255
def test_approve_by_default(self):
1256
self.assert_command_from_args(["--approve-by-default",
1258
command.ApproveByDefault)
1260
def test_deny_by_default(self):
1261
self.assert_command_from_args(["--deny-by-default", "client"],
1262
command.DenyByDefault)
1264
def test_checker(self):
1265
self.assert_command_from_args(["--checker", ":", "client"],
1269
def test_checker_empty(self):
1270
self.assert_command_from_args(["--checker", "", "client"],
1274
def test_checker_short(self):
1275
self.assert_command_from_args(["-c", ":", "client"],
1279
def test_host(self):
1280
self.assert_command_from_args(
1281
["--host", "client.example.org", "client"],
1282
command.SetHost, value_to_set="client.example.org")
1284
def test_host_short(self):
1285
self.assert_command_from_args(
1286
["-H", "client.example.org", "client"], command.SetHost,
1287
value_to_set="client.example.org")
1289
def test_secret_devnull(self):
1290
self.assert_command_from_args(["--secret", os.path.devnull,
1291
"client"], command.SetSecret,
1294
def test_secret_tempfile(self):
1295
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1296
value = b"secret\0xyzzy\nbar"
1299
self.assert_command_from_args(["--secret", f.name,
1304
def test_secret_devnull_short(self):
1305
self.assert_command_from_args(["-s", os.path.devnull,
1306
"client"], command.SetSecret,
1309
def test_secret_tempfile_short(self):
1310
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1311
value = b"secret\0xyzzy\nbar"
1314
self.assert_command_from_args(["-s", f.name, "client"],
1318
def test_timeout(self):
1319
self.assert_command_from_args(["--timeout", "PT5M", "client"],
1321
value_to_set=300000)
1323
def test_timeout_short(self):
1324
self.assert_command_from_args(["-t", "PT5M", "client"],
1326
value_to_set=300000)
1328
def test_extended_timeout(self):
1329
self.assert_command_from_args(["--extended-timeout", "PT15M",
1331
command.SetExtendedTimeout,
1332
value_to_set=900000)
1334
def test_interval(self):
1335
self.assert_command_from_args(["--interval", "PT2M",
1336
"client"], command.SetInterval,
1337
value_to_set=120000)
1339
def test_interval_short(self):
1340
self.assert_command_from_args(["-i", "PT2M", "client"],
1341
command.SetInterval,
1342
value_to_set=120000)
1344
def test_approval_delay(self):
1345
self.assert_command_from_args(["--approval-delay", "PT30S",
1347
command.SetApprovalDelay,
1350
def test_approval_duration(self):
1351
self.assert_command_from_args(["--approval-duration", "PT1S",
1353
command.SetApprovalDuration,
1356
def test_print_table(self):
1357
self.assert_command_from_args([], command.PrintTable,
1360
def test_print_table_verbose(self):
1361
self.assert_command_from_args(["--verbose"],
1365
def test_print_table_verbose_short(self):
1366
self.assert_command_from_args(["-v"], command.PrintTable,
1370
class TestCommand(unittest.TestCase):
1371
"""Abstract class for tests of command classes"""
1375
class MockClient(object):
1376
def __init__(self, name, **attributes):
1377
self.__dbus_object_path__ = "/clients/{}".format(name)
1378
self.attributes = attributes
1379
self.attributes["Name"] = name
1381
def Set(self, interface, propname, value, dbus_interface):
1382
testcase.assertEqual(client_dbus_interface, interface)
1383
testcase.assertEqual(dbus.PROPERTIES_IFACE,
1385
self.attributes[propname] = value
1386
def Approve(self, approve, dbus_interface):
1387
testcase.assertEqual(client_dbus_interface,
1389
self.calls.append(("Approve", (approve,
1391
self.client = MockClient(
1393
KeyID=("92ed150794387c03ce684574b1139a65"
1394
"94a34f895daaaf09fd8ea90a27cddb12"),
1396
Host="foo.example.org",
1397
Enabled=dbus.Boolean(True),
1399
LastCheckedOK="2019-02-03T00:00:00",
1400
Created="2019-01-02T00:00:00",
1402
Fingerprint=("778827225BA7DE539C5A"
1403
"7CFA59CFF7CDBD9A5920"),
1404
CheckerRunning=dbus.Boolean(False),
1405
LastEnabled="2019-01-03T00:00:00",
1406
ApprovalPending=dbus.Boolean(False),
1407
ApprovedByDefault=dbus.Boolean(True),
1408
LastApprovalRequest="",
1410
ApprovalDuration=1000,
1411
Checker="fping -q -- %(host)s",
1412
ExtendedTimeout=900000,
1413
Expires="2019-02-04T00:00:00",
1414
LastCheckerStatus=0)
1415
self.other_client = MockClient(
1417
KeyID=("0558568eedd67d622f5c83b35a115f79"
1418
"6ab612cff5ad227247e46c2b020f441c"),
1419
Secret=b"secretbar",
1421
Enabled=dbus.Boolean(True),
1423
LastCheckedOK="2019-02-04T00:00:00",
1424
Created="2019-01-03T00:00:00",
1426
Fingerprint=("3E393AEAEFB84C7E89E2"
1427
"F547B3A107558FCA3A27"),
1428
CheckerRunning=dbus.Boolean(True),
1429
LastEnabled="2019-01-04T00:00:00",
1430
ApprovalPending=dbus.Boolean(False),
1431
ApprovedByDefault=dbus.Boolean(False),
1432
LastApprovalRequest="2019-01-03T00:00:00",
1433
ApprovalDelay=30000,
1434
ApprovalDuration=93785000,
1436
ExtendedTimeout=900000,
1437
Expires="2019-02-05T00:00:00",
1438
LastCheckerStatus=-2)
1439
self.clients = collections.OrderedDict(
1441
(self.client.__dbus_object_path__,
1442
self.client.attributes),
1443
(self.other_client.__dbus_object_path__,
1444
self.other_client.attributes),
1446
self.one_client = {self.client.__dbus_object_path__:
1447
self.client.attributes}
1451
class MockBus(object):
1453
def get_object(client_bus_name, path):
1454
self.assertEqual(dbus_busname, client_bus_name)
1455
# Note: "self" here is the TestCmd instance, not the
1456
# MockBus instance, since this is a static method!
1457
if path == self.client.__dbus_object_path__:
1459
elif path == self.other_client.__dbus_object_path__:
1460
return self.other_client
1464
class TestBaseCommands(TestCommand):
1466
def test_IsEnabled_exits_successfully(self):
1467
with self.assertRaises(SystemExit) as e:
1468
command.IsEnabled().run(self.one_client)
1469
if e.exception.code is not None:
1470
self.assertEqual(0, e.exception.code)
1472
self.assertIsNone(e.exception.code)
1474
def test_IsEnabled_exits_with_failure(self):
1475
self.client.attributes["Enabled"] = dbus.Boolean(False)
1476
with self.assertRaises(SystemExit) as e:
1477
command.IsEnabled().run(self.one_client)
1478
if isinstance(e.exception.code, int):
1479
self.assertNotEqual(0, e.exception.code)
1481
self.assertIsNotNone(e.exception.code)
1483
def test_Approve(self):
1484
command.Approve().run(self.clients, self.bus)
1485
for clientpath in self.clients:
1486
client = self.bus.get_object(dbus_busname, clientpath)
1487
self.assertIn(("Approve", (True, client_dbus_interface)),
1490
def test_Deny(self):
1491
command.Deny().run(self.clients, self.bus)
1492
for clientpath in self.clients:
1493
client = self.bus.get_object(dbus_busname, clientpath)
1494
self.assertIn(("Approve", (False, client_dbus_interface)),
1497
def test_Remove(self):
1498
class MandosSpy(object):
1501
def RemoveClient(self, dbus_path):
1502
self.calls.append(("RemoveClient", (dbus_path,)))
1503
mandos = MandosSpy()
1504
command.Remove().run(self.clients, self.bus, mandos)
1505
for clientpath in self.clients:
1506
self.assertIn(("RemoveClient", (clientpath,)),
1512
"KeyID": ("92ed150794387c03ce684574b1139a65"
1513
"94a34f895daaaf09fd8ea90a27cddb12"),
1514
"Host": "foo.example.org",
1517
"LastCheckedOK": "2019-02-03T00:00:00",
1518
"Created": "2019-01-02T00:00:00",
1520
"Fingerprint": ("778827225BA7DE539C5A"
1521
"7CFA59CFF7CDBD9A5920"),
1522
"CheckerRunning": False,
1523
"LastEnabled": "2019-01-03T00:00:00",
1524
"ApprovalPending": False,
1525
"ApprovedByDefault": True,
1526
"LastApprovalRequest": "",
1528
"ApprovalDuration": 1000,
1529
"Checker": "fping -q -- %(host)s",
1530
"ExtendedTimeout": 900000,
1531
"Expires": "2019-02-04T00:00:00",
1532
"LastCheckerStatus": 0,
1536
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
1537
"6ab612cff5ad227247e46c2b020f441c"),
1538
"Host": "192.0.2.3",
1541
"LastCheckedOK": "2019-02-04T00:00:00",
1542
"Created": "2019-01-03T00:00:00",
1544
"Fingerprint": ("3E393AEAEFB84C7E89E2"
1545
"F547B3A107558FCA3A27"),
1546
"CheckerRunning": True,
1547
"LastEnabled": "2019-01-04T00:00:00",
1548
"ApprovalPending": False,
1549
"ApprovedByDefault": False,
1550
"LastApprovalRequest": "2019-01-03T00:00:00",
1551
"ApprovalDelay": 30000,
1552
"ApprovalDuration": 93785000,
1554
"ExtendedTimeout": 900000,
1555
"Expires": "2019-02-05T00:00:00",
1556
"LastCheckerStatus": -2,
1560
def test_DumpJSON_normal(self):
1561
with self.capture_stdout_to_buffer() as buffer:
1562
command.DumpJSON().run(self.clients)
1563
json_data = json.loads(buffer.getvalue())
1564
self.assertDictEqual(self.expected_json, json_data)
1567
@contextlib.contextmanager
1568
def capture_stdout_to_buffer():
1569
capture_buffer = io.StringIO()
1570
old_stdout = sys.stdout
1571
sys.stdout = capture_buffer
1573
yield capture_buffer
1575
sys.stdout = old_stdout
1577
def test_DumpJSON_one_client(self):
1578
with self.capture_stdout_to_buffer() as buffer:
1579
command.DumpJSON().run(self.one_client)
1580
json_data = json.loads(buffer.getvalue())
1581
expected_json = {"foo": self.expected_json["foo"]}
1582
self.assertDictEqual(expected_json, json_data)
1584
def test_PrintTable_normal(self):
1585
with self.capture_stdout_to_buffer() as buffer:
1586
command.PrintTable().run(self.clients)
1587
expected_output = "\n".join((
1588
"Name Enabled Timeout Last Successful Check",
1589
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1590
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1592
self.assertEqual(expected_output, buffer.getvalue())
1594
def test_PrintTable_verbose(self):
1595
with self.capture_stdout_to_buffer() as buffer:
1596
command.PrintTable(verbose=True).run(self.clients)
1611
"Last Successful Check ",
1612
"2019-02-03T00:00:00 ",
1613
"2019-02-04T00:00:00 ",
1616
"2019-01-02T00:00:00 ",
1617
"2019-01-03T00:00:00 ",
1629
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1631
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1635
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1636
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1638
"Check Is Running ",
1643
"2019-01-03T00:00:00 ",
1644
"2019-01-04T00:00:00 ",
1646
"Approval Is Pending ",
1650
"Approved By Default ",
1654
"Last Approval Request ",
1656
"2019-01-03T00:00:00 ",
1662
"Approval Duration ",
1667
"fping -q -- %(host)s ",
1670
"Extended Timeout ",
1675
"2019-02-04T00:00:00 ",
1676
"2019-02-05T00:00:00 ",
1678
"Last Checker Status",
1683
num_lines = max(len(rows) for rows in columns)
1684
expected_output = ("\n".join("".join(rows[line]
1685
for rows in columns)
1686
for line in range(num_lines))
1688
self.assertEqual(expected_output, buffer.getvalue())
1690
def test_PrintTable_one_client(self):
1691
with self.capture_stdout_to_buffer() as buffer:
1692
command.PrintTable().run(self.one_client)
1693
expected_output = "\n".join((
1694
"Name Enabled Timeout Last Successful Check",
1695
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1697
self.assertEqual(expected_output, buffer.getvalue())
1700
class TestPropertySetterCmd(TestCommand):
1701
"""Abstract class for tests of command.PropertySetter classes"""
1703
if not hasattr(self, "command"):
1705
values_to_get = getattr(self, "values_to_get",
1707
for value_to_set, value_to_get in zip(self.values_to_set,
1709
for clientpath in self.clients:
1710
client = self.bus.get_object(dbus_busname, clientpath)
1711
old_value = client.attributes[self.propname]
1712
client.attributes[self.propname] = self.Unique()
1713
self.run_command(value_to_set, self.clients)
1714
for clientpath in self.clients:
1715
client = self.bus.get_object(dbus_busname, clientpath)
1716
value = client.attributes[self.propname]
1717
self.assertNotIsInstance(value, self.Unique)
1718
self.assertEqual(value_to_get, value)
1720
class Unique(object):
1721
"""Class for objects which exist only to be unique objects,
1722
since unittest.mock.sentinel only exists in Python 3.3"""
1724
def run_command(self, value, clients):
1725
self.command().run(clients, self.bus)
1728
class TestEnableCmd(TestPropertySetterCmd):
1729
command = command.Enable
1730
propname = "Enabled"
1731
values_to_set = [dbus.Boolean(True)]
1734
class TestDisableCmd(TestPropertySetterCmd):
1735
command = command.Disable
1736
propname = "Enabled"
1737
values_to_set = [dbus.Boolean(False)]
1740
class TestBumpTimeoutCmd(TestPropertySetterCmd):
1741
command = command.BumpTimeout
1742
propname = "LastCheckedOK"
1743
values_to_set = [""]
1746
class TestStartCheckerCmd(TestPropertySetterCmd):
1747
command = command.StartChecker
1748
propname = "CheckerRunning"
1749
values_to_set = [dbus.Boolean(True)]
1752
class TestStopCheckerCmd(TestPropertySetterCmd):
1753
command = command.StopChecker
1754
propname = "CheckerRunning"
1755
values_to_set = [dbus.Boolean(False)]
1758
class TestApproveByDefaultCmd(TestPropertySetterCmd):
1759
command = command.ApproveByDefault
1760
propname = "ApprovedByDefault"
1761
values_to_set = [dbus.Boolean(True)]
1764
class TestDenyByDefaultCmd(TestPropertySetterCmd):
1765
command = command.DenyByDefault
1766
propname = "ApprovedByDefault"
1767
values_to_set = [dbus.Boolean(False)]
1770
class TestPropertySetterValueCmd(TestPropertySetterCmd):
1771
"""Abstract class for tests of PropertySetterValueCmd classes"""
1774
if type(self) is TestPropertySetterValueCmd:
1776
return super(TestPropertySetterValueCmd, self).runTest()
1778
def run_command(self, value, clients):
1779
self.command(value).run(clients, self.bus)
1782
class TestSetCheckerCmd(TestPropertySetterValueCmd):
1783
command = command.SetChecker
1784
propname = "Checker"
1785
values_to_set = ["", ":", "fping -q -- %s"]
1788
class TestSetHostCmd(TestPropertySetterValueCmd):
1789
command = command.SetHost
1791
values_to_set = ["192.0.2.3", "client.example.org"]
1794
class TestSetSecretCmd(TestPropertySetterValueCmd):
1795
command = command.SetSecret
1797
values_to_set = [io.BytesIO(b""),
1798
io.BytesIO(b"secret\0xyzzy\nbar")]
1799
values_to_get = [f.getvalue() for f in values_to_set]
1802
class TestSetTimeoutCmd(TestPropertySetterValueCmd):
1803
command = command.SetTimeout
1804
propname = "Timeout"
1805
values_to_set = [datetime.timedelta(),
1806
datetime.timedelta(minutes=5),
1807
datetime.timedelta(seconds=1),
1808
datetime.timedelta(weeks=1),
1809
datetime.timedelta(weeks=52)]
1810
values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1813
class TestSetExtendedTimeoutCmd(TestPropertySetterValueCmd):
1814
command = command.SetExtendedTimeout
1815
propname = "ExtendedTimeout"
1816
values_to_set = [datetime.timedelta(),
1817
datetime.timedelta(minutes=5),
1818
datetime.timedelta(seconds=1),
1819
datetime.timedelta(weeks=1),
1820
datetime.timedelta(weeks=52)]
1821
values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1824
class TestSetIntervalCmd(TestPropertySetterValueCmd):
1825
command = command.SetInterval
1826
propname = "Interval"
1827
values_to_set = [datetime.timedelta(),
1828
datetime.timedelta(minutes=5),
1829
datetime.timedelta(seconds=1),
1830
datetime.timedelta(weeks=1),
1831
datetime.timedelta(weeks=52)]
1832
values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1835
class TestSetApprovalDelayCmd(TestPropertySetterValueCmd):
1836
command = command.SetApprovalDelay
1837
propname = "ApprovalDelay"
1838
values_to_set = [datetime.timedelta(),
1839
datetime.timedelta(minutes=5),
1840
datetime.timedelta(seconds=1),
1841
datetime.timedelta(weeks=1),
1842
datetime.timedelta(weeks=52)]
1843
values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1846
class TestSetApprovalDurationCmd(TestPropertySetterValueCmd):
1847
command = command.SetApprovalDuration
1848
propname = "ApprovalDuration"
1849
values_to_set = [datetime.timedelta(),
1850
datetime.timedelta(minutes=5),
1851
datetime.timedelta(seconds=1),
1852
datetime.timedelta(weeks=1),
1853
datetime.timedelta(weeks=52)]
1854
values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]