=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-04 19:03:10 +0000 +++ mandos-ctl 2019-03-04 20:39:48 +0000 @@ -278,8 +278,8 @@ commands which want to operate on all clients at the same time can override this run() method instead.""" self.mandos = mandos - for client in clients: - self.run_on_one_client(client) + for client, properties in clients.items(): + self.run_on_one_client(client, properties) class PrintCmd(Command): """Abstract class for commands printing client details""" @@ -295,7 +295,7 @@ class PropertyCmd(Command): """Abstract class for Actions for setting one client property""" - def run_on_one_client(self, client): + def run_on_one_client(self, client, properties): """Set the Client's D-Bus property""" client.Set(client_interface, self.property, self.value_to_set, dbus_interface=dbus.PROPERTIES_IFACE) @@ -419,25 +419,24 @@ return value class IsEnabledCmd(Command): - def run_on_one_client(self, client): - if self.is_enabled(client): + def run_on_one_client(self, client, properties): + if self.is_enabled(client, properties): sys.exit(0) sys.exit(1) - def is_enabled(self, client): - return client.Get(client_interface, "Enabled", - dbus_interface=dbus.PROPERTIES_IFACE) + def is_enabled(self, client, properties): + return bool(properties["Enabled"]) class RemoveCmd(Command): - def run_on_one_client(self, client): + def run_on_one_client(self, client, properties): self.mandos.RemoveClient(client.__dbus_object_path__) class ApproveCmd(Command): - def run_on_one_client(self, client): + def run_on_one_client(self, client, properties): client.Approve(dbus.Boolean(True), dbus_interface=client_interface) class DenyCmd(Command): - def run_on_one_client(self, client): + def run_on_one_client(self, client, properties): client.Approve(dbus.Boolean(False), dbus_interface=client_interface) @@ -799,63 +798,60 @@ testcase.assertEqual(dbus_interface, client_interface) self.calls.append(("Approve", (approve, dbus_interface))) - def __getitem__(self, key): - return self.attributes[key] - def __setitem__(self, key, value): - self.attributes[key] = value - self.clients = collections.OrderedDict([ - ("foo", - MockClient( - "foo", - KeyID=("92ed150794387c03ce684574b1139a65" - "94a34f895daaaf09fd8ea90a27cddb12"), - Secret=b"secret", - Host="foo.example.org", - Enabled=dbus.Boolean(True), - Timeout=300000, - LastCheckedOK="2019-02-03T00:00:00", - Created="2019-01-02T00:00:00", - Interval=120000, - Fingerprint=("778827225BA7DE539C5A" - "7CFA59CFF7CDBD9A5920"), - CheckerRunning=dbus.Boolean(False), - LastEnabled="2019-01-03T00:00:00", - ApprovalPending=dbus.Boolean(False), - ApprovedByDefault=dbus.Boolean(True), - LastApprovalRequest="", - ApprovalDelay=0, - ApprovalDuration=1000, - Checker="fping -q -- %(host)s", - ExtendedTimeout=900000, - Expires="2019-02-04T00:00:00", - LastCheckerStatus=0)), - ("barbar", - MockClient( - "barbar", - KeyID=("0558568eedd67d622f5c83b35a115f79" - "6ab612cff5ad227247e46c2b020f441c"), - Secret=b"secretbar", - Host="192.0.2.3", - Enabled=dbus.Boolean(True), - Timeout=300000, - LastCheckedOK="2019-02-04T00:00:00", - Created="2019-01-03T00:00:00", - Interval=120000, - Fingerprint=("3E393AEAEFB84C7E89E2" - "F547B3A107558FCA3A27"), - CheckerRunning=dbus.Boolean(True), - LastEnabled="2019-01-04T00:00:00", - ApprovalPending=dbus.Boolean(False), - ApprovedByDefault=dbus.Boolean(False), - LastApprovalRequest="2019-01-03T00:00:00", - ApprovalDelay=30000, - ApprovalDuration=1000, - Checker=":", - ExtendedTimeout=900000, - Expires="2019-02-05T00:00:00", - LastCheckerStatus=-2)), + self.client = MockClient( + "foo", + KeyID=("92ed150794387c03ce684574b1139a65" + "94a34f895daaaf09fd8ea90a27cddb12"), + Secret=b"secret", + Host="foo.example.org", + Enabled=dbus.Boolean(True), + Timeout=300000, + LastCheckedOK="2019-02-03T00:00:00", + Created="2019-01-02T00:00:00", + Interval=120000, + Fingerprint=("778827225BA7DE539C5A" + "7CFA59CFF7CDBD9A5920"), + CheckerRunning=dbus.Boolean(False), + LastEnabled="2019-01-03T00:00:00", + ApprovalPending=dbus.Boolean(False), + ApprovedByDefault=dbus.Boolean(True), + LastApprovalRequest="", + ApprovalDelay=0, + ApprovalDuration=1000, + Checker="fping -q -- %(host)s", + ExtendedTimeout=900000, + Expires="2019-02-04T00:00:00", + LastCheckerStatus=0) + self.other_client = MockClient( + "barbar", + KeyID=("0558568eedd67d622f5c83b35a115f79" + "6ab612cff5ad227247e46c2b020f441c"), + Secret=b"secretbar", + Host="192.0.2.3", + Enabled=dbus.Boolean(True), + Timeout=300000, + LastCheckedOK="2019-02-04T00:00:00", + Created="2019-01-03T00:00:00", + Interval=120000, + Fingerprint=("3E393AEAEFB84C7E89E2" + "F547B3A107558FCA3A27"), + CheckerRunning=dbus.Boolean(True), + LastEnabled="2019-01-04T00:00:00", + ApprovalPending=dbus.Boolean(False), + ApprovedByDefault=dbus.Boolean(False), + LastApprovalRequest="2019-01-03T00:00:00", + ApprovalDelay=30000, + ApprovalDuration=1000, + Checker=":", + ExtendedTimeout=900000, + Expires="2019-02-05T00:00:00", + LastCheckerStatus=-2) + self.clients = collections.OrderedDict( + [ + (self.client, self.client.attributes), + (self.other_client, self.other_client.attributes), ]) - self.client = self.clients["foo"] + self.one_client = {self.client: self.client.attributes} class TestPrintTableCmd(TestCmd): def test_normal(self): @@ -875,7 +871,7 @@ """[1:-1] self.assertEqual(output, expected_output) def test_one_client(self): - output = PrintTableCmd().output({"foo": self.client}) + output = PrintTableCmd().output(self.one_client) expected_output = """ Name Enabled Timeout Last Successful Check foo Yes 00:05:00 2019-02-03T00:00:00 @@ -939,33 +935,26 @@ json_data = json.loads(DumpJSONCmd().output(self.clients)) self.assertDictEqual(json_data, self.expected_json) def test_one_client(self): - clients = {"foo": self.client} + clients = self.one_client json_data = json.loads(DumpJSONCmd().output(clients)) expected_json = {"foo": self.expected_json["foo"]} self.assertDictEqual(json_data, expected_json) class TestIsEnabledCmd(TestCmd): def test_is_enabled(self): - self.assertTrue(all(IsEnabledCmd().is_enabled(client) - for client in self.clients.values())) - def test_is_enabled_does_get_attribute(self): - self.assertTrue(IsEnabledCmd().is_enabled(self.client)) - self.assertListEqual(self.client.calls, - [("Get", - ("se.recompile.Mandos.Client", - "Enabled", - "org.freedesktop.DBus.Properties"))]) + self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties) + for client, properties in self.clients.items())) def test_is_enabled_run_exits_successfully(self): with self.assertRaises(SystemExit) as e: - IsEnabledCmd().run(None, [self.client]) + IsEnabledCmd().run(None, self.one_client) if e.exception.code is not None: self.assertEqual(e.exception.code, 0) else: self.assertIsNone(e.exception.code) def test_is_enabled_run_exits_with_failure(self): - self.client["Enabled"] = dbus.Boolean(False) + self.client.attributes["Enabled"] = dbus.Boolean(False) with self.assertRaises(SystemExit) as e: - IsEnabledCmd().run(None, [self.client]) + IsEnabledCmd().run(None, self.one_client) if isinstance(e.exception.code, int): self.assertNotEqual(e.exception.code, 0) else: @@ -979,22 +968,27 @@ def RemoveClient(self, dbus_path): self.calls.append(("RemoveClient", (dbus_path,))) mandos = MockMandos() - RemoveCmd().run(mandos, [self.client]) - self.assertEqual(len(mandos.calls), 1) - self.assertListEqual(mandos.calls, - [("RemoveClient", - (self.client.__dbus_object_path__,))]) + super(TestRemoveCmd, self).setUp() + RemoveCmd().run(mandos, self.clients) + self.assertEqual(len(mandos.calls), 2) + for client in self.clients: + self.assertIn(("RemoveClient", + (client.__dbus_object_path__,)), + mandos.calls) class TestApproveCmd(TestCmd): def test_approve(self): - ApproveCmd().run(None, [self.client]) - self.assertListEqual(self.client.calls, - [("Approve", (True, client_interface))]) + ApproveCmd().run(None, self.clients) + for client in self.clients: + self.assertIn(("Approve", (True, client_interface)), + client.calls) + class TestDenyCmd(TestCmd): - def test_approve(self): - DenyCmd().run(None, [self.client]) - self.assertListEqual(self.client.calls, - [("Approve", (False, client_interface))]) + def test_deny(self): + DenyCmd().run(None, self.clients) + for client in self.clients: + self.assertIn(("Approve", (False, client_interface)), + client.calls)