596
594
class Remove(Base):
597
def run_on_one_client(self, client, properties):
598
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)",
599
dbus_busname, server_dbus_path,
600
server_dbus_interface,
601
str(client.__dbus_object_path__))
602
self.mandos.RemoveClient(client.__dbus_object_path__)
595
def run(self, clients, bus, mandos):
596
for clientpath in clients.keys():
597
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)",
598
dbus_busname, server_dbus_path,
599
server_dbus_interface, clientpath)
600
mandos.RemoveClient(clientpath)
605
603
class Output(Base):
613
611
"Checker", "ExtendedTimeout", "Expires",
614
612
"LastCheckerStatus")
616
def run(self, clients, bus=None, mandos=None):
617
print(self.output(clients.values()))
619
def output(self, clients):
620
raise NotImplementedError()
623
615
class DumpJSON(Output):
624
def output(self, clients):
616
def run(self, clients, bus=None, mandos=None):
625
617
data = {client["Name"]:
626
618
{key: self.dbus_boolean_to_bool(client[key])
627
619
for key in self.all_keywords}
628
for client in clients}
629
return json.dumps(data, indent=4, separators=(',', ': '))
620
for client in clients.values()}
621
print(json.dumps(data, indent=4, separators=(',', ': ')))
632
624
def dbus_boolean_to_bool(value):
753
745
raise NotImplementedError()
756
class Enable(Property):
748
class Enable(PropertySetter):
757
749
propname = "Enabled"
758
750
value_to_set = dbus.Boolean(True)
761
class Disable(Property):
753
class Disable(PropertySetter):
762
754
propname = "Enabled"
763
755
value_to_set = dbus.Boolean(False)
766
class BumpTimeout(Property):
758
class BumpTimeout(PropertySetter):
767
759
propname = "LastCheckedOK"
768
760
value_to_set = ""
771
class StartChecker(Property):
772
propname = "CheckerRunning"
773
value_to_set = dbus.Boolean(True)
776
class StopChecker(Property):
777
propname = "CheckerRunning"
778
value_to_set = dbus.Boolean(False)
781
class ApproveByDefault(Property):
782
propname = "ApprovedByDefault"
783
value_to_set = dbus.Boolean(True)
786
class DenyByDefault(Property):
787
propname = "ApprovedByDefault"
788
value_to_set = dbus.Boolean(False)
791
class PropertyValue(Property):
792
"Abstract class for Property recieving a value as argument"
763
class StartChecker(PropertySetter):
764
propname = "CheckerRunning"
765
value_to_set = dbus.Boolean(True)
768
class StopChecker(PropertySetter):
769
propname = "CheckerRunning"
770
value_to_set = dbus.Boolean(False)
773
class ApproveByDefault(PropertySetter):
774
propname = "ApprovedByDefault"
775
value_to_set = dbus.Boolean(True)
778
class DenyByDefault(PropertySetter):
779
propname = "ApprovedByDefault"
780
value_to_set = dbus.Boolean(False)
783
class PropertySetterValue(PropertySetter):
784
"""Abstract class for PropertySetter recieving a value as
785
constructor argument instead of a class attribute."""
793
786
def __init__(self, value):
794
787
self.value_to_set = value
797
class SetChecker(PropertyValue):
790
class SetChecker(PropertySetterValue):
798
791
propname = "Checker"
801
class SetHost(PropertyValue):
794
class SetHost(PropertySetterValue):
802
795
propname = "Host"
805
class SetSecret(PropertyValue):
798
class SetSecret(PropertySetterValue):
806
799
propname = "Secret"
1112
1106
class Test_get_managed_objects(TestCaseWithAssertLogs):
1113
1107
def test_calls_and_returns_GetManagedObjects(self):
1114
managed_objects = {"/clients/foo": { "Name": "foo"}}
1115
class MockObjectManager(object):
1108
managed_objects = {"/clients/client": { "Name": "client"}}
1109
class ObjectManagerStub(object):
1116
1110
def GetManagedObjects(self):
1117
1111
return managed_objects
1118
retval = get_managed_objects(MockObjectManager())
1112
retval = get_managed_objects(ObjectManagerStub())
1119
1113
self.assertDictEqual(managed_objects, retval)
1121
1115
def test_logs_and_exits_on_dbus_error(self):
1122
1116
dbus_logger = logging.getLogger("dbus.proxies")
1124
class MockObjectManagerFailing(object):
1118
class ObjectManagerFailingStub(object):
1125
1119
def GetManagedObjects(self):
1126
1120
dbus_logger.error("Test")
1127
1121
raise dbus.exceptions.DBusException("Test")
1178
1172
self.assertEqual(value, getattr(command, key))
1180
1174
def test_is_enabled_short(self):
1181
self.assert_command_from_args(["-V", "foo"],
1175
self.assert_command_from_args(["-V", "client"],
1182
1176
command.IsEnabled)
1184
1178
def test_approve(self):
1185
self.assert_command_from_args(["--approve", "foo"],
1179
self.assert_command_from_args(["--approve", "client"],
1186
1180
command.Approve)
1188
1182
def test_approve_short(self):
1189
self.assert_command_from_args(["-A", "foo"], command.Approve)
1183
self.assert_command_from_args(["-A", "client"],
1191
1186
def test_deny(self):
1192
self.assert_command_from_args(["--deny", "foo"], command.Deny)
1187
self.assert_command_from_args(["--deny", "client"],
1194
1190
def test_deny_short(self):
1195
self.assert_command_from_args(["-D", "foo"], command.Deny)
1191
self.assert_command_from_args(["-D", "client"], command.Deny)
1197
1193
def test_remove(self):
1198
self.assert_command_from_args(["--remove", "foo"],
1194
self.assert_command_from_args(["--remove", "client"],
1199
1195
command.Remove)
1201
1197
def test_deny_before_remove(self):
1202
1198
options = self.parser.parse_args(["--deny", "--remove",
1204
1200
check_option_syntax(self.parser, options)
1205
1201
commands = commands_from_options(options)
1206
1202
self.assertEqual(2, len(commands))
1217
1213
self.assertIsInstance(commands[1], command.Remove)
1219
1215
def test_remove_short(self):
1220
self.assert_command_from_args(["-r", "foo"], command.Remove)
1216
self.assert_command_from_args(["-r", "client"],
1222
1219
def test_dump_json(self):
1223
1220
self.assert_command_from_args(["--dump-json"],
1224
1221
command.DumpJSON)
1226
1223
def test_enable(self):
1227
self.assert_command_from_args(["--enable", "foo"],
1224
self.assert_command_from_args(["--enable", "client"],
1228
1225
command.Enable)
1230
1227
def test_enable_short(self):
1231
self.assert_command_from_args(["-e", "foo"], command.Enable)
1228
self.assert_command_from_args(["-e", "client"],
1233
1231
def test_disable(self):
1234
self.assert_command_from_args(["--disable", "foo"],
1232
self.assert_command_from_args(["--disable", "client"],
1235
1233
command.Disable)
1237
1235
def test_disable_short(self):
1238
self.assert_command_from_args(["-d", "foo"], command.Disable)
1236
self.assert_command_from_args(["-d", "client"],
1240
1239
def test_bump_timeout(self):
1241
self.assert_command_from_args(["--bump-timeout", "foo"],
1240
self.assert_command_from_args(["--bump-timeout", "client"],
1242
1241
command.BumpTimeout)
1244
1243
def test_bump_timeout_short(self):
1245
self.assert_command_from_args(["-b", "foo"],
1244
self.assert_command_from_args(["-b", "client"],
1246
1245
command.BumpTimeout)
1248
1247
def test_start_checker(self):
1249
self.assert_command_from_args(["--start-checker", "foo"],
1248
self.assert_command_from_args(["--start-checker", "client"],
1250
1249
command.StartChecker)
1252
1251
def test_stop_checker(self):
1253
self.assert_command_from_args(["--stop-checker", "foo"],
1252
self.assert_command_from_args(["--stop-checker", "client"],
1254
1253
command.StopChecker)
1256
1255
def test_approve_by_default(self):
1257
self.assert_command_from_args(["--approve-by-default", "foo"],
1256
self.assert_command_from_args(["--approve-by-default",
1258
1258
command.ApproveByDefault)
1260
1260
def test_deny_by_default(self):
1261
self.assert_command_from_args(["--deny-by-default", "foo"],
1261
self.assert_command_from_args(["--deny-by-default", "client"],
1262
1262
command.DenyByDefault)
1264
1264
def test_checker(self):
1265
self.assert_command_from_args(["--checker", ":", "foo"],
1265
self.assert_command_from_args(["--checker", ":", "client"],
1266
1266
command.SetChecker,
1267
1267
value_to_set=":")
1269
1269
def test_checker_empty(self):
1270
self.assert_command_from_args(["--checker", "", "foo"],
1270
self.assert_command_from_args(["--checker", "", "client"],
1271
1271
command.SetChecker,
1272
1272
value_to_set="")
1274
1274
def test_checker_short(self):
1275
self.assert_command_from_args(["-c", ":", "foo"],
1275
self.assert_command_from_args(["-c", ":", "client"],
1276
1276
command.SetChecker,
1277
1277
value_to_set=":")
1279
1279
def test_host(self):
1280
self.assert_command_from_args(["--host", "foo.example.org",
1281
"foo"], command.SetHost,
1282
value_to_set="foo.example.org")
1280
self.assert_command_from_args(
1281
["--host", "client.example.org", "client"],
1282
command.SetHost, value_to_set="client.example.org")
1284
1284
def test_host_short(self):
1285
self.assert_command_from_args(["-H", "foo.example.org",
1286
"foo"], command.SetHost,
1287
value_to_set="foo.example.org")
1285
self.assert_command_from_args(
1286
["-H", "client.example.org", "client"], command.SetHost,
1287
value_to_set="client.example.org")
1289
1289
def test_secret_devnull(self):
1290
1290
self.assert_command_from_args(["--secret", os.path.devnull,
1291
"foo"], command.SetSecret,
1291
"client"], command.SetSecret,
1292
1292
value_to_set=b"")
1294
1294
def test_secret_tempfile(self):
1310
1311
value = b"secret\0xyzzy\nbar"
1313
self.assert_command_from_args(["-s", f.name, "foo"],
1314
self.assert_command_from_args(["-s", f.name, "client"],
1314
1315
command.SetSecret,
1315
1316
value_to_set=value)
1317
1318
def test_timeout(self):
1318
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1319
self.assert_command_from_args(["--timeout", "PT5M", "client"],
1319
1320
command.SetTimeout,
1320
1321
value_to_set=300000)
1322
1323
def test_timeout_short(self):
1323
self.assert_command_from_args(["-t", "PT5M", "foo"],
1324
self.assert_command_from_args(["-t", "PT5M", "client"],
1324
1325
command.SetTimeout,
1325
1326
value_to_set=300000)
1327
1328
def test_extended_timeout(self):
1328
1329
self.assert_command_from_args(["--extended-timeout", "PT15M",
1330
1331
command.SetExtendedTimeout,
1331
1332
value_to_set=900000)
1333
1334
def test_interval(self):
1334
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1335
command.SetInterval,
1335
self.assert_command_from_args(["--interval", "PT2M",
1336
"client"], command.SetInterval,
1336
1337
value_to_set=120000)
1338
1339
def test_interval_short(self):
1339
self.assert_command_from_args(["-i", "PT2M", "foo"],
1340
self.assert_command_from_args(["-i", "PT2M", "client"],
1340
1341
command.SetInterval,
1341
1342
value_to_set=120000)
1343
1344
def test_approval_delay(self):
1344
1345
self.assert_command_from_args(["--approval-delay", "PT30S",
1346
1347
command.SetApprovalDelay,
1347
1348
value_to_set=30000)
1349
1350
def test_approval_duration(self):
1350
1351
self.assert_command_from_args(["--approval-duration", "PT1S",
1352
1353
command.SetApprovalDuration,
1353
1354
value_to_set=1000)
1437
1438
LastCheckerStatus=-2)
1438
1439
self.clients = collections.OrderedDict(
1440
("/clients/foo", self.client.attributes),
1441
("/clients/barbar", self.other_client.attributes),
1441
(self.client.__dbus_object_path__,
1442
self.client.attributes),
1443
(self.other_client.__dbus_object_path__,
1444
self.other_client.attributes),
1443
self.one_client = {"/clients/foo": self.client.attributes}
1446
self.one_client = {self.client.__dbus_object_path__:
1447
self.client.attributes}
1451
class MockBus(object):
1449
1453
def get_object(client_bus_name, path):
1450
1454
self.assertEqual(dbus_busname, client_bus_name)
1452
# Note: "self" here is the TestCmd instance, not
1453
# the Bus instance, since this is a static method!
1454
"/clients/foo": self.client,
1455
"/clients/barbar": self.other_client,
1455
# Note: "self" here is the TestCmd instance, not the
1456
# MockBus instance, since this is a static method!
1457
if path == self.client.__dbus_object_path__:
1459
elif path == self.other_client.__dbus_object_path__:
1460
return self.other_client
1460
1464
class TestBaseCommands(TestCommand):
1721
1725
self.command().run(clients, self.bus)
1724
class TestEnableCmd(TestPropertyCmd):
1728
class TestEnableCmd(TestPropertySetterCmd):
1725
1729
command = command.Enable
1726
1730
propname = "Enabled"
1727
1731
values_to_set = [dbus.Boolean(True)]
1730
class TestDisableCmd(TestPropertyCmd):
1734
class TestDisableCmd(TestPropertySetterCmd):
1731
1735
command = command.Disable
1732
1736
propname = "Enabled"
1733
1737
values_to_set = [dbus.Boolean(False)]
1736
class TestBumpTimeoutCmd(TestPropertyCmd):
1740
class TestBumpTimeoutCmd(TestPropertySetterCmd):
1737
1741
command = command.BumpTimeout
1738
1742
propname = "LastCheckedOK"
1739
1743
values_to_set = [""]
1742
class TestStartCheckerCmd(TestPropertyCmd):
1746
class TestStartCheckerCmd(TestPropertySetterCmd):
1743
1747
command = command.StartChecker
1744
1748
propname = "CheckerRunning"
1745
1749
values_to_set = [dbus.Boolean(True)]
1748
class TestStopCheckerCmd(TestPropertyCmd):
1752
class TestStopCheckerCmd(TestPropertySetterCmd):
1749
1753
command = command.StopChecker
1750
1754
propname = "CheckerRunning"
1751
1755
values_to_set = [dbus.Boolean(False)]
1754
class TestApproveByDefaultCmd(TestPropertyCmd):
1758
class TestApproveByDefaultCmd(TestPropertySetterCmd):
1755
1759
command = command.ApproveByDefault
1756
1760
propname = "ApprovedByDefault"
1757
1761
values_to_set = [dbus.Boolean(True)]
1760
class TestDenyByDefaultCmd(TestPropertyCmd):
1764
class TestDenyByDefaultCmd(TestPropertySetterCmd):
1761
1765
command = command.DenyByDefault
1762
1766
propname = "ApprovedByDefault"
1763
1767
values_to_set = [dbus.Boolean(False)]
1766
class TestPropertyValueCmd(TestPropertyCmd):
1767
"""Abstract class for tests of PropertyValueCmd classes"""
1770
class TestPropertySetterValueCmd(TestPropertySetterCmd):
1771
"""Abstract class for tests of PropertySetterValueCmd classes"""
1769
1773
def runTest(self):
1770
if type(self) is TestPropertyValueCmd:
1774
if type(self) is TestPropertySetterValueCmd:
1772
return super(TestPropertyValueCmd, self).runTest()
1776
return super(TestPropertySetterValueCmd, self).runTest()
1774
1778
def run_command(self, value, clients):
1775
1779
self.command(value).run(clients, self.bus)
1778
class TestSetCheckerCmd(TestPropertyValueCmd):
1782
class TestSetCheckerCmd(TestPropertySetterValueCmd):
1779
1783
command = command.SetChecker
1780
1784
propname = "Checker"
1781
1785
values_to_set = ["", ":", "fping -q -- %s"]
1784
class TestSetHostCmd(TestPropertyValueCmd):
1788
class TestSetHostCmd(TestPropertySetterValueCmd):
1785
1789
command = command.SetHost
1786
1790
propname = "Host"
1787
values_to_set = ["192.0.2.3", "foo.example.org"]
1790
class TestSetSecretCmd(TestPropertyValueCmd):
1791
values_to_set = ["192.0.2.3", "client.example.org"]
1794
class TestSetSecretCmd(TestPropertySetterValueCmd):
1791
1795
command = command.SetSecret
1792
1796
propname = "Secret"
1793
1797
values_to_set = [io.BytesIO(b""),
1794
1798
io.BytesIO(b"secret\0xyzzy\nbar")]
1795
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1798
class TestSetTimeoutCmd(TestPropertyValueCmd):
1799
values_to_get = [f.getvalue() for f in values_to_set]
1802
class TestSetTimeoutCmd(TestPropertySetterValueCmd):
1799
1803
command = command.SetTimeout
1800
1804
propname = "Timeout"
1801
1805
values_to_set = [datetime.timedelta(),