/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-17 18:44:44 UTC
  • Revision ID: teddy@recompile.se-20190317184444-5o7vougp7c3p87ri
mandos-ctl: Refactor tests

* mandos-ctl (TestBaseCommands.test_DumpJSON_normal): Don't run the
  .output method on command; instead, use capture_stdout_to_buffer().
  (TestBaseCommands.test_DumpJSON_one_client): - '' -
  (TestBaseCommands.test_PrintTable_normal): - '' -
  (TestBaseCommands.test_PrintTable_verbose): - '' -
  (TestBaseCommands.test_PrintTable_one_client): - '' -
  (TestBaseCommands.capture_stdout_to_buffer): New.

Show diffs side-by-side

added added

removed removed

Lines of Context:
891
891
    # tests, which is run by doctest.
892
892
 
893
893
    def test_rfc3339_zero_seconds(self):
894
 
        self.assertEqual(string_to_delta("PT0S"),
895
 
                         datetime.timedelta())
 
894
        self.assertEqual(datetime.timedelta(),
 
895
                         string_to_delta("PT0S"))
896
896
 
897
897
    def test_rfc3339_zero_days(self):
898
 
        self.assertEqual(string_to_delta("P0D"),
899
 
                         datetime.timedelta())
 
898
        self.assertEqual(datetime.timedelta(), string_to_delta("P0D"))
900
899
 
901
900
    def test_rfc3339_one_second(self):
902
 
        self.assertEqual(string_to_delta("PT1S"),
903
 
                         datetime.timedelta(0, 1))
 
901
        self.assertEqual(datetime.timedelta(0, 1),
 
902
                         string_to_delta("PT1S"))
904
903
 
905
904
    def test_rfc3339_two_hours(self):
906
 
        self.assertEqual(string_to_delta("PT2H"),
907
 
                         datetime.timedelta(0, 7200))
 
905
        self.assertEqual(datetime.timedelta(0, 7200),
 
906
                         string_to_delta("PT2H"))
908
907
 
909
908
    def test_falls_back_to_pre_1_6_1_with_warning(self):
910
909
        with self.assertLogs(log, logging.WARNING):
911
910
            value = string_to_delta("2h")
912
 
        self.assertEqual(value, datetime.timedelta(0, 7200))
 
911
        self.assertEqual(datetime.timedelta(0, 7200), value)
913
912
 
914
913
 
915
914
class Test_check_option_syntax(unittest.TestCase):
958
957
        # Exit code from argparse is guaranteed to be "2".  Reference:
959
958
        # https://docs.python.org/3/library
960
959
        # /argparse.html#exiting-methods
961
 
        self.assertEqual(e.exception.code, 2)
 
960
        self.assertEqual(2, e.exception.code)
962
961
 
963
962
    @staticmethod
964
963
    @contextlib.contextmanager
965
964
    def redirect_stderr_to_devnull():
966
 
        null = os.open(os.path.devnull, os.O_RDWR)
967
 
        stderrcopy = os.dup(sys.stderr.fileno())
968
 
        os.dup2(null, sys.stderr.fileno())
969
 
        os.close(null)
970
 
        try:
971
 
            yield
972
 
        finally:
973
 
            # restore stderr
974
 
            os.dup2(stderrcopy, sys.stderr.fileno())
975
 
            os.close(stderrcopy)
 
965
        old_stderr = sys.stderr
 
966
        with contextlib.closing(open(os.devnull, "w")) as null:
 
967
            sys.stderr = null
 
968
            try:
 
969
                yield
 
970
            finally:
 
971
                sys.stderr = old_stderr
976
972
 
977
973
    def check_option_syntax(self, options):
978
974
        check_option_syntax(self.parser, options)
1089
1085
            def get_object(mockbus_self, busname, dbus_path):
1090
1086
                # Note that "self" is still the testcase instance,
1091
1087
                # this MockBus instance is in "mockbus_self".
1092
 
                self.assertEqual(busname, dbus_busname)
1093
 
                self.assertEqual(dbus_path, server_dbus_path)
 
1088
                self.assertEqual(dbus_busname, busname)
 
1089
                self.assertEqual(server_dbus_path, dbus_path)
1094
1090
                mockbus_self.called = True
1095
1091
                return mockbus_self
1096
1092
 
1108
1104
                bus = get_mandos_dbus_object(bus=MockBusFailing())
1109
1105
 
1110
1106
        if isinstance(e.exception.code, int):
1111
 
            self.assertNotEqual(e.exception.code, 0)
 
1107
            self.assertNotEqual(0, e.exception.code)
1112
1108
        else:
1113
1109
            self.assertIsNotNone(e.exception.code)
1114
1110
 
1147
1143
            dbus_logger.removeFilter(counting_handler)
1148
1144
 
1149
1145
        # Make sure the dbus logger was suppressed
