796
777
self.__dbus_object_path__ = "objpath_{}".format(name)
797
778
self.attributes = attributes
798
779
self.attributes["Name"] = name
800
def Set(self, interface, property, value, dbus_interface):
780
def Set(interface, property, value,
781
properties_interface):
801
782
testcase.assertEqual(interface, client_interface)
802
testcase.assertEqual(dbus_interface,
783
testcase.assertEqual(properties_interface,
803
784
dbus.PROPERTIES_IFACE)
804
785
self.attributes[property] = value
805
def Get(self, interface, property, dbus_interface):
786
def Get(interface, property, properties_interface):
806
787
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
788
testcase.assertEqual(properties_interface,
808
789
dbus.PROPERTIES_IFACE)
809
790
return self.attributes[property]
810
def Approve(self, approve, dbus_interface):
811
testcase.assertEqual(dbus_interface, client_interface)
812
self.calls.append(("Approve", (approve,
814
self.client = MockClient(
816
KeyID=("92ed150794387c03ce684574b1139a65"
817
"94a34f895daaaf09fd8ea90a27cddb12"),
819
Host="foo.example.org",
820
Enabled=dbus.Boolean(True),
822
LastCheckedOK="2019-02-03T00:00:00",
823
Created="2019-01-02T00:00:00",
825
Fingerprint=("778827225BA7DE539C5A"
826
"7CFA59CFF7CDBD9A5920"),
827
CheckerRunning=dbus.Boolean(False),
828
LastEnabled="2019-01-03T00:00:00",
829
ApprovalPending=dbus.Boolean(False),
830
ApprovedByDefault=dbus.Boolean(True),
831
LastApprovalRequest="",
833
ApprovalDuration=1000,
834
Checker="fping -q -- %(host)s",
835
ExtendedTimeout=900000,
836
Expires="2019-02-04T00:00:00",
838
self.other_client = MockClient(
840
KeyID=("0558568eedd67d622f5c83b35a115f79"
841
"6ab612cff5ad227247e46c2b020f441c"),
844
Enabled=dbus.Boolean(True),
846
LastCheckedOK="2019-02-04T00:00:00",
847
Created="2019-01-03T00:00:00",
849
Fingerprint=("3E393AEAEFB84C7E89E2"
850
"F547B3A107558FCA3A27"),
851
CheckerRunning=dbus.Boolean(True),
852
LastEnabled="2019-01-04T00:00:00",
853
ApprovalPending=dbus.Boolean(False),
854
ApprovedByDefault=dbus.Boolean(False),
855
LastApprovalRequest="2019-01-03T00:00:00",
857
ApprovalDuration=1000,
859
ExtendedTimeout=900000,
860
Expires="2019-02-05T00:00:00",
861
LastCheckerStatus=-2)
862
self.clients = collections.OrderedDict(
864
(self.client, self.client.attributes),
865
(self.other_client, self.other_client.attributes),
791
def __getitem__(self, key):
792
return self.attributes[key]
793
self.clients = collections.OrderedDict([
797
KeyID=("92ed150794387c03ce684574b1139a65"
798
"94a34f895daaaf09fd8ea90a27cddb12"),
800
Host="foo.example.org",
801
Enabled=dbus.Boolean(True),
803
LastCheckedOK="2019-02-03T00:00:00",
804
Created="2019-01-02T00:00:00",
806
Fingerprint=("778827225BA7DE539C5A"
807
"7CFA59CFF7CDBD9A5920"),
808
CheckerRunning=dbus.Boolean(False),
809
LastEnabled="2019-01-03T00:00:00",
810
ApprovalPending=dbus.Boolean(False),
811
ApprovedByDefault=dbus.Boolean(True),
812
LastApprovalRequest="",
814
ApprovalDuration=1000,
815
Checker="fping -q -- %(host)s",
816
ExtendedTimeout=900000,
817
Expires="2019-02-04T00:00:00",
818
LastCheckerStatus=0)),
822
KeyID=("0558568eedd67d622f5c83b35a115f79"
823
"6ab612cff5ad227247e46c2b020f441c"),
826
Enabled=dbus.Boolean(True),
828
LastCheckedOK="2019-02-04T00:00:00",
829
Created="2019-01-03T00:00:00",
831
Fingerprint=("3E393AEAEFB84C7E89E2"
832
"F547B3A107558FCA3A27"),
833
CheckerRunning=dbus.Boolean(True),
834
LastEnabled="2019-01-04T00:00:00",
835
ApprovalPending=dbus.Boolean(False),
836
ApprovedByDefault=dbus.Boolean(False),
837
LastApprovalRequest="2019-01-03T00:00:00",
839
ApprovalDuration=1000,
841
ExtendedTimeout=900000,
842
Expires="2019-02-05T00:00:00",
843
LastCheckerStatus=-2)),
867
self.one_client = {self.client: self.client.attributes}
869
846
class TestPrintTableCmd(TestCmd):
870
847
def test_normal(self):
948
925
json_data = json.loads(DumpJSONCmd().output(self.clients))
949
926
self.assertDictEqual(json_data, self.expected_json)
950
927
def test_one_client(self):
951
clients = self.one_client
928
clients = {"foo": self.clients["foo"]}
952
929
json_data = json.loads(DumpJSONCmd().output(clients))
953
930
expected_json = {"foo": self.expected_json["foo"]}
954
931
self.assertDictEqual(json_data, expected_json)
956
class TestIsEnabledCmd(TestCmd):
957
def test_is_enabled(self):
958
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
959
for client, properties in self.clients.items()))
960
def test_is_enabled_run_exits_successfully(self):
961
with self.assertRaises(SystemExit) as e:
962
IsEnabledCmd().run(None, self.one_client)
963
if e.exception.code is not None:
964
self.assertEqual(e.exception.code, 0)
966
self.assertIsNone(e.exception.code)
967
def test_is_enabled_run_exits_with_failure(self):
968
self.client.attributes["Enabled"] = dbus.Boolean(False)
969
with self.assertRaises(SystemExit) as e:
970
IsEnabledCmd().run(None, self.one_client)
971
if isinstance(e.exception.code, int):
972
self.assertNotEqual(e.exception.code, 0)
974
self.assertIsNotNone(e.exception.code)
976
class TestRemoveCmd(TestCmd):
977
def test_remove(self):
978
class MockMandos(object):
981
def RemoveClient(self, dbus_path):
982
self.calls.append(("RemoveClient", (dbus_path,)))
983
mandos = MockMandos()
984
super(TestRemoveCmd, self).setUp()
985
RemoveCmd().run(mandos, self.clients)
986
self.assertEqual(len(mandos.calls), 2)
987
for client in self.clients:
988
self.assertIn(("RemoveClient",
989
(client.__dbus_object_path__,)),
992
class TestApproveCmd(TestCmd):
993
def test_approve(self):
994
ApproveCmd().run(None, self.clients)
995
for client in self.clients:
996
self.assertIn(("Approve", (True, client_interface)),
999
class TestDenyCmd(TestCmd):
1000
def test_deny(self):
1001
DenyCmd().run(None, self.clients)
1002
for client in self.clients:
1003
self.assertIn(("Approve", (False, client_interface)),
1006
class TestEnableCmd(TestCmd):
1007
def test_enable(self):
1008
for client in self.clients:
1009
client.attributes["Enabled"] = False
1011
EnableCmd().run(None, self.clients)
1013
for client in self.clients:
1014
self.assertTrue(client.attributes["Enabled"])
1016
class TestDisableCmd(TestCmd):
1017
def test_disable(self):
1018
DisableCmd().run(None, self.clients)
1020
for client in self.clients:
1021
self.assertFalse(client.attributes["Enabled"])
1023
class Unique(object):
1024
"""Class for objects which exist only to be unique objects, since
1025
unittest.mock.sentinel only exists in Python 3.3"""
1027
class TestPropertyCmd(TestCmd):
1028
"""Abstract class for tests of PropertyCmd classes"""
1030
if not hasattr(self, "command"):
1032
values_to_get = getattr(self, "values_to_get",
1034
for value_to_set, value_to_get in zip(self.values_to_set,
1036
for client in self.clients:
1037
old_value = client.attributes[self.property]
1038
self.assertNotIsInstance(old_value, Unique)
1039
client.attributes[self.property] = Unique()
1040
self.run_command(value_to_set, self.clients)
1041
for client in self.clients:
1042
value = client.attributes[self.property]
1043
self.assertNotIsInstance(value, Unique)
1044
self.assertEqual(value, value_to_get)
1045
def run_command(self, value, clients):
1046
self.command().run(None, clients)
1048
class TestBumpTimeoutCmd(TestPropertyCmd):
1049
command = BumpTimeoutCmd
1050
property = "LastCheckedOK"
1051
values_to_set = [""]
1053
class TestStartCheckerCmd(TestPropertyCmd):
1054
command = StartCheckerCmd
1055
property = "CheckerRunning"
1056
values_to_set = [dbus.Boolean(True)]
1058
class TestStopCheckerCmd(TestPropertyCmd):
1059
command = StopCheckerCmd
1060
property = "CheckerRunning"
1061
values_to_set = [dbus.Boolean(False)]
1063
class TestApproveByDefaultCmd(TestPropertyCmd):
1064
command = ApproveByDefaultCmd
1065
property = "ApprovedByDefault"
1066
values_to_set = [dbus.Boolean(True)]
1068
class TestDenyByDefaultCmd(TestPropertyCmd):
1069
command = DenyByDefaultCmd
1070
property = "ApprovedByDefault"
1071
values_to_set = [dbus.Boolean(False)]
1073
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1074
"""Abstract class for tests of PropertyCmd classes using the
1075
ValueArgumentMixIn"""
1077
if type(self) is TestValueArgumentPropertyCmd:
1079
return super(TestValueArgumentPropertyCmd, self).runTest()
1080
def run_command(self, value, clients):
1081
self.command(value).run(None, clients)
1083
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1084
command = SetCheckerCmd
1085
property = "Checker"
1086
values_to_set = ["", ":", "fping -q -- %s"]
1088
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1089
command = SetHostCmd
1091
values_to_set = ["192.0.2.3", "foo.example.org"]
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1094
command = SetSecretCmd
1096
values_to_set = [open("/dev/null", "rb"),
1097
io.BytesIO(b"secret\0xyzzy\nbar")]
1098
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1100
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1101
command = SetTimeoutCmd
1102
property = "Timeout"
1103
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1106
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1107
command = SetExtendedTimeoutCmd
1108
property = "ExtendedTimeout"
1109
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1112
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1113
command = SetIntervalCmd
1114
property = "Interval"
1115
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1118
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1119
command = SetApprovalDelayCmd
1120
property = "ApprovalDelay"
1121
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1122
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1124
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1125
command = SetApprovalDurationCmd
1126
property = "ApprovalDuration"
1127
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1128
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1130
class Test_command_from_options(unittest.TestCase):
1132
self.parser = argparse.ArgumentParser()
1133
add_command_line_options(self.parser)
1134
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1135
"""Assert that parsing ARGS should result in an instance of
1136
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1137
options = self.parser.parse_args(args)
1138
commands = commands_from_options(options)
1139
self.assertEqual(len(commands), 1)
1140
command = commands[0]
1141
self.assertIsInstance(command, command_cls)
1142
for key, value in cmd_attrs.items():
1143
self.assertEqual(getattr(command, key), value)
1144
def test_print_table(self):
1145
self.assert_command_from_args([], PrintTableCmd,
1148
def test_print_table_verbose(self):
1149
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1152
def test_enable(self):
1153
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1155
def test_disable(self):
1156
self.assert_command_from_args(["--disable", "foo"],
1159
def test_bump_timeout(self):
1160
self.assert_command_from_args(["--bump-timeout", "foo"],
1163
def test_start_checker(self):
1164
self.assert_command_from_args(["--start-checker", "foo"],
1167
def test_stop_checker(self):
1168
self.assert_command_from_args(["--stop-checker", "foo"],
1171
def test_remove(self):
1172
self.assert_command_from_args(["--remove", "foo"],
1175
def test_checker(self):
1176
self.assert_command_from_args(["--checker", ":", "foo"],
1177
SetCheckerCmd, value_to_set=":")
1179
def test_checker_empty(self):
1180
self.assert_command_from_args(["--checker", "", "foo"],
1181
SetCheckerCmd, value_to_set="")
1183
def test_timeout(self):
1184
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1186
value_to_set=300000)
1188
def test_extended_timeout(self):
1189
self.assert_command_from_args(["--extended-timeout", "PT15M",
1191
SetExtendedTimeoutCmd,
1192
value_to_set=900000)
1194
def test_interval(self):
1195
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1197
value_to_set=120000)
1199
def test_approve_by_default(self):
1200
self.assert_command_from_args(["--approve-by-default", "foo"],
1201
ApproveByDefaultCmd)
1203
def test_deny_by_default(self):
1204
self.assert_command_from_args(["--deny-by-default", "foo"],
1207
def test_approval_delay(self):
1208
self.assert_command_from_args(["--approval-delay", "PT30S",
1209
"foo"], SetApprovalDelayCmd,
1212
def test_approval_duration(self):
1213
self.assert_command_from_args(["--approval-duration", "PT1S",
1214
"foo"], SetApprovalDurationCmd,
1217
def test_host(self):
1218
self.assert_command_from_args(["--host", "foo.example.org",
1220
value_to_set="foo.example.org")
1222
def test_secret_devnull(self):
1223
self.assert_command_from_args(["--secret", os.path.devnull,
1224
"foo"], SetSecretCmd,
1227
def test_secret_tempfile(self):
1228
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1229
value = b"secret\0xyzzy\nbar"
1232
self.assert_command_from_args(["--secret", f.name,
1233
"foo"], SetSecretCmd,
1236
def test_approve(self):
1237
self.assert_command_from_args(["--approve", "foo"],
1240
def test_deny(self):
1241
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1243
def test_dump_json(self):
1244
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1246
def test_is_enabled(self):
1247
self.assert_command_from_args(["--is-enabled", "foo"],
1252
934
def should_only_run_tests():
1253
935
parser = argparse.ArgumentParser(add_help=False)