=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-17 21:29:32 +0000 +++ mandos-ctl 2019-03-18 22:29:25 +0000 @@ -983,7 +983,7 @@ options = self.parser.parse_args() setattr(options, action, value) options.verbose = True - options.client = ["foo"] + options.client = ["client"] with self.assertParseError(): self.check_option_syntax(options) @@ -1019,7 +1019,7 @@ for action, value in self.actions.items(): options = self.parser.parse_args() setattr(options, action, value) - options.client = ["foo"] + options.client = ["client"] self.check_option_syntax(options) def test_one_client_with_all_actions_except_is_enabled(self): @@ -1028,7 +1028,7 @@ if action == "is_enabled": continue setattr(options, action, value) - options.client = ["foo"] + options.client = ["client"] self.check_option_syntax(options) def test_two_clients_with_all_actions_except_is_enabled(self): @@ -1037,7 +1037,7 @@ if action == "is_enabled": continue setattr(options, action, value) - options.client = ["foo", "barbar"] + options.client = ["client1", "client2"] self.check_option_syntax(options) def test_two_clients_are_ok_with_actions_except_is_enabled(self): @@ -1046,7 +1046,7 @@ continue options = self.parser.parse_args() setattr(options, action, value) - options.client = ["foo", "barbar"] + options.client = ["client1", "client2"] self.check_option_syntax(options) def test_is_enabled_fails_without_client(self): @@ -1058,7 +1058,7 @@ def test_is_enabled_fails_with_two_clients(self): options = self.parser.parse_args() options.is_enabled = True - options.client = ["foo", "barbar"] + options.client = ["client1", "client2"] with self.assertParseError(): self.check_option_syntax(options) @@ -1091,13 +1091,13 @@ self.assertTrue(mockbus.called) def test_logs_and_exits_on_dbus_error(self): - class MockBusFailing(object): + class FailingBusStub(object): def get_object(self, busname, dbus_path): raise dbus.exceptions.DBusException("Test") with self.assertLogs(log, logging.CRITICAL): with self.assertRaises(SystemExit) as e: - bus = get_mandos_dbus_object(bus=MockBusFailing()) + bus = get_mandos_dbus_object(bus=FailingBusStub()) if isinstance(e.exception.code, int): self.assertNotEqual(0, e.exception.code) @@ -1107,17 +1107,17 @@ class Test_get_managed_objects(TestCaseWithAssertLogs): def test_calls_and_returns_GetManagedObjects(self): - managed_objects = {"/clients/foo": { "Name": "foo"}} - class MockObjectManager(object): + managed_objects = {"/clients/client": { "Name": "client"}} + class ObjectManagerStub(object): def GetManagedObjects(self): return managed_objects - retval = get_managed_objects(MockObjectManager()) + retval = get_managed_objects(ObjectManagerStub()) self.assertDictEqual(managed_objects, retval) def test_logs_and_exits_on_dbus_error(self): dbus_logger = logging.getLogger("dbus.proxies") - class MockObjectManagerFailing(object): + class ObjectManagerFailingStub(object): def GetManagedObjects(self): dbus_logger.error("Test") raise dbus.exceptions.DBusException("Test") @@ -1134,7 +1134,7 @@ try: with self.assertLogs(log, logging.CRITICAL) as watcher: with self.assertRaises(SystemExit) as e: - get_managed_objects(MockObjectManagerFailing()) + get_managed_objects(ObjectManagerFailingStub()) finally: dbus_logger.removeFilter(counting_handler) @@ -1157,7 +1157,7 @@ add_command_line_options(self.parser) def test_is_enabled(self): - self.assert_command_from_args(["--is-enabled", "foo"], + self.assert_command_from_args(["--is-enabled", "client"], command.IsEnabled) def assert_command_from_args(self, args, command_cls, @@ -1174,29 +1174,31 @@ self.assertEqual(value, getattr(command, key)) def test_is_enabled_short(self): - self.assert_command_from_args(["-V", "foo"], + self.assert_command_from_args(["-V", "client"], command.IsEnabled) def test_approve(self): - self.assert_command_from_args(["--approve", "foo"], + self.assert_command_from_args(["--approve", "client"], command.Approve) def test_approve_short(self): - self.assert_command_from_args(["-A", "foo"], command.Approve) + self.assert_command_from_args(["-A", "client"], + command.Approve) def test_deny(self): - self.assert_command_from_args(["--deny", "foo"], command.Deny) + self.assert_command_from_args(["--deny", "client"], + command.Deny) def test_deny_short(self): - self.assert_command_from_args(["-D", "foo"], command.Deny) + self.assert_command_from_args(["-D", "client"], command.Deny) def test_remove(self): - self.assert_command_from_args(["--remove", "foo"], + self.assert_command_from_args(["--remove", "client"], command.Remove) def test_deny_before_remove(self): options = self.parser.parse_args(["--deny", "--remove", - "foo"]) + "client"]) check_option_syntax(self.parser, options) commands = commands_from_options(options) self.assertEqual(2, len(commands)) @@ -1213,78 +1215,82 @@ self.assertIsInstance(commands[1], command.Remove) def test_remove_short(self): - self.assert_command_from_args(["-r", "foo"], command.Remove) + self.assert_command_from_args(["-r", "client"], + command.Remove) def test_dump_json(self): self.assert_command_from_args(["--dump-json"], command.DumpJSON) def test_enable(self): - self.assert_command_from_args(["--enable", "foo"], + self.assert_command_from_args(["--enable", "client"], command.Enable) def test_enable_short(self): - self.assert_command_from_args(["-e", "foo"], command.Enable) + self.assert_command_from_args(["-e", "client"], + command.Enable) def test_disable(self): - self.assert_command_from_args(["--disable", "foo"], + self.assert_command_from_args(["--disable", "client"], command.Disable) def test_disable_short(self): - self.assert_command_from_args(["-d", "foo"], command.Disable) + self.assert_command_from_args(["-d", "client"], + command.Disable) def test_bump_timeout(self): - self.assert_command_from_args(["--bump-timeout", "foo"], + self.assert_command_from_args(["--bump-timeout", "client"], command.BumpTimeout) def test_bump_timeout_short(self): - self.assert_command_from_args(["-b", "foo"], + self.assert_command_from_args(["-b", "client"], command.BumpTimeout) def test_start_checker(self): - self.assert_command_from_args(["--start-checker", "foo"], + self.assert_command_from_args(["--start-checker", "client"], command.StartChecker) def test_stop_checker(self): - self.assert_command_from_args(["--stop-checker", "foo"], + self.assert_command_from_args(["--stop-checker", "client"], command.StopChecker) def test_approve_by_default(self): - self.assert_command_from_args(["--approve-by-default", "foo"], + self.assert_command_from_args(["--approve-by-default", + "client"], command.ApproveByDefault) def test_deny_by_default(self): - self.assert_command_from_args(["--deny-by-default", "foo"], + self.assert_command_from_args(["--deny-by-default", "client"], command.DenyByDefault) def test_checker(self): - self.assert_command_from_args(["--checker", ":", "foo"], + self.assert_command_from_args(["--checker", ":", "client"], command.SetChecker, value_to_set=":") def test_checker_empty(self): - self.assert_command_from_args(["--checker", "", "foo"], + self.assert_command_from_args(["--checker", "", "client"], command.SetChecker, value_to_set="") def test_checker_short(self): - self.assert_command_from_args(["-c", ":", "foo"], + self.assert_command_from_args(["-c", ":", "client"], command.SetChecker, value_to_set=":") def test_host(self): - self.assert_command_from_args(["--host", "foo.example.org", - "foo"], command.SetHost, - value_to_set="foo.example.org") + self.assert_command_from_args( + ["--host", "client.example.org", "client"], + command.SetHost, value_to_set="client.example.org") def test_host_short(self): - self.assert_command_from_args(["-H", "foo.example.org", - "foo"], command.SetHost, - value_to_set="foo.example.org") + self.assert_command_from_args( + ["-H", "client.example.org", "client"], command.SetHost, + value_to_set="client.example.org") def test_secret_devnull(self): self.assert_command_from_args(["--secret", os.path.devnull, - "foo"], command.SetSecret, + "client"], command.SetSecret, value_to_set=b"") def test_secret_tempfile(self): @@ -1293,12 +1299,13 @@ f.write(value) f.seek(0) self.assert_command_from_args(["--secret", f.name, - "foo"], command.SetSecret, + "client"], + command.SetSecret, value_to_set=value) def test_secret_devnull_short(self): - self.assert_command_from_args(["-s", os.path.devnull, "foo"], - command.SetSecret, + self.assert_command_from_args(["-s", os.path.devnull, + "client"], command.SetSecret, value_to_set=b"") def test_secret_tempfile_short(self): @@ -1306,45 +1313,45 @@ value = b"secret\0xyzzy\nbar" f.write(value) f.seek(0) - self.assert_command_from_args(["-s", f.name, "foo"], + self.assert_command_from_args(["-s", f.name, "client"], command.SetSecret, value_to_set=value) def test_timeout(self): - self.assert_command_from_args(["--timeout", "PT5M", "foo"], + self.assert_command_from_args(["--timeout", "PT5M", "client"], command.SetTimeout, value_to_set=300000) def test_timeout_short(self): - self.assert_command_from_args(["-t", "PT5M", "foo"], + self.assert_command_from_args(["-t", "PT5M", "client"], command.SetTimeout, value_to_set=300000) def test_extended_timeout(self): self.assert_command_from_args(["--extended-timeout", "PT15M", - "foo"], + "client"], command.SetExtendedTimeout, value_to_set=900000) def test_interval(self): - self.assert_command_from_args(["--interval", "PT2M", "foo"], - command.SetInterval, + self.assert_command_from_args(["--interval", "PT2M", + "client"], command.SetInterval, value_to_set=120000) def test_interval_short(self): - self.assert_command_from_args(["-i", "PT2M", "foo"], + self.assert_command_from_args(["-i", "PT2M", "client"], command.SetInterval, value_to_set=120000) def test_approval_delay(self): self.assert_command_from_args(["--approval-delay", "PT30S", - "foo"], + "client"], command.SetApprovalDelay, value_to_set=30000) def test_approval_duration(self): self.assert_command_from_args(["--approval-duration", "PT1S", - "foo"], + "client"], command.SetApprovalDuration, value_to_set=1000) @@ -1433,24 +1440,27 @@ LastCheckerStatus=-2) self.clients = collections.OrderedDict( [ - ("/clients/foo", self.client.attributes), - ("/clients/barbar", self.other_client.attributes), + (self.client.__dbus_object_path__, + self.client.attributes), + (self.other_client.__dbus_object_path__, + self.other_client.attributes), ]) - self.one_client = {"/clients/foo": self.client.attributes} + self.one_client = {self.client.__dbus_object_path__: + self.client.attributes} @property def bus(self): - class Bus(object): + class MockBus(object): @staticmethod def get_object(client_bus_name, path): self.assertEqual(dbus_busname, client_bus_name) - return { - # Note: "self" here is the TestCmd instance, not - # the Bus instance, since this is a static method! - "/clients/foo": self.client, - "/clients/barbar": self.other_client, - }[path] - return Bus() + # Note: "self" here is the TestCmd instance, not the + # MockBus instance, since this is a static method! + if path == self.client.__dbus_object_path__: + return self.client + elif path == self.other_client.__dbus_object_path__: + return self.other_client + return MockBus() class TestBaseCommands(TestCommand): @@ -1487,12 +1497,12 @@ client.calls) def test_Remove(self): - class MockMandos(object): + class MandosSpy(object): def __init__(self): self.calls = [] def RemoveClient(self, dbus_path): self.calls.append(("RemoveClient", (dbus_path,))) - mandos = MockMandos() + mandos = MandosSpy() command.Remove().run(self.clients, self.bus, mandos) for clientpath in self.clients: self.assertIn(("RemoveClient", (clientpath,)), @@ -1780,7 +1790,7 @@ class TestSetHostCmd(TestPropertyValueCmd): command = command.SetHost propname = "Host" - values_to_set = ["192.0.2.3", "foo.example.org"] + values_to_set = ["192.0.2.3", "client.example.org"] class TestSetSecretCmd(TestPropertyValueCmd): @@ -1788,7 +1798,7 @@ propname = "Secret" values_to_set = [io.BytesIO(b""), io.BytesIO(b"secret\0xyzzy\nbar")] - values_to_get = [b"", b"secret\0xyzzy\nbar"] + values_to_get = [f.getvalue() for f in values_to_set] class TestSetTimeoutCmd(TestPropertyValueCmd): @@ -1799,7 +1809,7 @@ datetime.timedelta(seconds=1), datetime.timedelta(weeks=1), datetime.timedelta(weeks=52)] - values_to_get = [0, 300000, 1000, 604800000, 31449600000] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] class TestSetExtendedTimeoutCmd(TestPropertyValueCmd): @@ -1810,7 +1820,7 @@ datetime.timedelta(seconds=1), datetime.timedelta(weeks=1), datetime.timedelta(weeks=52)] - values_to_get = [0, 300000, 1000, 604800000, 31449600000] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] class TestSetIntervalCmd(TestPropertyValueCmd): @@ -1821,7 +1831,7 @@ datetime.timedelta(seconds=1), datetime.timedelta(weeks=1), datetime.timedelta(weeks=52)] - values_to_get = [0, 300000, 1000, 604800000, 31449600000] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] class TestSetApprovalDelayCmd(TestPropertyValueCmd): @@ -1832,7 +1842,7 @@ datetime.timedelta(seconds=1), datetime.timedelta(weeks=1), datetime.timedelta(weeks=52)] - values_to_get = [0, 300000, 1000, 604800000, 31449600000] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] class TestSetApprovalDurationCmd(TestPropertyValueCmd): @@ -1843,7 +1853,7 @@ datetime.timedelta(seconds=1), datetime.timedelta(weeks=1), datetime.timedelta(weeks=52)] - values_to_get = [0, 300000, 1000, 604800000, 31449600000] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]