1150
 
        self.assertEqual(counting_handler.count, 0)
 
1146
        self.assertEqual(0, counting_handler.count)
1151
1147
 
1152
1148
        # Test that the dbus_logger still works
1153
1149
        with self.assertLogs(dbus_logger, logging.ERROR):
1154
1150
            dbus_logger.error("Test")
1155
1151
 
1156
1152
        if isinstance(e.exception.code, int):
1157
 
            self.assertNotEqual(e.exception.code, 0)
 
1153
            self.assertNotEqual(0, e.exception.code)
1158
1154
        else:
1159
1155
            self.assertIsNotNone(e.exception.code)
1160
1156
 
1175
1171
        options = self.parser.parse_args(args)
1176
1172
        check_option_syntax(self.parser, options)
1177
1173
        commands = commands_from_options(options)
1178
 
        self.assertEqual(len(commands), 1)
 
1174
        self.assertEqual(1, len(commands))
1179
1175
        command = commands[0]
1180
1176
        self.assertIsInstance(command, command_cls)
1181
1177
        for key, value in cmd_attrs.items():
1182
 
            self.assertEqual(getattr(command, key), value)
 
1178
            self.assertEqual(value, getattr(command, key))
1183
1179
 
1184
1180
    def test_is_enabled_short(self):
1185
1181
        self.assert_command_from_args(["-V", "foo"],
1207
1203
                                          "foo"])
1208
1204
        check_option_syntax(self.parser, options)
1209
1205
        commands = commands_from_options(options)
1210
 
        self.assertEqual(len(commands), 2)
 
1206
        self.assertEqual(2, len(commands))
1211
1207
        self.assertIsInstance(commands[0], command.Deny)
1212
1208
        self.assertIsInstance(commands[1], command.Remove)
1213
1209
 
1216
1212
                                          "--all"])
1217
1213
        check_option_syntax(self.parser, options)
1218
1214
        commands = commands_from_options(options)
1219
 
        self.assertEqual(len(commands), 2)
 
1215
        self.assertEqual(2, len(commands))
1220
1216
        self.assertIsInstance(commands[0], command.Deny)
1221
1217
        self.assertIsInstance(commands[1], command.Remove)
1222
1218
 
1382
1378
                self.attributes["Name"] = name
1383
1379
                self.calls = []
1384
1380
            def Set(self, interface, propname, value, dbus_interface):
1385
 
                testcase.assertEqual(interface, client_dbus_interface)
1386
 
                testcase.assertEqual(dbus_interface,
1387
 
                                     dbus.PROPERTIES_IFACE)
 
1381
                testcase.assertEqual(client_dbus_interface, interface)
 
1382
                testcase.assertEqual(dbus.PROPERTIES_IFACE,
 
1383
                                     dbus_interface)
1388
1384
                self.attributes[propname] = value
1389
1385
            def Approve(self, approve, dbus_interface):
1390
 
                testcase.assertEqual(dbus_interface,
1391
 
                                     client_dbus_interface)
 
1386
                testcase.assertEqual(client_dbus_interface,
 
1387
                                     dbus_interface)
1392
1388
                self.calls.append(("Approve", (approve,
1393
1389
                                               dbus_interface)))
