276
276
# Abstract classes first
277
277
class Command(object):
278
278
"""Abstract class for commands"""
279
def run(self, mandos, clients):
279
def run(self, clients, bus=None, mandos=None):
280
280
"""Normal commands should implement run_on_one_client(), but
281
281
commands which want to operate on all clients at the same time
282
282
can override this run() method instead."""
283
283
self.mandos = mandos
284
for client, properties in clients.items():
284
for clientpath, properties in clients.items():
285
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
286
busname, str(clientpath))
287
client = bus.get_object(busname, clientpath)
285
288
self.run_on_one_client(client, properties)
287
290
class PrintCmd(Command):
293
296
"LastApprovalRequest", "ApprovalDelay",
294
297
"ApprovalDuration", "Checker", "ExtendedTimeout",
295
298
"Expires", "LastCheckerStatus")
296
def run(self, mandos, clients):
299
def run(self, clients, bus=None, mandos=None):
297
300
print(self.output(clients.values()))
298
301
def output(self, clients):
299
302
raise NotImplementedError()
435
438
class IsEnabledCmd(Command):
436
def run_on_one_client(self, client, properties):
439
def run(self, clients, bus=None, mandos=None):
440
client, properties = next(iter(clients.items()))
437
441
if self.is_enabled(client, properties):
450
454
class ApproveCmd(Command):
451
455
def run_on_one_client(self, client, properties):
452
log.debug("D-Bus: %s:%s.Approve(True)",
456
log.debug("D-Bus: %s:%s:%s.Approve(True)", busname,
453
457
client.__dbus_object_path__, client_interface)
454
458
client.Approve(dbus.Boolean(True),
455
459
dbus_interface=client_interface)
457
461
class DenyCmd(Command):
458
462
def run_on_one_client(self, client, properties):
459
log.debug("D-Bus: %s:%s.Approve(False)",
463
log.debug("D-Bus: %s:%s:%s.Approve(False)", busname,
460
464
client.__dbus_object_path__, client_interface)
461
465
client.Approve(dbus.Boolean(False),
462
466
dbus_interface=client_interface)
764
768
if not clientnames:
765
clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
766
busname, str(path)) and False) or
767
bus.get_object(busname, path): properties
768
for path, properties in mandos_clients.items()}
769
clients = {objpath: properties
770
for objpath, properties in mandos_clients.items()}
770
772
for name in clientnames:
771
for path, client in mandos_clients.items():
772
if client["Name"] == name:
773
log.debug("D-Bus: Connect to: (name=%r, path=%r)",
775
client_objc = bus.get_object(busname, path)
776
clients[client_objc] = client
773
for objpath, properties in mandos_clients.items():
774
if properties["Name"] == name:
775
clients[objpath] = properties
779
778
log.critical("Client not found on server: %r", name)
905
904
LastCheckerStatus=-2)
906
905
self.clients = collections.OrderedDict(
908
(self.client, self.client.attributes),
909
(self.other_client, self.other_client.attributes),
907
("/clients/foo", self.client.attributes),
908
("/clients/barbar", self.other_client.attributes),
911
self.one_client = {self.client: self.client.attributes}
910
self.one_client = {"/clients/foo": self.client.attributes}
915
def get_object(client_bus_name, path):
916
self.assertEqual(client_bus_name, busname)
918
"/clients/foo": self.client,
919
"/clients/barbar": self.other_client,
913
923
class TestPrintTableCmd(TestCmd):
914
924
def test_normal(self):
1004
1014
for client, properties in self.clients.items()))
1005
1015
def test_is_enabled_run_exits_successfully(self):
1006
1016
with self.assertRaises(SystemExit) as e:
1007
IsEnabledCmd().run(None, self.one_client)
1017
IsEnabledCmd().run(self.one_client)
1008
1018
if e.exception.code is not None:
1009
1019
self.assertEqual(e.exception.code, 0)
1012
1022
def test_is_enabled_run_exits_with_failure(self):
1013
1023
self.client.attributes["Enabled"] = dbus.Boolean(False)
1014
1024
with self.assertRaises(SystemExit) as e:
1015
IsEnabledCmd().run(None, self.one_client)
1025
IsEnabledCmd().run(self.one_client)
1016
1026
if isinstance(e.exception.code, int):
1017
1027
self.assertNotEqual(e.exception.code, 0)
1027
1037
self.calls.append(("RemoveClient", (dbus_path,)))
1028
1038
mandos = MockMandos()
1029
1039
super(TestRemoveCmd, self).setUp()
1030
RemoveCmd().run(mandos, self.clients)
1040
RemoveCmd().run(self.clients, self.bus, mandos)
1031
1041
self.assertEqual(len(mandos.calls), 2)
1032
for client in self.clients:
1033
self.assertIn(("RemoveClient",
1034
(client.__dbus_object_path__,)),
1042
for clientpath in self.clients:
1043
self.assertIn(("RemoveClient", (clientpath,)),
1037
1046
class TestApproveCmd(TestCmd):
1038
1047
def test_approve(self):
1039
ApproveCmd().run(None, self.clients)
1040
for client in self.clients:
1048
ApproveCmd().run(self.clients, self.bus)
1049
for clientpath in self.clients:
1050
client = self.bus.get_object(busname, clientpath)
1041
1051
self.assertIn(("Approve", (True, client_interface)),
1044
1054
class TestDenyCmd(TestCmd):
1045
1055
def test_deny(self):
1046
DenyCmd().run(None, self.clients)
1047
for client in self.clients:
1056
DenyCmd().run(self.clients, self.bus)
1057
for clientpath in self.clients:
1058
client = self.bus.get_object(busname, clientpath)
1048
1059
self.assertIn(("Approve", (False, client_interface)),
1051
1062
class TestEnableCmd(TestCmd):
1052
1063
def test_enable(self):
1053
for client in self.clients:
1064
for clientpath in self.clients:
1065
client = self.bus.get_object(busname, clientpath)
1054
1066
client.attributes["Enabled"] = False
1056
EnableCmd().run(None, self.clients)
1068
EnableCmd().run(self.clients, self.bus)
1058
for client in self.clients:
1070
for clientpath in self.clients:
1071
client = self.bus.get_object(busname, clientpath)
1059
1072
self.assertTrue(client.attributes["Enabled"])
1061
1074
class TestDisableCmd(TestCmd):
1062
1075
def test_disable(self):
1063
DisableCmd().run(None, self.clients)
1065
for client in self.clients:
1076
DisableCmd().run(self.clients, self.bus)
1077
for clientpath in self.clients:
1078
client = self.bus.get_object(busname, clientpath)
1066
1079
self.assertFalse(client.attributes["Enabled"])
1068
1081
class Unique(object):
1078
1091
self.values_to_set)
1079
1092
for value_to_set, value_to_get in zip(self.values_to_set,
1080
1093
values_to_get):
1081
for client in self.clients:
1094
for clientpath in self.clients:
1095
client = self.bus.get_object(busname, clientpath)
1082
1096
old_value = client.attributes[self.propname]
1083
1097
self.assertNotIsInstance(old_value, Unique)
1084
1098
client.attributes[self.propname] = Unique()
1085
1099
self.run_command(value_to_set, self.clients)
1086
for client in self.clients:
1100
for clientpath in self.clients:
1101
client = self.bus.get_object(busname, clientpath)
1087
1102
value = client.attributes[self.propname]
1088
1103
self.assertNotIsInstance(value, Unique)
1089
1104
self.assertEqual(value, value_to_get)
1090
1105
def run_command(self, value, clients):
1091
self.command().run(None, clients)
1106
self.command().run(clients, self.bus)
1093
1108
class TestBumpTimeoutCmd(TestPropertyCmd):
1094
1109
command = BumpTimeoutCmd