786
777
self.__dbus_object_path__ = "objpath_{}".format(name)
787
778
self.attributes = attributes
788
779
self.attributes["Name"] = name
790
def Set(self, interface, property, value, dbus_interface):
780
def Set(interface, property, value,
781
properties_interface):
791
782
testcase.assertEqual(interface, client_interface)
792
testcase.assertEqual(dbus_interface,
783
testcase.assertEqual(properties_interface,
793
784
dbus.PROPERTIES_IFACE)
794
785
self.attributes[property] = value
795
def Get(self, interface, property, dbus_interface):
786
def Get(interface, property, properties_interface):
796
787
testcase.assertEqual(interface, client_interface)
797
testcase.assertEqual(dbus_interface,
788
testcase.assertEqual(properties_interface,
798
789
dbus.PROPERTIES_IFACE)
799
790
return self.attributes[property]
800
def Approve(self, approve, dbus_interface):
801
testcase.assertEqual(dbus_interface, client_interface)
802
self.calls.append(("Approve", (approve,
804
self.client = MockClient(
806
KeyID=("92ed150794387c03ce684574b1139a65"
807
"94a34f895daaaf09fd8ea90a27cddb12"),
809
Host="foo.example.org",
810
Enabled=dbus.Boolean(True),
812
LastCheckedOK="2019-02-03T00:00:00",
813
Created="2019-01-02T00:00:00",
815
Fingerprint=("778827225BA7DE539C5A"
816
"7CFA59CFF7CDBD9A5920"),
817
CheckerRunning=dbus.Boolean(False),
818
LastEnabled="2019-01-03T00:00:00",
819
ApprovalPending=dbus.Boolean(False),
820
ApprovedByDefault=dbus.Boolean(True),
821
LastApprovalRequest="",
823
ApprovalDuration=1000,
824
Checker="fping -q -- %(host)s",
825
ExtendedTimeout=900000,
826
Expires="2019-02-04T00:00:00",
828
self.other_client = MockClient(
830
KeyID=("0558568eedd67d622f5c83b35a115f79"
831
"6ab612cff5ad227247e46c2b020f441c"),
834
Enabled=dbus.Boolean(True),
836
LastCheckedOK="2019-02-04T00:00:00",
837
Created="2019-01-03T00:00:00",
839
Fingerprint=("3E393AEAEFB84C7E89E2"
840
"F547B3A107558FCA3A27"),
841
CheckerRunning=dbus.Boolean(True),
842
LastEnabled="2019-01-04T00:00:00",
843
ApprovalPending=dbus.Boolean(False),
844
ApprovedByDefault=dbus.Boolean(False),
845
LastApprovalRequest="2019-01-03T00:00:00",
847
ApprovalDuration=1000,
849
ExtendedTimeout=900000,
850
Expires="2019-02-05T00:00:00",
851
LastCheckerStatus=-2)
852
self.clients = collections.OrderedDict(
854
(self.client, self.client.attributes),
855
(self.other_client, self.other_client.attributes),
791
def __getitem__(self, key):
792
return self.attributes[key]
793
self.clients = collections.OrderedDict([
797
KeyID=("92ed150794387c03ce684574b1139a65"
798
"94a34f895daaaf09fd8ea90a27cddb12"),
800
Host="foo.example.org",
801
Enabled=dbus.Boolean(True),
803
LastCheckedOK="2019-02-03T00:00:00",
804
Created="2019-01-02T00:00:00",
806
Fingerprint=("778827225BA7DE539C5A"
807
"7CFA59CFF7CDBD9A5920"),
808
CheckerRunning=dbus.Boolean(False),
809
LastEnabled="2019-01-03T00:00:00",
810
ApprovalPending=dbus.Boolean(False),
811
ApprovedByDefault=dbus.Boolean(True),
812
LastApprovalRequest="",
814
ApprovalDuration=1000,
815
Checker="fping -q -- %(host)s",
816
ExtendedTimeout=900000,
817
Expires="2019-02-04T00:00:00",
818
LastCheckerStatus=0)),
822
KeyID=("0558568eedd67d622f5c83b35a115f79"
823
"6ab612cff5ad227247e46c2b020f441c"),
826
Enabled=dbus.Boolean(True),
828
LastCheckedOK="2019-02-04T00:00:00",
829
Created="2019-01-03T00:00:00",
831
Fingerprint=("3E393AEAEFB84C7E89E2"
832
"F547B3A107558FCA3A27"),
833
CheckerRunning=dbus.Boolean(True),
834
LastEnabled="2019-01-04T00:00:00",
835
ApprovalPending=dbus.Boolean(False),
836
ApprovedByDefault=dbus.Boolean(False),
837
LastApprovalRequest="2019-01-03T00:00:00",
839
ApprovalDuration=1000,
841
ExtendedTimeout=900000,
842
Expires="2019-02-05T00:00:00",
843
LastCheckerStatus=-2)),
857
self.one_client = {self.client: self.client.attributes}
859
846
class TestPrintTableCmd(TestCmd):
860
847
def test_normal(self):
875
862
self.assertEqual(output, expected_output)
876
863
def test_one_client(self):
877
output = PrintTableCmd().output(self.one_client)
864
output = PrintTableCmd().output({"foo": self.clients["foo"]})
878
865
expected_output = """
879
866
Name Enabled Timeout Last Successful Check
880
867
foo Yes 00:05:00 2019-02-03T00:00:00
882
869
self.assertEqual(output, expected_output)
884
class TestDumpJSONCmd(TestCmd):
886
self.expected_json = {
889
"KeyID": ("92ed150794387c03ce684574b1139a65"
890
"94a34f895daaaf09fd8ea90a27cddb12"),
891
"Host": "foo.example.org",
894
"LastCheckedOK": "2019-02-03T00:00:00",
895
"Created": "2019-01-02T00:00:00",
897
"Fingerprint": ("778827225BA7DE539C5A"
898
"7CFA59CFF7CDBD9A5920"),
899
"CheckerRunning": False,
900
"LastEnabled": "2019-01-03T00:00:00",
901
"ApprovalPending": False,
902
"ApprovedByDefault": True,
903
"LastApprovalRequest": "",
905
"ApprovalDuration": 1000,
906
"Checker": "fping -q -- %(host)s",
907
"ExtendedTimeout": 900000,
908
"Expires": "2019-02-04T00:00:00",
909
"LastCheckerStatus": 0,
913
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
914
"6ab612cff5ad227247e46c2b020f441c"),
918
"LastCheckedOK": "2019-02-04T00:00:00",
919
"Created": "2019-01-03T00:00:00",
921
"Fingerprint": ("3E393AEAEFB84C7E89E2"
922
"F547B3A107558FCA3A27"),
923
"CheckerRunning": True,
924
"LastEnabled": "2019-01-04T00:00:00",
925
"ApprovalPending": False,
926
"ApprovedByDefault": False,
927
"LastApprovalRequest": "2019-01-03T00:00:00",
928
"ApprovalDelay": 30000,
929
"ApprovalDuration": 1000,
931
"ExtendedTimeout": 900000,
932
"Expires": "2019-02-05T00:00:00",
933
"LastCheckerStatus": -2,
936
return super(TestDumpJSONCmd, self).setUp()
937
def test_normal(self):
938
json_data = json.loads(DumpJSONCmd().output(self.clients))
939
self.assertDictEqual(json_data, self.expected_json)
940
def test_one_client(self):
941
clients = self.one_client
942
json_data = json.loads(DumpJSONCmd().output(clients))
943
expected_json = {"foo": self.expected_json["foo"]}
944
self.assertDictEqual(json_data, expected_json)
946
class TestIsEnabledCmd(TestCmd):
947
def test_is_enabled(self):
948
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
949
for client, properties in self.clients.items()))
950
def test_is_enabled_run_exits_successfully(self):
951
with self.assertRaises(SystemExit) as e:
952
IsEnabledCmd().run(None, self.one_client)
953
if e.exception.code is not None:
954
self.assertEqual(e.exception.code, 0)
956
self.assertIsNone(e.exception.code)
957
def test_is_enabled_run_exits_with_failure(self):
958
self.client.attributes["Enabled"] = dbus.Boolean(False)
959
with self.assertRaises(SystemExit) as e:
960
IsEnabledCmd().run(None, self.one_client)
961
if isinstance(e.exception.code, int):
962
self.assertNotEqual(e.exception.code, 0)
964
self.assertIsNotNone(e.exception.code)
966
class TestRemoveCmd(TestCmd):
967
def test_remove(self):
968
class MockMandos(object):
971
def RemoveClient(self, dbus_path):
972
self.calls.append(("RemoveClient", (dbus_path,)))
973
mandos = MockMandos()
974
super(TestRemoveCmd, self).setUp()
975
RemoveCmd().run(mandos, self.clients)
976
self.assertEqual(len(mandos.calls), 2)
977
for client in self.clients:
978
self.assertIn(("RemoveClient",
979
(client.__dbus_object_path__,)),
982
class TestApproveCmd(TestCmd):
983
def test_approve(self):
984
ApproveCmd().run(None, self.clients)
985
for client in self.clients:
986
self.assertIn(("Approve", (True, client_interface)),
989
class TestDenyCmd(TestCmd):
991
DenyCmd().run(None, self.clients)
992
for client in self.clients:
993
self.assertIn(("Approve", (False, client_interface)),
996
class TestEnableCmd(TestCmd):
997
def test_enable(self):
998
for client in self.clients:
999
client.attributes["Enabled"] = False
1001
EnableCmd().run(None, self.clients)
1003
for client in self.clients:
1004
self.assertTrue(client.attributes["Enabled"])
1006
class TestDisableCmd(TestCmd):
1007
def test_disable(self):
1008
DisableCmd().run(None, self.clients)
1010
for client in self.clients:
1011
self.assertFalse(client.attributes["Enabled"])
1013
class Unique(object):
1014
"""Class for objects which exist only to be unique objects, since
1015
unittest.mock.sentinel only exists in Python 3.3"""
1017
class TestPropertyCmd(TestCmd):
1018
"""Abstract class for tests of PropertyCmd classes"""
1020
if not hasattr(self, "command"):
1022
values_to_get = getattr(self, "values_to_get",
1024
for value_to_set, value_to_get in zip(self.values_to_set,
1026
for client in self.clients:
1027
old_value = client.attributes[self.property]
1028
self.assertNotIsInstance(old_value, Unique)
1029
client.attributes[self.property] = Unique()
1030
self.run_command(value_to_set, self.clients)
1031
for client in self.clients:
1032
value = client.attributes[self.property]
1033
self.assertNotIsInstance(value, Unique)
1034
self.assertEqual(value, value_to_get)
1035
def run_command(self, value, clients):
1036
self.command().run(None, clients)
1038
class TestBumpTimeoutCmd(TestPropertyCmd):
1039
command = BumpTimeoutCmd
1040
property = "LastCheckedOK"
1041
values_to_set = [""]
1043
class TestStartCheckerCmd(TestPropertyCmd):
1044
command = StartCheckerCmd
1045
property = "CheckerRunning"
1046
values_to_set = [dbus.Boolean(True)]
1048
class TestStopCheckerCmd(TestPropertyCmd):
1049
command = StopCheckerCmd
1050
property = "CheckerRunning"
1051
values_to_set = [dbus.Boolean(False)]
1053
class TestApproveByDefaultCmd(TestPropertyCmd):
1054
command = ApproveByDefaultCmd
1055
property = "ApprovedByDefault"
1056
values_to_set = [dbus.Boolean(True)]
1058
class TestDenyByDefaultCmd(TestPropertyCmd):
1059
command = DenyByDefaultCmd
1060
property = "ApprovedByDefault"
1061
values_to_set = [dbus.Boolean(False)]
1063
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1064
"""Abstract class for tests of PropertyCmd classes using the
1065
ValueArgumentMixIn"""
1067
if type(self) is TestValueArgumentPropertyCmd:
1069
return super(TestValueArgumentPropertyCmd, self).runTest()
1070
def run_command(self, value, clients):
1071
self.command(value).run(None, clients)
1073
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1074
command = SetCheckerCmd
1075
property = "Checker"
1076
values_to_set = ["", ":", "fping -q -- %s"]
1078
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1079
command = SetHostCmd
1081
values_to_set = ["192.0.2.3", "foo.example.org"]
1083
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1084
command = SetSecretCmd
1086
values_to_set = [b"", b"secret"]
1088
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1089
command = SetTimeoutCmd
1090
property = "Timeout"
1091
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1092
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1094
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1095
command = SetExtendedTimeoutCmd
1096
property = "ExtendedTimeout"
1097
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1098
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1100
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1101
command = SetIntervalCmd
1102
property = "Interval"
1103
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1106
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1107
command = SetApprovalDelayCmd
1108
property = "ApprovalDelay"
1109
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1112
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1113
command = SetApprovalDurationCmd
1114
property = "ApprovalDuration"
1115
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1120
873
def should_only_run_tests():