1394
1390
        self.client = MockClient(
1451
1447
        class Bus(object):
1452
1448
            @staticmethod
1453
1449
            def get_object(client_bus_name, path):
1454
 
                self.assertEqual(client_bus_name, dbus_busname)
 
1450
                self.assertEqual(dbus_busname, client_bus_name)
1455
1451
                return {
1456
1452
                    # Note: "self" here is the TestCmd instance, not
1457
1453
                    # the Bus instance, since this is a static method!
1463
1459
 
1464
1460
class TestBaseCommands(TestCommand):
1465
1461
 
1466
 
    def test_IsEnabled(self):
1467
 
        self.assertTrue(all(command.IsEnabled().is_enabled(client,
1468
 
                                                      properties)
1469
 
                            for client, properties
1470
 
                            in self.clients.items()))
1471
 
 
1472
 
    def test_IsEnabled_run_exits_successfully(self):
 
1462
    def test_IsEnabled_exits_successfully(self):
1473
1463
        with self.assertRaises(SystemExit) as e:
1474
1464
            command.IsEnabled().run(self.one_client)
1475
1465
        if e.exception.code is not None:
1476
 
            self.assertEqual(e.exception.code, 0)
 
1466
            self.assertEqual(0, e.exception.code)
1477
1467
        else:
1478
1468
            self.assertIsNone(e.exception.code)
1479
1469
 
1480
 
    def test_IsEnabled_run_exits_with_failure(self):
 
1470
    def test_IsEnabled_exits_with_failure(self):
1481
1471
        self.client.attributes["Enabled"] = dbus.Boolean(False)
1482
1472
        with self.assertRaises(SystemExit) as e:
1483
1473
            command.IsEnabled().run(self.one_client)
1484
1474
        if isinstance(e.exception.code, int):
1485
 
            self.assertNotEqual(e.exception.code, 0)
 
1475
            self.assertNotEqual(0, e.exception.code)
1486
1476
        else:
1487
1477
            self.assertIsNotNone(e.exception.code)
1488
1478
 
1564
1554
    }
1565
1555
 
1566
1556
    def test_DumpJSON_normal(self):
1567
 
        output = command.DumpJSON().output(self.clients.values())
1568
 
        json_data = json.loads(output)
1569
 
        self.assertDictEqual(json_data, self.expected_json)
 
1557
        with self.capture_stdout_to_buffer() as buffer:
 
1558
            command.DumpJSON().run(self.clients)
 
1559
        json_data = json.loads(buffer.getvalue())
 
1560
        self.assertDictEqual(self.expected_json, json_data)
 
1561
 
 
1562
    @staticmethod
 
1563
    @contextlib.contextmanager
 
1564
    def capture_stdout_to_buffer():
 
1565
        capture_buffer = io.StringIO()
 
1566
        old_stdout = sys.stdout
 
1567
        sys.stdout = capture_buffer
 
1568
        try:
 
1569
            yield capture_buffer
 
1570
        finally:
 
1571
            sys.stdout = old_stdout
1570
1572
 
1571
1573
    def test_DumpJSON_one_client(self):
1572
 
        output = command.DumpJSON().output(self.one_client.values())
1573
 
        json_data = json.loads(output)
 
1574
        with self.capture_stdout_to_buffer() as buffer:
 
1575
            command.DumpJSON().run(self.one_client)
 
1576
        json_data = json.loads(buffer.getvalue())
1574
1577
        expected_json = {"foo": self.expected_json["foo"]}
1575
 
        self.assertDictEqual(json_data, expected_json)
 
1578
        self.assertDictEqual(expected_json, json_data)
1576
1579
 
1577
1580
    def test_PrintTable_normal(self):
1578
 
        output = command.PrintTable().output(self.clients.values())
 
1581
        with self.capture_stdout_to_buffer() as buffer:
 
1582
            command.PrintTable().run(self.clients)
1579
1583
        expected_output = "\n".join((
1580
1584
            "Name   Enabled Timeout  Last Successful Check",
1581
1585
            "foo    Yes     00:05:00 2019-02-03T00:00:00  ",
1582
1586
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
1583
 
        ))
1584
 
        self.assertEqual(output, expected_output)
 
1587
        )) + "\n"
 
1588
        self.assertEqual(expected_output, buffer.getvalue())
1585
1589
 
1586
1590
    def test_PrintTable_verbose(self):
1587
 
        output = command.PrintTable(verbose=True).output(
1588
 
            self.clients.values())
 
1591
        with self.capture_stdout_to_buffer() as buffer:
 
1592
            command.PrintTable(verbose=True).run(self.clients)
1589
1593
        columns = (
1590
1594
            (
1591
1595
                "Name   ",
1673
1677
            )
1674
1678
        )
1675
1679
        num_lines = max(len(rows) for rows in columns)
1676
 
        expected_output = "\n".join("".join(rows[line]
1677
 
                                            for rows in columns)
1678
 
                                    for line in range(num_lines))
1679
 
        self.assertEqual(output, expected_output)
 
1680
        expected_output = ("\n".join("".join(rows[line]
 
1681
                                             for rows in columns)
 
1682
                                     for line in range(num_lines))
 
1683
                           + "\n")
 
1684
        self.assertEqual(expected_output, buffer.getvalue())
1680
1685
 
1681
1686
    def test_PrintTable_one_client(self):
1682
 
        output = command.PrintTable().output(self.one_client.values())
 
1687
        with self.capture_stdout_to_buffer() as buffer:
 
1688
            command.PrintTable().run(self.one_client)
1683
1689
        expected_output = "\n".join((
1684
1690
            "Name Enabled Timeout  Last Successful Check",
1685
1691
            "foo  Yes     00:05:00 2019-02-03T00:00:00  ",
1686
 
        ))
1687
 
        self.assertEqual(output, expected_output)
 
1692
        )) + "\n"
 
1693
        self.assertEqual(expected_output, buffer.getvalue())
1688
1694
 
1689
1695
 
1690
1696
class TestPropertyCmd(TestCommand):
1705
1711
                client = self.bus.get_object(dbus_busname, clientpath)
1706
1712
                value = client.attributes[self.propname]
1707
1713
                self.assertNotIsInstance(value, self.Unique)
1708
 
                self.assertEqual(value, value_to_get)
 
1714
                self.assertEqual(value_to_get, value)
1709
1715
 
1710
1716
    class Unique(object):
1711
1717
        """Class for objects which exist only to be unique objects,