594
596
class Remove(Base):
595
def run(self, clients, bus, mandos):
596
for clientpath in clients.keys():
597
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)",
598
dbus_busname, server_dbus_path,
599
server_dbus_interface, clientpath)
600
mandos.RemoveClient(clientpath)
597
def run_on_one_client(self, client, properties):
598
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)",
599
dbus_busname, server_dbus_path,
600
server_dbus_interface,
601
str(client.__dbus_object_path__))
602
self.mandos.RemoveClient(client.__dbus_object_path__)
603
605
class Output(Base):
611
613
"Checker", "ExtendedTimeout", "Expires",
612
614
"LastCheckerStatus")
616
def run(self, clients, bus=None, mandos=None):
617
print(self.output(clients.values()))
619
def output(self, clients):
620
raise NotImplementedError()
615
623
class DumpJSON(Output):
616
def run(self, clients, bus=None, mandos=None):
624
def output(self, clients):
617
625
data = {client["Name"]:
618
626
{key: self.dbus_boolean_to_bool(client[key])
619
627
for key in self.all_keywords}
620
for client in clients.values()}
621
print(json.dumps(data, indent=4, separators=(',', ': ')))
628
for client in clients}
629
return json.dumps(data, indent=4, separators=(',', ': '))
624
632
def dbus_boolean_to_bool(value):
745
753
raise NotImplementedError()
748
class Enable(PropertySetter):
756
class Enable(Property):
749
757
propname = "Enabled"
750
758
value_to_set = dbus.Boolean(True)
753
class Disable(PropertySetter):
761
class Disable(Property):
754
762
propname = "Enabled"
755
763
value_to_set = dbus.Boolean(False)
758
class BumpTimeout(PropertySetter):
766
class BumpTimeout(Property):
759
767
propname = "LastCheckedOK"
760
768
value_to_set = ""
763
class StartChecker(PropertySetter):
764
propname = "CheckerRunning"
765
value_to_set = dbus.Boolean(True)
768
class StopChecker(PropertySetter):
769
propname = "CheckerRunning"
770
value_to_set = dbus.Boolean(False)
773
class ApproveByDefault(PropertySetter):
774
propname = "ApprovedByDefault"
775
value_to_set = dbus.Boolean(True)
778
class DenyByDefault(PropertySetter):
779
propname = "ApprovedByDefault"
780
value_to_set = dbus.Boolean(False)
783
class PropertySetterValue(PropertySetter):
784
"""Abstract class for PropertySetter recieving a value as
785
constructor argument instead of a class attribute."""
771
class StartChecker(Property):
772
propname = "CheckerRunning"
773
value_to_set = dbus.Boolean(True)
776
class StopChecker(Property):
777
propname = "CheckerRunning"
778
value_to_set = dbus.Boolean(False)
781
class ApproveByDefault(Property):
782
propname = "ApprovedByDefault"
783
value_to_set = dbus.Boolean(True)
786
class DenyByDefault(Property):
787
propname = "ApprovedByDefault"
788
value_to_set = dbus.Boolean(False)
791
class PropertyValue(Property):
792
"Abstract class for Property recieving a value as argument"
786
793
def __init__(self, value):
787
794
self.value_to_set = value
790
class SetChecker(PropertySetterValue):
797
class SetChecker(PropertyValue):
791
798
propname = "Checker"
794
class SetHost(PropertySetterValue):
801
class SetHost(PropertyValue):
795
802
propname = "Host"
798
class SetSecret(PropertySetterValue):
805
class SetSecret(PropertyValue):
799
806
propname = "Secret"
1106
1115
class Test_get_managed_objects(TestCaseWithAssertLogs):
1107
1116
def test_calls_and_returns_GetManagedObjects(self):
1108
managed_objects = {"/clients/client": { "Name": "client"}}
1109
class ObjectManagerStub(object):
1117
managed_objects = {"/clients/foo": { "Name": "foo"}}
1118
class MockObjectManager(object):
1110
1119
def GetManagedObjects(self):
1111
1120
return managed_objects
1112
retval = get_managed_objects(ObjectManagerStub())
1121
retval = get_managed_objects(MockObjectManager())
1113
1122
self.assertDictEqual(managed_objects, retval)
1115
1124
def test_logs_and_exits_on_dbus_error(self):
1116
1125
dbus_logger = logging.getLogger("dbus.proxies")
1118
class ObjectManagerFailingStub(object):
1127
class MockObjectManagerFailing(object):
1119
1128
def GetManagedObjects(self):
1120
1129
dbus_logger.error("Test")
1121
1130
raise dbus.exceptions.DBusException("Test")
1172
1181
self.assertEqual(value, getattr(command, key))
1174
1183
def test_is_enabled_short(self):
1175
self.assert_command_from_args(["-V", "client"],
1184
self.assert_command_from_args(["-V", "foo"],
1176
1185
command.IsEnabled)
1178
1187
def test_approve(self):
1179
self.assert_command_from_args(["--approve", "client"],
1188
self.assert_command_from_args(["--approve", "foo"],
1180
1189
command.Approve)
1182
1191
def test_approve_short(self):
1183
self.assert_command_from_args(["-A", "client"],
1192
self.assert_command_from_args(["-A", "foo"], command.Approve)
1186
1194
def test_deny(self):
1187
self.assert_command_from_args(["--deny", "client"],
1195
self.assert_command_from_args(["--deny", "foo"], command.Deny)
1190
1197
def test_deny_short(self):
1191
self.assert_command_from_args(["-D", "client"], command.Deny)
1198
self.assert_command_from_args(["-D", "foo"], command.Deny)
1193
1200
def test_remove(self):
1194
self.assert_command_from_args(["--remove", "client"],
1201
self.assert_command_from_args(["--remove", "foo"],
1195
1202
command.Remove)
1197
1204
def test_deny_before_remove(self):
1198
1205
options = self.parser.parse_args(["--deny", "--remove",
1200
1207
check_option_syntax(self.parser, options)
1201
1208
commands = commands_from_options(options)
1202
1209
self.assertEqual(2, len(commands))
1213
1220
self.assertIsInstance(commands[1], command.Remove)
1215
1222
def test_remove_short(self):
1216
self.assert_command_from_args(["-r", "client"],
1223
self.assert_command_from_args(["-r", "foo"], command.Remove)
1219
1225
def test_dump_json(self):
1220
1226
self.assert_command_from_args(["--dump-json"],
1221
1227
command.DumpJSON)
1223
1229
def test_enable(self):
1224
self.assert_command_from_args(["--enable", "client"],
1230
self.assert_command_from_args(["--enable", "foo"],
1225
1231
command.Enable)
1227
1233
def test_enable_short(self):
1228
self.assert_command_from_args(["-e", "client"],
1234
self.assert_command_from_args(["-e", "foo"], command.Enable)
1231
1236
def test_disable(self):
1232
self.assert_command_from_args(["--disable", "client"],
1237
self.assert_command_from_args(["--disable", "foo"],
1233
1238
command.Disable)
1235
1240
def test_disable_short(self):
1236
self.assert_command_from_args(["-d", "client"],
1241
self.assert_command_from_args(["-d", "foo"], command.Disable)
1239
1243
def test_bump_timeout(self):
1240
self.assert_command_from_args(["--bump-timeout", "client"],
1244
self.assert_command_from_args(["--bump-timeout", "foo"],
1241
1245
command.BumpTimeout)
1243
1247
def test_bump_timeout_short(self):
1244
self.assert_command_from_args(["-b", "client"],
1248
self.assert_command_from_args(["-b", "foo"],
1245
1249
command.BumpTimeout)
1247
1251
def test_start_checker(self):
1248
self.assert_command_from_args(["--start-checker", "client"],
1252
self.assert_command_from_args(["--start-checker", "foo"],
1249
1253
command.StartChecker)
1251
1255
def test_stop_checker(self):
1252
self.assert_command_from_args(["--stop-checker", "client"],
1256
self.assert_command_from_args(["--stop-checker", "foo"],
1253
1257
command.StopChecker)
1255
1259
def test_approve_by_default(self):
1256
self.assert_command_from_args(["--approve-by-default",
1260
self.assert_command_from_args(["--approve-by-default", "foo"],
1258
1261
command.ApproveByDefault)
1260
1263
def test_deny_by_default(self):
1261
self.assert_command_from_args(["--deny-by-default", "client"],
1264
self.assert_command_from_args(["--deny-by-default", "foo"],
1262
1265
command.DenyByDefault)
1264
1267
def test_checker(self):
1265
self.assert_command_from_args(["--checker", ":", "client"],
1268
self.assert_command_from_args(["--checker", ":", "foo"],
1266
1269
command.SetChecker,
1267
1270
value_to_set=":")
1269
1272
def test_checker_empty(self):
1270
self.assert_command_from_args(["--checker", "", "client"],
1273
self.assert_command_from_args(["--checker", "", "foo"],
1271
1274
command.SetChecker,
1272
1275
value_to_set="")
1274
1277
def test_checker_short(self):
1275
self.assert_command_from_args(["-c", ":", "client"],
1278
self.assert_command_from_args(["-c", ":", "foo"],
1276
1279
command.SetChecker,
1277
1280
value_to_set=":")
1279
1282
def test_host(self):
1280
self.assert_command_from_args(
1281
["--host", "client.example.org", "client"],
1282
command.SetHost, value_to_set="client.example.org")
1283
self.assert_command_from_args(["--host", "foo.example.org",
1284
"foo"], command.SetHost,
1285
value_to_set="foo.example.org")
1284
1287
def test_host_short(self):
1285
self.assert_command_from_args(
1286
["-H", "client.example.org", "client"], command.SetHost,
1287
value_to_set="client.example.org")
1288
self.assert_command_from_args(["-H", "foo.example.org",
1289
"foo"], command.SetHost,
1290
value_to_set="foo.example.org")
1289
1292
def test_secret_devnull(self):
1290
1293
self.assert_command_from_args(["--secret", os.path.devnull,
1291
"client"], command.SetSecret,
1294
"foo"], command.SetSecret,
1292
1295
value_to_set=b"")
1294
1297
def test_secret_tempfile(self):
1311
1313
value = b"secret\0xyzzy\nbar"
1314
self.assert_command_from_args(["-s", f.name, "client"],
1316
self.assert_command_from_args(["-s", f.name, "foo"],
1315
1317
command.SetSecret,
1316
1318
value_to_set=value)
1318
1320
def test_timeout(self):
1319
self.assert_command_from_args(["--timeout", "PT5M", "client"],
1321
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1320
1322
command.SetTimeout,
1321
1323
value_to_set=300000)
1323
1325
def test_timeout_short(self):
1324
self.assert_command_from_args(["-t", "PT5M", "client"],
1326
self.assert_command_from_args(["-t", "PT5M", "foo"],
1325
1327
command.SetTimeout,
1326
1328
value_to_set=300000)
1328
1330
def test_extended_timeout(self):
1329
1331
self.assert_command_from_args(["--extended-timeout", "PT15M",
1331
1333
command.SetExtendedTimeout,
1332
1334
value_to_set=900000)
1334
1336
def test_interval(self):
1335
self.assert_command_from_args(["--interval", "PT2M",
1336
"client"], command.SetInterval,
1337
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1338
command.SetInterval,
1337
1339
value_to_set=120000)
1339
1341
def test_interval_short(self):
1340
self.assert_command_from_args(["-i", "PT2M", "client"],
1342
self.assert_command_from_args(["-i", "PT2M", "foo"],
1341
1343
command.SetInterval,
1342
1344
value_to_set=120000)
1344
1346
def test_approval_delay(self):
1345
1347
self.assert_command_from_args(["--approval-delay", "PT30S",
1347
1349
command.SetApprovalDelay,
1348
1350
value_to_set=30000)
1350
1352
def test_approval_duration(self):
1351
1353
self.assert_command_from_args(["--approval-duration", "PT1S",
1353
1355
command.SetApprovalDuration,
1354
1356
value_to_set=1000)
1438
1440
LastCheckerStatus=-2)
1439
1441
self.clients = collections.OrderedDict(
1441
(self.client.__dbus_object_path__,
1442
self.client.attributes),
1443
(self.other_client.__dbus_object_path__,
1444
self.other_client.attributes),
1443
("/clients/foo", self.client.attributes),
1444
("/clients/barbar", self.other_client.attributes),
1446
self.one_client = {self.client.__dbus_object_path__:
1447
self.client.attributes}
1446
self.one_client = {"/clients/foo": self.client.attributes}
1451
class MockBus(object):
1453
1452
def get_object(client_bus_name, path):
1454
1453
self.assertEqual(dbus_busname, client_bus_name)
1455
# Note: "self" here is the TestCmd instance, not the
1456
# MockBus instance, since this is a static method!
1457
if path == self.client.__dbus_object_path__:
1459
elif path == self.other_client.__dbus_object_path__:
1460
return self.other_client
1455
# Note: "self" here is the TestCmd instance, not
1456
# the Bus instance, since this is a static method!
1457
"/clients/foo": self.client,
1458
"/clients/barbar": self.other_client,
1464
1463
class TestBaseCommands(TestCommand):
1560
1559
def test_DumpJSON_normal(self):
1561
with self.capture_stdout_to_buffer() as buffer:
1562
command.DumpJSON().run(self.clients)
1563
json_data = json.loads(buffer.getvalue())
1560
output = command.DumpJSON().output(self.clients.values())
1561
json_data = json.loads(output)
1564
1562
self.assertDictEqual(self.expected_json, json_data)
1567
@contextlib.contextmanager
1568
def capture_stdout_to_buffer():
1569
capture_buffer = io.StringIO()
1570
old_stdout = sys.stdout
1571
sys.stdout = capture_buffer
1573
yield capture_buffer
1575
sys.stdout = old_stdout
1577
1564
def test_DumpJSON_one_client(self):
1578
with self.capture_stdout_to_buffer() as buffer:
1579
command.DumpJSON().run(self.one_client)
1580
json_data = json.loads(buffer.getvalue())
1565
output = command.DumpJSON().output(self.one_client.values())
1566
json_data = json.loads(output)
1581
1567
expected_json = {"foo": self.expected_json["foo"]}
1582
1568
self.assertDictEqual(expected_json, json_data)
1584
1570
def test_PrintTable_normal(self):
1585
with self.capture_stdout_to_buffer() as buffer:
1586
command.PrintTable().run(self.clients)
1571
output = command.PrintTable().output(self.clients.values())
1587
1572
expected_output = "\n".join((
1588
1573
"Name Enabled Timeout Last Successful Check",
1589
1574
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1590
1575
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1592
self.assertEqual(expected_output, buffer.getvalue())
1577
self.assertEqual(expected_output, output)
1594
1579
def test_PrintTable_verbose(self):
1595
with self.capture_stdout_to_buffer() as buffer:
1596
command.PrintTable(verbose=True).run(self.clients)
1580
output = command.PrintTable(verbose=True).output(
1581
self.clients.values())
1683
1668
num_lines = max(len(rows) for rows in columns)
1684
expected_output = ("\n".join("".join(rows[line]
1685
for rows in columns)
1686
for line in range(num_lines))
1688
self.assertEqual(expected_output, buffer.getvalue())
1669
expected_output = "\n".join("".join(rows[line]
1670
for rows in columns)
1671
for line in range(num_lines))
1672
self.assertEqual(expected_output, output)
1690
1674
def test_PrintTable_one_client(self):
1691
with self.capture_stdout_to_buffer() as buffer:
1692
command.PrintTable().run(self.one_client)
1675
output = command.PrintTable().output(self.one_client.values())
1693
1676
expected_output = "\n".join((
1694
1677
"Name Enabled Timeout Last Successful Check",
1695
1678
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1697
self.assertEqual(expected_output, buffer.getvalue())
1700
class TestPropertySetterCmd(TestCommand):
1701
"""Abstract class for tests of command.PropertySetter classes"""
1680
self.assertEqual(expected_output, output)
1683
class TestPropertyCmd(TestCommand):
1684
"""Abstract class for tests of command.Property classes"""
1702
1685
def runTest(self):
1703
1686
if not hasattr(self, "command"):
1725
1708
self.command().run(clients, self.bus)
1728
class TestEnableCmd(TestPropertySetterCmd):
1711
class TestEnableCmd(TestPropertyCmd):
1729
1712
command = command.Enable
1730
1713
propname = "Enabled"
1731
1714
values_to_set = [dbus.Boolean(True)]
1734
class TestDisableCmd(TestPropertySetterCmd):
1717
class TestDisableCmd(TestPropertyCmd):
1735
1718
command = command.Disable
1736
1719
propname = "Enabled"
1737
1720
values_to_set = [dbus.Boolean(False)]
1740
class TestBumpTimeoutCmd(TestPropertySetterCmd):
1723
class TestBumpTimeoutCmd(TestPropertyCmd):
1741
1724
command = command.BumpTimeout
1742
1725
propname = "LastCheckedOK"
1743
1726
values_to_set = [""]
1746
class TestStartCheckerCmd(TestPropertySetterCmd):
1729
class TestStartCheckerCmd(TestPropertyCmd):
1747
1730
command = command.StartChecker
1748
1731
propname = "CheckerRunning"
1749
1732
values_to_set = [dbus.Boolean(True)]
1752
class TestStopCheckerCmd(TestPropertySetterCmd):
1735
class TestStopCheckerCmd(TestPropertyCmd):
1753
1736
command = command.StopChecker
1754
1737
propname = "CheckerRunning"
1755
1738
values_to_set = [dbus.Boolean(False)]
1758
class TestApproveByDefaultCmd(TestPropertySetterCmd):
1741
class TestApproveByDefaultCmd(TestPropertyCmd):
1759
1742
command = command.ApproveByDefault
1760
1743
propname = "ApprovedByDefault"
1761
1744
values_to_set = [dbus.Boolean(True)]
1764
class TestDenyByDefaultCmd(TestPropertySetterCmd):
1747
class TestDenyByDefaultCmd(TestPropertyCmd):
1765
1748
command = command.DenyByDefault
1766
1749
propname = "ApprovedByDefault"
1767
1750
values_to_set = [dbus.Boolean(False)]
1770
class TestPropertySetterValueCmd(TestPropertySetterCmd):
1771
"""Abstract class for tests of PropertySetterValueCmd classes"""
1753
class TestPropertyValueCmd(TestPropertyCmd):
1754
"""Abstract class for tests of PropertyValueCmd classes"""
1773
1756
def runTest(self):
1774
if type(self) is TestPropertySetterValueCmd:
1757
if type(self) is TestPropertyValueCmd:
1776
return super(TestPropertySetterValueCmd, self).runTest()
1759
return super(TestPropertyValueCmd, self).runTest()
1778
1761
def run_command(self, value, clients):
1779
1762
self.command(value).run(clients, self.bus)
1782
class TestSetCheckerCmd(TestPropertySetterValueCmd):
1765
class TestSetCheckerCmd(TestPropertyValueCmd):
1783
1766
command = command.SetChecker
1784
1767
propname = "Checker"
1785
1768
values_to_set = ["", ":", "fping -q -- %s"]
1788
class TestSetHostCmd(TestPropertySetterValueCmd):
1771
class TestSetHostCmd(TestPropertyValueCmd):
1789
1772
command = command.SetHost
1790
1773
propname = "Host"
1791
values_to_set = ["192.0.2.3", "client.example.org"]
1794
class TestSetSecretCmd(TestPropertySetterValueCmd):
1774
values_to_set = ["192.0.2.3", "foo.example.org"]
1777
class TestSetSecretCmd(TestPropertyValueCmd):
1795
1778
command = command.SetSecret
1796
1779
propname = "Secret"
1797
1780
values_to_set = [io.BytesIO(b""),
1798
1781
io.BytesIO(b"secret\0xyzzy\nbar")]
1799
values_to_get = [f.getvalue() for f in values_to_set]
1802
class TestSetTimeoutCmd(TestPropertySetterValueCmd):
1782
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1785
class TestSetTimeoutCmd(TestPropertyValueCmd):
1803
1786
command = command.SetTimeout
1804
1787
propname = "Timeout"
1805
1788
values_to_set = [datetime.timedelta(),