421
421
# Command whose run_on_one_client() branches on the result of is_enabled().
# NOTE(review): this span interleaves two revisions of the class (and bare
# line-number lines), so each method appears twice.  The later revision
# threads a `properties` mapping through both methods.
class IsEnabledCmd(Command):
422
# Earlier revision: receives only the client object.
def run_on_one_client(self, client):
423
if self.is_enabled(client):
422
# Later revision: also receives the client's properties and forwards them
# to is_enabled().  The bodies of the `if` branches are not visible in this
# excerpt (the numbering jumps from 423 to 426).
def run_on_one_client(self, client, properties):
423
if self.is_enabled(client, properties):
426
# Earlier revision: asks the client object for its "Enabled" property with
# a Get() call addressed to dbus.PROPERTIES_IFACE and returns the result
# as-is.
def is_enabled(self, client):
427
return client.Get(client_interface, "Enabled",
428
dbus_interface=dbus.PROPERTIES_IFACE)
426
# Later revision: makes no call on the client; looks up "Enabled" in the
# supplied properties mapping and coerces the value to a plain bool.
def is_enabled(self, client, properties):
427
return bool(properties["Enabled"])
430
429
# Command that removes a client: calls RemoveClient() on self.mandos,
# passing the client's __dbus_object_path__.
# NOTE(review): two revisions of the method signature are interleaved below;
# the later one adds a `properties` parameter that the visible body does not
# use.  self.mandos is assigned somewhere outside this excerpt.
class RemoveCmd(Command):
431
def run_on_one_client(self, client):
430
def run_on_one_client(self, client, properties):
432
431
self.mandos.RemoveClient(client.__dbus_object_path__)
434
433
# Command that approves a client: calls the client's Approve() method with
# dbus.Boolean(True), addressed to client_interface.
# NOTE(review): two revisions of the method signature are interleaved below;
# the later one adds a `properties` parameter that the visible body does not
# use.
class ApproveCmd(Command):
435
def run_on_one_client(self, client):
434
def run_on_one_client(self, client, properties):
436
435
client.Approve(dbus.Boolean(True),
437
436
dbus_interface=client_interface)
439
438
# Command that denies a client: calls the same Approve() method that
# ApproveCmd uses, but with dbus.Boolean(False), addressed to
# client_interface.
# NOTE(review): two revisions of the method signature are interleaved below;
# the later one adds a `properties` parameter that the visible body does not
# use.
class DenyCmd(Command):
440
def run_on_one_client(self, client):
439
def run_on_one_client(self, client, properties):
441
440
client.Approve(dbus.Boolean(False),
442
441
dbus_interface=client_interface)
799
798
testcase.assertEqual(dbus_interface, client_interface)
800
799
self.calls.append(("Approve", (approve,
801
800
dbus_interface)))
802
# Subscript read (obj[key]): returns the entry stored under `key` in
# self.attributes; a failed lookup propagates whatever that container raises.
# NOTE(review): the enclosing class header is outside this excerpt.
def __getitem__(self, key):
803
return self.attributes[key]
804
# Subscript write (obj[key] = value): stores `value` under `key` in
# self.attributes.
# NOTE(review): the enclosing class header is outside this excerpt.
def __setitem__(self, key, value):
805
self.attributes[key] = value
806
self.clients = collections.OrderedDict([
810
KeyID=("92ed150794387c03ce684574b1139a65"
811
"94a34f895daaaf09fd8ea90a27cddb12"),
813
Host="foo.example.org",
814
Enabled=dbus.Boolean(True),
816
LastCheckedOK="2019-02-03T00:00:00",
817
Created="2019-01-02T00:00:00",
819
Fingerprint=("778827225BA7DE539C5A"
820
"7CFA59CFF7CDBD9A5920"),
821
CheckerRunning=dbus.Boolean(False),
822
LastEnabled="2019-01-03T00:00:00",
823
ApprovalPending=dbus.Boolean(False),
824
ApprovedByDefault=dbus.Boolean(True),
825
LastApprovalRequest="",
827
ApprovalDuration=1000,
828
Checker="fping -q -- %(host)s",
829
ExtendedTimeout=900000,
830
Expires="2019-02-04T00:00:00",
831
LastCheckerStatus=0)),
835
KeyID=("0558568eedd67d622f5c83b35a115f79"
836
"6ab612cff5ad227247e46c2b020f441c"),
839
Enabled=dbus.Boolean(True),
841
LastCheckedOK="2019-02-04T00:00:00",
842
Created="2019-01-03T00:00:00",
844
Fingerprint=("3E393AEAEFB84C7E89E2"
845
"F547B3A107558FCA3A27"),
846
CheckerRunning=dbus.Boolean(True),
847
LastEnabled="2019-01-04T00:00:00",
848
ApprovalPending=dbus.Boolean(False),
849
ApprovedByDefault=dbus.Boolean(False),
850
LastApprovalRequest="2019-01-03T00:00:00",
852
ApprovalDuration=1000,
854
ExtendedTimeout=900000,
855
Expires="2019-02-05T00:00:00",
856
LastCheckerStatus=-2)),
801
self.client = MockClient(
803
KeyID=("92ed150794387c03ce684574b1139a65"
804
"94a34f895daaaf09fd8ea90a27cddb12"),
806
Host="foo.example.org",
807
Enabled=dbus.Boolean(True),
809
LastCheckedOK="2019-02-03T00:00:00",
810
Created="2019-01-02T00:00:00",
812
Fingerprint=("778827225BA7DE539C5A"
813
"7CFA59CFF7CDBD9A5920"),
814
CheckerRunning=dbus.Boolean(False),
815
LastEnabled="2019-01-03T00:00:00",
816
ApprovalPending=dbus.Boolean(False),
817
ApprovedByDefault=dbus.Boolean(True),
818
LastApprovalRequest="",
820
ApprovalDuration=1000,
821
Checker="fping -q -- %(host)s",
822
ExtendedTimeout=900000,
823
Expires="2019-02-04T00:00:00",
825
self.other_client = MockClient(
827
KeyID=("0558568eedd67d622f5c83b35a115f79"
828
"6ab612cff5ad227247e46c2b020f441c"),
831
Enabled=dbus.Boolean(True),
833
LastCheckedOK="2019-02-04T00:00:00",
834
Created="2019-01-03T00:00:00",
836
Fingerprint=("3E393AEAEFB84C7E89E2"
837
"F547B3A107558FCA3A27"),
838
CheckerRunning=dbus.Boolean(True),
839
LastEnabled="2019-01-04T00:00:00",
840
ApprovalPending=dbus.Boolean(False),
841
ApprovedByDefault=dbus.Boolean(False),
842
LastApprovalRequest="2019-01-03T00:00:00",
844
ApprovalDuration=1000,
846
ExtendedTimeout=900000,
847
Expires="2019-02-05T00:00:00",
848
LastCheckerStatus=-2)
849
self.clients = collections.OrderedDict(
851
(self.client, self.client.attributes),
852
(self.other_client, self.other_client.attributes),
858
self.client = self.clients["foo"]
854
self.one_client = {self.client: self.client.attributes}
860
856
class TestPrintTableCmd(TestCmd):
861
857
def test_normal(self):
939
935
json_data = json.loads(DumpJSONCmd().output(self.clients))
940
936
self.assertDictEqual(json_data, self.expected_json)
941
937
# Checks DumpJSONCmd().output() for a single client: the output must parse
# as JSON and equal a dict holding only the "foo" entry of
# self.expected_json.
# NOTE(review): two revisions of the `clients` assignment are interleaved.
# The earlier builds {"foo": self.client}; the later reuses self.one_client.
def test_one_client(self):
942
clients = {"foo": self.client}
938
clients = self.one_client
943
939
json_data = json.loads(DumpJSONCmd().output(clients))
944
940
expected_json = {"foo": self.expected_json["foo"]}
945
941
self.assertDictEqual(json_data, expected_json)
947
943
# Tests for IsEnabledCmd: the is_enabled() predicate and the exit status
# produced by run().
# NOTE(review): this span interleaves two revisions of the tests (and bare
# line-number lines).  The earlier revision passes client objects only; the
# later passes (client, properties) pairs and uses self.one_client.
class TestIsEnabledCmd(TestCmd):
948
944
# is_enabled() must be true for every client in the fixture.
# Earlier revision of the body: iterates over self.clients.values().
def test_is_enabled(self):
949
self.assertTrue(all(IsEnabledCmd().is_enabled(client)
950
for client in self.clients.values()))
951
# Present in the earlier revision only: checks that is_enabled() is true
# for self.client and compares the calls recorded on the client with an
# expected list.  Parts of that expected list are not visible in this
# excerpt (the numbering skips 954 and 956).
def test_is_enabled_does_get_attribute(self):
952
self.assertTrue(IsEnabledCmd().is_enabled(self.client))
953
self.assertListEqual(self.client.calls,
955
("se.recompile.Mandos.Client",
957
"org.freedesktop.DBus.Properties"))])
# Later revision of the test_is_enabled body: iterates over
# self.clients.items() and passes each client's properties along.
945
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
946
for client, properties in self.clients.items()))
958
947
# run() on the one enabled client must raise SystemExit with a success
# status.  The assertions accept either 0 or None as the exit code; the line
# between the two assertions (presumably `else:`) is not visible here.
def test_is_enabled_run_exits_successfully(self):
959
948
with self.assertRaises(SystemExit) as e:
960
IsEnabledCmd().run(None, [self.client])
949
IsEnabledCmd().run(None, self.one_client)
961
950
if e.exception.code is not None:
962
951
self.assertEqual(e.exception.code, 0)
964
953
self.assertIsNone(e.exception.code)
965
954
# After "Enabled" is set to False on the client, run() must raise SystemExit
# with a failure status: when the code is an int it must be non-zero.
# Handling of non-int codes, if any, is not visible in this excerpt.
# The earlier revision sets the flag through the client's item assignment;
# the later writes to self.client.attributes directly.
def test_is_enabled_run_exits_with_failure(self):
966
self.client["Enabled"] = dbus.Boolean(False)
955
self.client.attributes["Enabled"] = dbus.Boolean(False)
967
956
with self.assertRaises(SystemExit) as e:
968
IsEnabledCmd().run(None, [self.client])
957
IsEnabledCmd().run(None, self.one_client)
969
958
if isinstance(e.exception.code, int):
970
959
self.assertNotEqual(e.exception.code, 0)
979
968
# Records the call instead of performing it: appends a
# ("RemoveClient", (dbus_path,)) tuple to self.calls.
# NOTE(review): presumably a method of the MockMandos test double that is
# instantiated just below; its class header is outside this excerpt.
def RemoveClient(self, dbus_path):
980
969
self.calls.append(("RemoveClient", (dbus_path,)))
981
970
mandos = MockMandos()
982
RemoveCmd().run(mandos, [self.client])
983
self.assertEqual(len(mandos.calls), 1)
984
self.assertListEqual(mandos.calls,
986
(self.client.__dbus_object_path__,))])
971
super(TestRemoveCmd, self).setUp()
972
RemoveCmd().run(mandos, self.clients)
973
self.assertEqual(len(mandos.calls), 2)
974
for client in self.clients:
975
self.assertIn(("RemoveClient",
976
(client.__dbus_object_path__,)),
988
979
class TestApproveCmd(TestCmd):
989
980
def test_approve(self):
990
ApproveCmd().run(None, [self.client])
991
self.assertListEqual(self.client.calls,
992
[("Approve", (True, client_interface))])
981
ApproveCmd().run(None, self.clients)
982
for client in self.clients:
983
self.assertIn(("Approve", (True, client_interface)),
993
986
class TestDenyCmd(TestCmd):
994
def test_approve(self):
995
DenyCmd().run(None, [self.client])
996
self.assertListEqual(self.client.calls,
997
[("Approve", (False, client_interface))])
988
DenyCmd().run(None, self.clients)
989
for client in self.clients:
990
self.assertIn(("Approve", (False, client_interface)),