801
783
testcase.assertEqual(dbus_interface,
802
784
dbus.PROPERTIES_IFACE)
803
785
self.attributes[property] = value
786
self.calls.append(("Set", (interface, property, value,
804
788
def Get(self, interface, property, dbus_interface):
805
789
testcase.assertEqual(interface, client_interface)
806
790
testcase.assertEqual(dbus_interface,
807
791
dbus.PROPERTIES_IFACE)
792
self.calls.append(("Get", (interface, property,
808
794
return self.attributes[property]
809
def Approve(self, approve, dbus_interface):
810
testcase.assertEqual(dbus_interface, client_interface)
811
self.calls.append(("Approve", (approve,
813
self.client = MockClient(
815
KeyID=("92ed150794387c03ce684574b1139a65"
816
"94a34f895daaaf09fd8ea90a27cddb12"),
818
Host="foo.example.org",
819
Enabled=dbus.Boolean(True),
821
LastCheckedOK="2019-02-03T00:00:00",
822
Created="2019-01-02T00:00:00",
824
Fingerprint=("778827225BA7DE539C5A"
825
"7CFA59CFF7CDBD9A5920"),
826
CheckerRunning=dbus.Boolean(False),
827
LastEnabled="2019-01-03T00:00:00",
828
ApprovalPending=dbus.Boolean(False),
829
ApprovedByDefault=dbus.Boolean(True),
830
LastApprovalRequest="",
832
ApprovalDuration=1000,
833
Checker="fping -q -- %(host)s",
834
ExtendedTimeout=900000,
835
Expires="2019-02-04T00:00:00",
837
self.other_client = MockClient(
839
KeyID=("0558568eedd67d622f5c83b35a115f79"
840
"6ab612cff5ad227247e46c2b020f441c"),
843
Enabled=dbus.Boolean(True),
845
LastCheckedOK="2019-02-04T00:00:00",
846
Created="2019-01-03T00:00:00",
848
Fingerprint=("3E393AEAEFB84C7E89E2"
849
"F547B3A107558FCA3A27"),
850
CheckerRunning=dbus.Boolean(True),
851
LastEnabled="2019-01-04T00:00:00",
852
ApprovalPending=dbus.Boolean(False),
853
ApprovedByDefault=dbus.Boolean(False),
854
LastApprovalRequest="2019-01-03T00:00:00",
856
ApprovalDuration=1000,
858
ExtendedTimeout=900000,
859
Expires="2019-02-05T00:00:00",
860
LastCheckerStatus=-2)
861
self.clients = collections.OrderedDict(
863
(self.client, self.client.attributes),
864
(self.other_client, self.other_client.attributes),
795
def __getitem__(self, key):
796
return self.attributes[key]
797
def __setitem__(self, key, value):
798
self.attributes[key] = value
799
self.clients = collections.OrderedDict([
803
KeyID=("92ed150794387c03ce684574b1139a65"
804
"94a34f895daaaf09fd8ea90a27cddb12"),
806
Host="foo.example.org",
807
Enabled=dbus.Boolean(True),
809
LastCheckedOK="2019-02-03T00:00:00",
810
Created="2019-01-02T00:00:00",
812
Fingerprint=("778827225BA7DE539C5A"
813
"7CFA59CFF7CDBD9A5920"),
814
CheckerRunning=dbus.Boolean(False),
815
LastEnabled="2019-01-03T00:00:00",
816
ApprovalPending=dbus.Boolean(False),
817
ApprovedByDefault=dbus.Boolean(True),
818
LastApprovalRequest="",
820
ApprovalDuration=1000,
821
Checker="fping -q -- %(host)s",
822
ExtendedTimeout=900000,
823
Expires="2019-02-04T00:00:00",
824
LastCheckerStatus=0)),
828
KeyID=("0558568eedd67d622f5c83b35a115f79"
829
"6ab612cff5ad227247e46c2b020f441c"),
832
Enabled=dbus.Boolean(True),
834
LastCheckedOK="2019-02-04T00:00:00",
835
Created="2019-01-03T00:00:00",
837
Fingerprint=("3E393AEAEFB84C7E89E2"
838
"F547B3A107558FCA3A27"),
839
CheckerRunning=dbus.Boolean(True),
840
LastEnabled="2019-01-04T00:00:00",
841
ApprovalPending=dbus.Boolean(False),
842
ApprovedByDefault=dbus.Boolean(False),
843
LastApprovalRequest="2019-01-03T00:00:00",
845
ApprovalDuration=1000,
847
ExtendedTimeout=900000,
848
Expires="2019-02-05T00:00:00",
849
LastCheckerStatus=-2)),
866
self.one_client = {self.client: self.client.attributes}
868
852
class TestPrintTableCmd(TestCmd):
869
853
def test_normal(self):
947
931
json_data = json.loads(DumpJSONCmd().output(self.clients))
948
932
self.assertDictEqual(json_data, self.expected_json)
949
933
def test_one_client(self):
950
clients = self.one_client
934
clients = {"foo": self.clients["foo"]}
951
935
json_data = json.loads(DumpJSONCmd().output(clients))
952
936
expected_json = {"foo": self.expected_json["foo"]}
953
937
self.assertDictEqual(json_data, expected_json)
955
939
class TestIsEnabledCmd(TestCmd):
956
940
def test_is_enabled(self):
957
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
958
for client, properties in self.clients.items()))
941
self.assertTrue(all(IsEnabledCmd().is_enabled(client)
942
for client in self.clients.values()))
943
def test_is_enabled_does_get_attribute(self):
944
client = self.clients["foo"]
945
self.assertTrue(IsEnabledCmd().is_enabled(client))
946
self.assertListEqual(client.calls,
948
("se.recompile.Mandos.Client",
950
"org.freedesktop.DBus.Properties"))])
959
951
def test_is_enabled_run_exits_successfully(self):
952
client = self.clients["foo"]
960
953
with self.assertRaises(SystemExit) as e:
961
IsEnabledCmd().run(None, self.one_client)
954
IsEnabledCmd().run_on_one_client(client)
962
955
if e.exception.code is not None:
963
956
self.assertEqual(e.exception.code, 0)
965
958
self.assertIsNone(e.exception.code)
966
959
def test_is_enabled_run_exits_with_failure(self):
967
self.client.attributes["Enabled"] = dbus.Boolean(False)
960
client = self.clients["foo"]
961
client["Enabled"] = dbus.Boolean(False)
968
962
with self.assertRaises(SystemExit) as e:
969
IsEnabledCmd().run(None, self.one_client)
963
IsEnabledCmd().run_on_one_client(client)
970
964
if isinstance(e.exception.code, int):
971
965
self.assertNotEqual(e.exception.code, 0)
973
967
self.assertIsNotNone(e.exception.code)
975
class TestRemoveCmd(TestCmd):
976
def test_remove(self):
977
class MockMandos(object):
980
def RemoveClient(self, dbus_path):
981
self.calls.append(("RemoveClient", (dbus_path,)))
982
mandos = MockMandos()
983
super(TestRemoveCmd, self).setUp()
984
RemoveCmd().run(mandos, self.clients)
985
self.assertEqual(len(mandos.calls), 2)
986
for client in self.clients:
987
self.assertIn(("RemoveClient",
988
(client.__dbus_object_path__,)),
991
class TestApproveCmd(TestCmd):
992
def test_approve(self):
993
ApproveCmd().run(None, self.clients)
994
for client in self.clients:
995
self.assertIn(("Approve", (True, client_interface)),
998
class TestDenyCmd(TestCmd):
1000
DenyCmd().run(None, self.clients)
1001
for client in self.clients:
1002
self.assertIn(("Approve", (False, client_interface)),
1005
class TestEnableCmd(TestCmd):
    """Tests for EnableCmd, run against the clients provided by the
    TestCmd fixture."""

    def test_enable(self):
        """After EnableCmd has run, "Enabled" must be truthy on every
        client."""
        # Force a known-disabled starting state, so the final check can
        # only pass if the command really changed the property.
        for mock_client in self.clients:
            mock_client.attributes["Enabled"] = False

        enable_command = EnableCmd()
        enable_command.run(None, self.clients)

        for mock_client in self.clients:
            self.assertTrue(mock_client.attributes["Enabled"])
1015
class TestDisableCmd(TestCmd):
    """Tests for DisableCmd, run against the clients provided by the
    TestCmd fixture."""

    def test_disable(self):
        """After DisableCmd has run, "Enabled" must be falsy on every
        client."""
        # NOTE(review): no explicit precondition is set here; this
        # presumably relies on the fixture clients starting out
        # enabled -- confirm against TestCmd.setUp.
        disable_command = DisableCmd()
        disable_command.run(None, self.clients)

        for mock_client in self.clients:
            self.assertFalse(mock_client.attributes["Enabled"])
1022
class Unique(object):
    """Marker class whose instances exist only to be distinct, unique
    objects.

    Stands in for unittest.mock.sentinel, which is only available
    from Python 3.3 onwards.
    """
1026
class TestPropertyCmd(TestCmd):
1027
"""Abstract class for tests of PropertyCmd classes"""
1029
if not hasattr(self, "command"):
1031
values_to_get = getattr(self, "values_to_get",
1033
for value_to_set, value_to_get in zip(self.values_to_set,
1035
for client in self.clients:
1036
old_value = client.attributes[self.property]
1037
self.assertNotIsInstance(old_value, Unique)
1038
client.attributes[self.property] = Unique()
1039
self.run_command(value_to_set, self.clients)
1040
for client in self.clients:
1041
value = client.attributes[self.property]
1042
self.assertNotIsInstance(value, Unique)
1043
self.assertEqual(value, value_to_get)
1044
def run_command(self, value, clients):
1045
self.command().run(None, clients)
1047
class TestBumpTimeoutCmd(TestPropertyCmd):
    """Property test configuration for BumpTimeoutCmd.

    No test logic is defined here; it is inherited from
    TestPropertyCmd.  This class only names the command under test,
    the client property it concerns, and the value(s) involved.
    """
    property = "LastCheckedOK"
    command = BumpTimeoutCmd
    values_to_set = [
        "",
    ]
1052
class TestStartCheckerCmd(TestPropertyCmd):
    """Property test configuration for StartCheckerCmd.

    No test logic is defined here; it is inherited from
    TestPropertyCmd.  This class only names the command under test,
    the client property it concerns, and the value(s) involved.
    """
    property = "CheckerRunning"
    command = StartCheckerCmd
    values_to_set = [
        dbus.Boolean(True),
    ]
1057
class TestStopCheckerCmd(TestPropertyCmd):
    """Property test configuration for StopCheckerCmd.

    No test logic is defined here; it is inherited from
    TestPropertyCmd.  This class only names the command under test,
    the client property it concerns, and the value(s) involved.
    """
    property = "CheckerRunning"
    command = StopCheckerCmd
    values_to_set = [
        dbus.Boolean(False),
    ]
1062
class TestApproveByDefaultCmd(TestPropertyCmd):
    """Property test configuration for ApproveByDefaultCmd.

    No test logic is defined here; it is inherited from
    TestPropertyCmd.  This class only names the command under test,
    the client property it concerns, and the value(s) involved.
    """
    property = "ApprovedByDefault"
    command = ApproveByDefaultCmd
    values_to_set = [
        dbus.Boolean(True),
    ]
1067
class TestDenyByDefaultCmd(TestPropertyCmd):
    """Property test configuration for DenyByDefaultCmd.

    No test logic is defined here; it is inherited from
    TestPropertyCmd.  This class only names the command under test,
    the client property it concerns, and the value(s) involved.
    """
    property = "ApprovedByDefault"
    command = DenyByDefaultCmd
    values_to_set = [
        dbus.Boolean(False),
    ]
1072
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1073
"""Abstract class for tests of PropertyCmd classes using the
1074
ValueArgumentMixIn"""
1076
if type(self) is TestValueArgumentPropertyCmd:
1078
return super(TestValueArgumentPropertyCmd, self).runTest()
1079
def run_command(self, value, clients):
1080
self.command(value).run(None, clients)
1082
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
    """Property test configuration for SetCheckerCmd.

    No test logic is defined here; it is inherited from
    TestValueArgumentPropertyCmd.  This class only names the command
    under test, the client property it concerns, and the checker
    command strings to try.
    """
    property = "Checker"
    command = SetCheckerCmd
    values_to_set = [
        "",
        ":",
        "fping -q -- %s",
    ]
1087
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1088
command = SetHostCmd
1090
values_to_set = ["192.0.2.3", "foo.example.org"]
1092
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1093
command = SetSecretCmd
1095
values_to_set = [open("/dev/null", "rb"),
1096
io.BytesIO(b"secret\0xyzzy\nbar")]
1097
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1099
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
    """Property test configuration for SetTimeoutCmd.

    No test logic is defined here; it is inherited from
    TestValueArgumentPropertyCmd.  values_to_set and values_to_get
    are parallel lists: the duration string at each index goes with
    the number at the same index.
    """
    property = "Timeout"
    command = SetTimeoutCmd
    # Duration strings handed to the command.
    values_to_set = [
        "P0D",
        "PT5M",
        "PT1S",
        "PT120S",
        "P1Y",
    ]
    # Matching property values; the magnitudes suggest milliseconds
    # (e.g. "PT1S" <-> 1000) -- NOTE(review): confirm the unit.
    values_to_get = [
        0,
        300000,
        1000,
        120000,
        31449600000,
    ]
1105
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
    """Property test configuration for SetExtendedTimeoutCmd.

    No test logic is defined here; it is inherited from
    TestValueArgumentPropertyCmd.  values_to_set and values_to_get
    are parallel lists: the duration string at each index goes with
    the number at the same index.
    """
    property = "ExtendedTimeout"
    command = SetExtendedTimeoutCmd
    # Duration strings handed to the command.
    values_to_set = [
        "P0D",
        "PT5M",
        "PT1S",
        "PT120S",
        "P1Y",
    ]
    # Matching property values; the magnitudes suggest milliseconds
    # (e.g. "PT1S" <-> 1000) -- NOTE(review): confirm the unit.
    values_to_get = [
        0,
        300000,
        1000,
        120000,
        31449600000,
    ]
1111
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
    """Property test configuration for SetIntervalCmd.

    No test logic is defined here; it is inherited from
    TestValueArgumentPropertyCmd.  values_to_set and values_to_get
    are parallel lists: the duration string at each index goes with
    the number at the same index.
    """
    property = "Interval"
    command = SetIntervalCmd
    # Duration strings handed to the command.
    values_to_set = [
        "P0D",
        "PT5M",
        "PT1S",
        "PT120S",
        "P1Y",
    ]
    # Matching property values; the magnitudes suggest milliseconds
    # (e.g. "PT1S" <-> 1000) -- NOTE(review): confirm the unit.
    values_to_get = [
        0,
        300000,
        1000,
        120000,
        31449600000,
    ]
1117
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
    """Property test configuration for SetApprovalDelayCmd.

    No test logic is defined here; it is inherited from
    TestValueArgumentPropertyCmd.  values_to_set and values_to_get
    are parallel lists: the duration string at each index goes with
    the number at the same index.
    """
    property = "ApprovalDelay"
    command = SetApprovalDelayCmd
    # Duration strings handed to the command.
    values_to_set = [
        "P0D",
        "PT5M",
        "PT1S",
        "PT120S",
        "P1Y",
    ]
    # Matching property values; the magnitudes suggest milliseconds
    # (e.g. "PT1S" <-> 1000) -- NOTE(review): confirm the unit.
    values_to_get = [
        0,
        300000,
        1000,
        120000,
        31449600000,
    ]
1123
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
    """Property test configuration for SetApprovalDurationCmd.

    No test logic is defined here; it is inherited from
    TestValueArgumentPropertyCmd.  values_to_set and values_to_get
    are parallel lists: the duration string at each index goes with
    the number at the same index.
    """
    property = "ApprovalDuration"
    command = SetApprovalDurationCmd
    # Duration strings handed to the command.
    values_to_set = [
        "P0D",
        "PT5M",
        "PT1S",
        "PT120S",
        "P1Y",
    ]
    # Matching property values; the magnitudes suggest milliseconds
    # (e.g. "PT1S" <-> 1000) -- NOTE(review): confirm the unit.
    values_to_get = [
        0,
        300000,
        1000,
        120000,
        31449600000,
    ]
1129
class TestOptions(unittest.TestCase):
1131
self.parser = argparse.ArgumentParser()
1132
add_command_line_options(self.parser)
1133
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1134
"""Assert that parsing ARGS should result in an instance of
1135
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1136
options = self.parser.parse_args(args)
1137
commands = commands_from_options(options)
1138
self.assertEqual(len(commands), 1)
1139
command = commands[0]
1140
self.assertIsInstance(command, command_cls)
1141
for key, value in cmd_attrs.items():
1142
self.assertEqual(getattr(command, key), value)
1143
def test_default_is_show_table(self):
1144
self.assert_command_from_args([], PrintTableCmd,
1146
def test_show_table_verbose(self):
1147
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1149
def test_enable(self):
1150
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1151
def test_disable(self):
1152
self.assert_command_from_args(["--disable", "foo"],
1157
971
def should_only_run_tests():