796
777
self.__dbus_object_path__ = "objpath_{}".format(name)
797
778
self.attributes = attributes
798
779
self.attributes["Name"] = name
800
def Set(self, interface, property, value, dbus_interface):
780
def Set(interface, property, value,
781
properties_interface):
801
782
testcase.assertEqual(interface, client_interface)
802
testcase.assertEqual(dbus_interface,
783
testcase.assertEqual(properties_interface,
803
784
dbus.PROPERTIES_IFACE)
804
785
self.attributes[property] = value
805
def Get(self, interface, property, dbus_interface):
786
def Get(interface, property, properties_interface):
806
787
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
788
testcase.assertEqual(properties_interface,
808
789
dbus.PROPERTIES_IFACE)
809
790
return self.attributes[property]
810
def Approve(self, approve, dbus_interface):
811
testcase.assertEqual(dbus_interface, client_interface)
812
self.calls.append(("Approve", (approve,
814
self.client = MockClient(
816
KeyID=("92ed150794387c03ce684574b1139a65"
817
"94a34f895daaaf09fd8ea90a27cddb12"),
819
Host="foo.example.org",
820
Enabled=dbus.Boolean(True),
822
LastCheckedOK="2019-02-03T00:00:00",
823
Created="2019-01-02T00:00:00",
825
Fingerprint=("778827225BA7DE539C5A"
826
"7CFA59CFF7CDBD9A5920"),
827
CheckerRunning=dbus.Boolean(False),
828
LastEnabled="2019-01-03T00:00:00",
829
ApprovalPending=dbus.Boolean(False),
830
ApprovedByDefault=dbus.Boolean(True),
831
LastApprovalRequest="",
833
ApprovalDuration=1000,
834
Checker="fping -q -- %(host)s",
835
ExtendedTimeout=900000,
836
Expires="2019-02-04T00:00:00",
838
self.other_client = MockClient(
840
KeyID=("0558568eedd67d622f5c83b35a115f79"
841
"6ab612cff5ad227247e46c2b020f441c"),
844
Enabled=dbus.Boolean(True),
846
LastCheckedOK="2019-02-04T00:00:00",
847
Created="2019-01-03T00:00:00",
849
Fingerprint=("3E393AEAEFB84C7E89E2"
850
"F547B3A107558FCA3A27"),
851
CheckerRunning=dbus.Boolean(True),
852
LastEnabled="2019-01-04T00:00:00",
853
ApprovalPending=dbus.Boolean(False),
854
ApprovedByDefault=dbus.Boolean(False),
855
LastApprovalRequest="2019-01-03T00:00:00",
857
ApprovalDuration=1000,
859
ExtendedTimeout=900000,
860
Expires="2019-02-05T00:00:00",
861
LastCheckerStatus=-2)
862
self.clients = collections.OrderedDict(
864
(self.client, self.client.attributes),
865
(self.other_client, self.other_client.attributes),
791
def __getitem__(self, key):
792
return self.attributes[key]
793
self.clients = collections.OrderedDict([
797
KeyID=("92ed150794387c03ce684574b1139a65"
798
"94a34f895daaaf09fd8ea90a27cddb12"),
800
Host="foo.example.org",
801
Enabled=dbus.Boolean(True),
803
LastCheckedOK="2019-02-03T00:00:00",
804
Created="2019-01-02T00:00:00",
806
Fingerprint=("778827225BA7DE539C5A"
807
"7CFA59CFF7CDBD9A5920"),
808
CheckerRunning=dbus.Boolean(False),
809
LastEnabled="2019-01-03T00:00:00",
810
ApprovalPending=dbus.Boolean(False),
811
ApprovedByDefault=dbus.Boolean(True),
812
LastApprovalRequest="",
814
ApprovalDuration=1000,
815
Checker="fping -q -- %(host)s",
816
ExtendedTimeout=900000,
817
Expires="2019-02-04T00:00:00",
818
LastCheckerStatus=0)),
822
KeyID=("0558568eedd67d622f5c83b35a115f79"
823
"6ab612cff5ad227247e46c2b020f441c"),
826
Enabled=dbus.Boolean(True),
828
LastCheckedOK="2019-02-04T00:00:00",
829
Created="2019-01-03T00:00:00",
831
Fingerprint=("3E393AEAEFB84C7E89E2"
832
"F547B3A107558FCA3A27"),
833
CheckerRunning=dbus.Boolean(True),
834
LastEnabled="2019-01-04T00:00:00",
835
ApprovalPending=dbus.Boolean(False),
836
ApprovedByDefault=dbus.Boolean(False),
837
LastApprovalRequest="2019-01-03T00:00:00",
839
ApprovalDuration=1000,
841
ExtendedTimeout=900000,
842
Expires="2019-02-05T00:00:00",
843
LastCheckerStatus=-2)),
867
self.one_client = {self.client: self.client.attributes}
869
846
class TestPrintTableCmd(TestCmd):
870
847
def test_normal(self):
885
862
self.assertEqual(output, expected_output)
886
863
def test_one_client(self):
887
output = PrintTableCmd().output(self.one_client)
864
output = PrintTableCmd().output({"foo": self.clients["foo"]})
888
865
expected_output = """
889
866
Name Enabled Timeout Last Successful Check
890
867
foo Yes 00:05:00 2019-02-03T00:00:00
892
869
self.assertEqual(output, expected_output)
894
class TestDumpJSONCmd(TestCmd):
896
self.expected_json = {
899
"KeyID": ("92ed150794387c03ce684574b1139a65"
900
"94a34f895daaaf09fd8ea90a27cddb12"),
901
"Host": "foo.example.org",
904
"LastCheckedOK": "2019-02-03T00:00:00",
905
"Created": "2019-01-02T00:00:00",
907
"Fingerprint": ("778827225BA7DE539C5A"
908
"7CFA59CFF7CDBD9A5920"),
909
"CheckerRunning": False,
910
"LastEnabled": "2019-01-03T00:00:00",
911
"ApprovalPending": False,
912
"ApprovedByDefault": True,
913
"LastApprovalRequest": "",
915
"ApprovalDuration": 1000,
916
"Checker": "fping -q -- %(host)s",
917
"ExtendedTimeout": 900000,
918
"Expires": "2019-02-04T00:00:00",
919
"LastCheckerStatus": 0,
923
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
924
"6ab612cff5ad227247e46c2b020f441c"),
928
"LastCheckedOK": "2019-02-04T00:00:00",
929
"Created": "2019-01-03T00:00:00",
931
"Fingerprint": ("3E393AEAEFB84C7E89E2"
932
"F547B3A107558FCA3A27"),
933
"CheckerRunning": True,
934
"LastEnabled": "2019-01-04T00:00:00",
935
"ApprovalPending": False,
936
"ApprovedByDefault": False,
937
"LastApprovalRequest": "2019-01-03T00:00:00",
938
"ApprovalDelay": 30000,
939
"ApprovalDuration": 1000,
941
"ExtendedTimeout": 900000,
942
"Expires": "2019-02-05T00:00:00",
943
"LastCheckerStatus": -2,
946
return super(TestDumpJSONCmd, self).setUp()
947
def test_normal(self):
948
json_data = json.loads(DumpJSONCmd().output(self.clients))
949
self.assertDictEqual(json_data, self.expected_json)
950
def test_one_client(self):
951
clients = self.one_client
952
json_data = json.loads(DumpJSONCmd().output(clients))
953
expected_json = {"foo": self.expected_json["foo"]}
954
self.assertDictEqual(json_data, expected_json)
956
class TestIsEnabledCmd(TestCmd):
957
def test_is_enabled(self):
958
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
959
for client, properties in self.clients.items()))
960
def test_is_enabled_run_exits_successfully(self):
961
with self.assertRaises(SystemExit) as e:
962
IsEnabledCmd().run(None, self.one_client)
963
if e.exception.code is not None:
964
self.assertEqual(e.exception.code, 0)
966
self.assertIsNone(e.exception.code)
967
def test_is_enabled_run_exits_with_failure(self):
968
self.client.attributes["Enabled"] = dbus.Boolean(False)
969
with self.assertRaises(SystemExit) as e:
970
IsEnabledCmd().run(None, self.one_client)
971
if isinstance(e.exception.code, int):
972
self.assertNotEqual(e.exception.code, 0)
974
self.assertIsNotNone(e.exception.code)
976
class TestRemoveCmd(TestCmd):
977
def test_remove(self):
978
class MockMandos(object):
981
def RemoveClient(self, dbus_path):
982
self.calls.append(("RemoveClient", (dbus_path,)))
983
mandos = MockMandos()
984
super(TestRemoveCmd, self).setUp()
985
RemoveCmd().run(mandos, self.clients)
986
self.assertEqual(len(mandos.calls), 2)
987
for client in self.clients:
988
self.assertIn(("RemoveClient",
989
(client.__dbus_object_path__,)),
992
class TestApproveCmd(TestCmd):
993
def test_approve(self):
994
ApproveCmd().run(None, self.clients)
995
for client in self.clients:
996
self.assertIn(("Approve", (True, client_interface)),
999
class TestDenyCmd(TestCmd):
1000
def test_deny(self):
1001
DenyCmd().run(None, self.clients)
1002
for client in self.clients:
1003
self.assertIn(("Approve", (False, client_interface)),
1006
class TestEnableCmd(TestCmd):
1007
def test_enable(self):
1008
for client in self.clients:
1009
client.attributes["Enabled"] = False
1011
EnableCmd().run(None, self.clients)
1013
for client in self.clients:
1014
self.assertTrue(client.attributes["Enabled"])
1016
class TestDisableCmd(TestCmd):
1017
def test_disable(self):
1018
DisableCmd().run(None, self.clients)
1020
for client in self.clients:
1021
self.assertFalse(client.attributes["Enabled"])
1023
class Unique(object):
1024
"""Class for objects which exist only to be unique objects, since
1025
unittest.mock.sentinel only exists in Python 3.3"""
1027
class TestPropertyCmd(TestCmd):
1028
"""Abstract class for tests of PropertyCmd classes"""
1030
if not hasattr(self, "command"):
1032
values_to_get = getattr(self, "values_to_get",
1034
for value_to_set, value_to_get in zip(self.values_to_set,
1036
for client in self.clients:
1037
old_value = client.attributes[self.property]
1038
self.assertNotIsInstance(old_value, Unique)
1039
client.attributes[self.property] = Unique()
1040
self.run_command(value_to_set, self.clients)
1041
for client in self.clients:
1042
value = client.attributes[self.property]
1043
self.assertNotIsInstance(value, Unique)
1044
self.assertEqual(value, value_to_get)
1045
def run_command(self, value, clients):
1046
self.command().run(None, clients)
1048
class TestBumpTimeoutCmd(TestPropertyCmd):
1049
command = BumpTimeoutCmd
1050
property = "LastCheckedOK"
1051
values_to_set = [""]
1053
class TestStartCheckerCmd(TestPropertyCmd):
1054
command = StartCheckerCmd
1055
property = "CheckerRunning"
1056
values_to_set = [dbus.Boolean(True)]
1058
class TestStopCheckerCmd(TestPropertyCmd):
1059
command = StopCheckerCmd
1060
property = "CheckerRunning"
1061
values_to_set = [dbus.Boolean(False)]
1063
class TestApproveByDefaultCmd(TestPropertyCmd):
1064
command = ApproveByDefaultCmd
1065
property = "ApprovedByDefault"
1066
values_to_set = [dbus.Boolean(True)]
1068
class TestDenyByDefaultCmd(TestPropertyCmd):
1069
command = DenyByDefaultCmd
1070
property = "ApprovedByDefault"
1071
values_to_set = [dbus.Boolean(False)]
1073
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1074
"""Abstract class for tests of PropertyCmd classes using the
1075
ValueArgumentMixIn"""
1077
if type(self) is TestValueArgumentPropertyCmd:
1079
return super(TestValueArgumentPropertyCmd, self).runTest()
1080
def run_command(self, value, clients):
1081
self.command(value).run(None, clients)
1083
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1084
command = SetCheckerCmd
1085
property = "Checker"
1086
values_to_set = ["", ":", "fping -q -- %s"]
1088
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1089
command = SetHostCmd
1091
values_to_set = ["192.0.2.3", "foo.example.org"]
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1094
command = SetSecretCmd
1096
values_to_set = [open("/dev/null", "rb"),
1097
io.BytesIO(b"secret\0xyzzy\nbar")]
1098
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1100
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1101
command = SetTimeoutCmd
1102
property = "Timeout"
1103
values_to_set = [datetime.timedelta(),
1104
datetime.timedelta(minutes=5),
1105
datetime.timedelta(seconds=1),
1106
datetime.timedelta(weeks=1),
1107
datetime.timedelta(weeks=52)]
1108
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1110
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1111
command = SetExtendedTimeoutCmd
1112
property = "ExtendedTimeout"
1113
values_to_set = [datetime.timedelta(),
1114
datetime.timedelta(minutes=5),
1115
datetime.timedelta(seconds=1),
1116
datetime.timedelta(weeks=1),
1117
datetime.timedelta(weeks=52)]
1118
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1120
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1121
command = SetIntervalCmd
1122
property = "Interval"
1123
values_to_set = [datetime.timedelta(),
1124
datetime.timedelta(minutes=5),
1125
datetime.timedelta(seconds=1),
1126
datetime.timedelta(weeks=1),
1127
datetime.timedelta(weeks=52)]
1128
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1130
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1131
command = SetApprovalDelayCmd
1132
property = "ApprovalDelay"
1133
values_to_set = [datetime.timedelta(),
1134
datetime.timedelta(minutes=5),
1135
datetime.timedelta(seconds=1),
1136
datetime.timedelta(weeks=1),
1137
datetime.timedelta(weeks=52)]
1138
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1140
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1141
command = SetApprovalDurationCmd
1142
property = "ApprovalDuration"
1143
values_to_set = [datetime.timedelta(),
1144
datetime.timedelta(minutes=5),
1145
datetime.timedelta(seconds=1),
1146
datetime.timedelta(weeks=1),
1147
datetime.timedelta(weeks=52)]
1148
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1150
class Test_command_from_options(unittest.TestCase):
1152
self.parser = argparse.ArgumentParser()
1153
add_command_line_options(self.parser)
1154
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1155
"""Assert that parsing ARGS should result in an instance of
1156
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1157
options = self.parser.parse_args(args)
1158
commands = commands_from_options(options)
1159
self.assertEqual(len(commands), 1)
1160
command = commands[0]
1161
self.assertIsInstance(command, command_cls)
1162
for key, value in cmd_attrs.items():
1163
self.assertEqual(getattr(command, key), value)
1164
def test_print_table(self):
1165
self.assert_command_from_args([], PrintTableCmd,
1168
def test_print_table_verbose(self):
1169
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1172
def test_print_table_verbose_short(self):
1173
self.assert_command_from_args(["-v"], PrintTableCmd,
1176
def test_enable(self):
1177
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1179
def test_enable_short(self):
1180
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1182
def test_disable(self):
1183
self.assert_command_from_args(["--disable", "foo"],
1186
def test_disable_short(self):
1187
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1189
def test_bump_timeout(self):
1190
self.assert_command_from_args(["--bump-timeout", "foo"],
1193
def test_bump_timeout_short(self):
1194
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1196
def test_start_checker(self):
1197
self.assert_command_from_args(["--start-checker", "foo"],
1200
def test_stop_checker(self):
1201
self.assert_command_from_args(["--stop-checker", "foo"],
1204
def test_remove(self):
1205
self.assert_command_from_args(["--remove", "foo"],
1208
def test_remove_short(self):
1209
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1211
def test_checker(self):
1212
self.assert_command_from_args(["--checker", ":", "foo"],
1213
SetCheckerCmd, value_to_set=":")
1215
def test_checker_empty(self):
1216
self.assert_command_from_args(["--checker", "", "foo"],
1217
SetCheckerCmd, value_to_set="")
1219
def test_checker_short(self):
1220
self.assert_command_from_args(["-c", ":", "foo"],
1221
SetCheckerCmd, value_to_set=":")
1223
def test_timeout(self):
1224
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1226
value_to_set=300000)
1228
def test_timeout_short(self):
1229
self.assert_command_from_args(["-t", "PT5M", "foo"],
1231
value_to_set=300000)
1233
def test_extended_timeout(self):
1234
self.assert_command_from_args(["--extended-timeout", "PT15M",
1236
SetExtendedTimeoutCmd,
1237
value_to_set=900000)
1239
def test_interval(self):
1240
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1242
value_to_set=120000)
1244
def test_interval_short(self):
1245
self.assert_command_from_args(["-i", "PT2M", "foo"],
1247
value_to_set=120000)
1249
def test_approve_by_default(self):
1250
self.assert_command_from_args(["--approve-by-default", "foo"],
1251
ApproveByDefaultCmd)
1253
def test_deny_by_default(self):
1254
self.assert_command_from_args(["--deny-by-default", "foo"],
1257
def test_approval_delay(self):
1258
self.assert_command_from_args(["--approval-delay", "PT30S",
1259
"foo"], SetApprovalDelayCmd,
1262
def test_approval_duration(self):
1263
self.assert_command_from_args(["--approval-duration", "PT1S",
1264
"foo"], SetApprovalDurationCmd,
1267
def test_host(self):
1268
self.assert_command_from_args(["--host", "foo.example.org",
1270
value_to_set="foo.example.org")
1272
def test_host_short(self):
1273
self.assert_command_from_args(["-H", "foo.example.org",
1275
value_to_set="foo.example.org")
1277
def test_secret_devnull(self):
1278
self.assert_command_from_args(["--secret", os.path.devnull,
1279
"foo"], SetSecretCmd,
1282
def test_secret_tempfile(self):
1283
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1284
value = b"secret\0xyzzy\nbar"
1287
self.assert_command_from_args(["--secret", f.name,
1288
"foo"], SetSecretCmd,
1291
def test_secret_devnull_short(self):
1292
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1293
SetSecretCmd, value_to_set=b"")
1295
def test_secret_tempfile_short(self):
1296
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1297
value = b"secret\0xyzzy\nbar"
1300
self.assert_command_from_args(["-s", f.name, "foo"],
1304
def test_approve(self):
1305
self.assert_command_from_args(["--approve", "foo"],
1308
def test_approve_short(self):
1309
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1311
def test_deny(self):
1312
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1314
def test_deny_short(self):
1315
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1317
def test_dump_json(self):
1318
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1320
def test_is_enabled(self):
1321
self.assert_command_from_args(["--is-enabled", "foo"],
1324
def test_is_enabled_short(self):
1325
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1329
873
def should_only_run_tests():