/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-17 16:48:56 UTC
  • Revision ID: teddy@recompile.se-20190317164856-de11glvsfwhnj34w
mandos-ctl: Refactor tests

* mandos-ctl (Test_string_to_delta.test_handles_basic_rfc3339): Split
                                                  into multiple tests.
  (Test_string_to_delta.test_rfc3339_zero_seconds): New.
  (Test_string_to_delta.test_rfc3339_zero_days): - '' -
  (Test_string_to_delta.test_rfc3339_one_second): - '' -
  (Test_string_to_delta.test_rfc3339_two_hours): - '' -
  (TestCommand.setUp.MockClient.Get): Remove; unused.
  (TestBaseCommands.test_Remove): Remove unnecessary assertion.
  (TestPropertyCmd.runTest): - '' -

Show diffs side-by-side

Legend: added lines

Legend: removed lines

Lines of Context:
891
891
    # tests, which is run by doctest.
892
892
 
893
893
    def test_rfc3339_zero_seconds(self):
894
 
        self.assertEqual(datetime.timedelta(),
895
 
                         string_to_delta("PT0S"))
 
894
        self.assertEqual(string_to_delta("PT0S"),
 
895
                         datetime.timedelta())
896
896
 
897
897
    def test_rfc3339_zero_days(self):
898
 
        self.assertEqual(datetime.timedelta(), string_to_delta("P0D"))
 
898
        self.assertEqual(string_to_delta("P0D"),
 
899
                         datetime.timedelta())
899
900
 
900
901
    def test_rfc3339_one_second(self):
901
 
        self.assertEqual(datetime.timedelta(0, 1),
902
 
                         string_to_delta("PT1S"))
 
902
        self.assertEqual(string_to_delta("PT1S"),
 
903
                         datetime.timedelta(0, 1))
903
904
 
904
905
    def test_rfc3339_two_hours(self):
905
 
        self.assertEqual(datetime.timedelta(0, 7200),
906
 
                         string_to_delta("PT2H"))
 
906
        self.assertEqual(string_to_delta("PT2H"),
 
907
                         datetime.timedelta(0, 7200))
907
908
 
908
909
    def test_falls_back_to_pre_1_6_1_with_warning(self):
909
910
        with self.assertLogs(log, logging.WARNING):
910
911
            value = string_to_delta("2h")
911
 
        self.assertEqual(datetime.timedelta(0, 7200), value)
 
912
        self.assertEqual(value, datetime.timedelta(0, 7200))
912
913
 
913
914
 
914
915
class Test_check_option_syntax(unittest.TestCase):
957
958
        # Exit code from argparse is guaranteed to be "2".  Reference:
958
959
        # https://docs.python.org/3/library
959
960
        # /argparse.html#exiting-methods
960
 
        self.assertEqual(2, e.exception.code)
 
961
        self.assertEqual(e.exception.code, 2)
961
962
 
962
963
    @staticmethod
963
964
    @contextlib.contextmanager
964
965
    def redirect_stderr_to_devnull():
965
 
        old_stderr = sys.stderr
966
 
        with contextlib.closing(open(os.devnull, "w")) as null:
967
 
            sys.stderr = null
968
 
            try:
969
 
                yield
970
 
            finally:
971
 
                sys.stderr = old_stderr
 
966
        null = os.open(os.path.devnull, os.O_RDWR)
 
967
        stderrcopy = os.dup(sys.stderr.fileno())
 
968
        os.dup2(null, sys.stderr.fileno())
 
969
        os.close(null)
 
970
        try:
 
971
            yield
 
972
        finally:
 
973
            # restore stderr
 
974
            os.dup2(stderrcopy, sys.stderr.fileno())
 
975
            os.close(stderrcopy)
972
976
 
973
977
    def check_option_syntax(self, options):
974
978
        check_option_syntax(self.parser, options)
1085
1089
            def get_object(mockbus_self, busname, dbus_path):
1086
1090
                # Note that "self" is still the testcase instance,
1087
1091
                # this MockBus instance is in "mockbus_self".
1088
 
                self.assertEqual(dbus_busname, busname)
1089
 
                self.assertEqual(server_dbus_path, dbus_path)
 
1092
                self.assertEqual(busname, dbus_busname)
 
1093
                self.assertEqual(dbus_path, server_dbus_path)
1090
1094
                mockbus_self.called = True
1091
1095
                return mockbus_self
1092
1096
 
1104
1108
                bus = get_mandos_dbus_object(bus=MockBusFailing())
1105
1109
 
1106
1110
        if isinstance(e.exception.code, int):
1107
 
            self.assertNotEqual(0, e.exception.code)
 
1111
            self.assertNotEqual(e.exception.code, 0)
1108
1112
        else:
1109
1113
            self.assertIsNotNone(e.exception.code)
1110
1114
 
1143
1147
            dbus_logger.removeFilter(counting_handler)
1144
1148
 
1145
1149
        # Make sure the dbus logger was suppressed
1146
 
        self.assertEqual(0, counting_handler.count)
 
1150
        self.assertEqual(counting_handler.count, 0)
1147
1151
 
1148
1152
        # Test that the dbus_logger still works
1149
1153
        with self.assertLogs(dbus_logger, logging.ERROR):
1150
1154
            dbus_logger.error("Test")
1151
1155
 
1152
1156
        if isinstance(e.exception.code, int):
1153
 
            self.assertNotEqual(0, e.exception.code)
 
1157
            self.assertNotEqual(e.exception.code, 0)
1154
1158
        else:
1155
1159
            self.assertIsNotNone(e.exception.code)
1156
1160
 
1171
1175
        options = self.parser.parse_args(args)
1172
1176
        check_option_syntax(self.parser, options)
1173
1177
        commands = commands_from_options(options)
1174
 
        self.assertEqual(1, len(commands))
 
1178
        self.assertEqual(len(commands), 1)
1175
1179
        command = commands[0]
1176
1180
        self.assertIsInstance(command, command_cls)
1177
1181
        for key, value in cmd_attrs.items():
1178
 
            self.assertEqual(value, getattr(command, key))
 
1182
            self.assertEqual(getattr(command, key), value)
1179
1183
 
1180
1184
    def test_is_enabled_short(self):
1181
1185
        self.assert_command_from_args(["-V", "foo"],
1203
1207
                                          "foo"])
1204
1208
        check_option_syntax(self.parser, options)
1205
1209
        commands = commands_from_options(options)
1206
 
        self.assertEqual(2, len(commands))
 
1210
        self.assertEqual(len(commands), 2)
1207
1211
        self.assertIsInstance(commands[0], command.Deny)
1208
1212
        self.assertIsInstance(commands[1], command.Remove)
1209
1213
 
1212
1216
                                          "--all"])
1213
1217
        check_option_syntax(self.parser, options)
1214
1218
        commands = commands_from_options(options)
1215
 
        self.assertEqual(2, len(commands))
 
1219
        self.assertEqual(len(commands), 2)
1216
1220
        self.assertIsInstance(commands[0], command.Deny)
1217
1221
        self.assertIsInstance(commands[1], command.Remove)
1218
1222
 
1378
1382
                self.attributes["Name"] = name
1379
1383
                self.calls = []
1380
1384
            def Set(self, interface, propname, value, dbus_interface):
1381
 
                testcase.assertEqual(client_dbus_interface, interface)
1382
 
                testcase.assertEqual(dbus.PROPERTIES_IFACE,
1383
 
                                     dbus_interface)
 
1385
                testcase.assertEqual(interface, client_dbus_interface)
 
1386
                testcase.assertEqual(dbus_interface,
 
1387
                                     dbus.PROPERTIES_IFACE)
1384
1388
                self.attributes[propname] = value
1385
1389
            def Approve(self, approve, dbus_interface):
1386
 
                testcase.assertEqual(client_dbus_interface,
1387
 
                                     dbus_interface)
 
1390
                testcase.assertEqual(dbus_interface,
 
1391
                                     client_dbus_interface)
1388
1392
                self.calls.append(("Approve", (approve,
1389
1393
                                               dbus_interface)))
1390
1394
        self.client = MockClient(
1447
1451
        class Bus(object):
1448
1452
            @staticmethod
1449
1453
            def get_object(client_bus_name, path):
1450
 
                self.assertEqual(dbus_busname, client_bus_name)
 
1454
                self.assertEqual(client_bus_name, dbus_busname)
1451
1455
                return {
1452
1456
                    # Note: "self" here is the TestCmd instance, not
1453
1457
                    # the Bus instance, since this is a static method!
1459
1463
 
1460
1464
class TestBaseCommands(TestCommand):
1461
1465
 
1462
 
    def test_IsEnabled_exits_successfully(self):
 
1466
    def test_IsEnabled(self):
 
1467
        self.assertTrue(all(command.IsEnabled().is_enabled(client,
 
1468
                                                      properties)
 
1469
                            for client, properties
 
1470
                            in self.clients.items()))
 
1471
 
 
1472
    def test_IsEnabled_run_exits_successfully(self):
1463
1473
        with self.assertRaises(SystemExit) as e:
1464
1474
            command.IsEnabled().run(self.one_client)
1465
1475
        if e.exception.code is not None:
1466
 
            self.assertEqual(0, e.exception.code)
 
1476
            self.assertEqual(e.exception.code, 0)
1467
1477
        else:
1468
1478
            self.assertIsNone(e.exception.code)
1469
1479
 
1470
 
    def test_IsEnabled_exits_with_failure(self):
 
1480
    def test_IsEnabled_run_exits_with_failure(self):
1471
1481
        self.client.attributes["Enabled"] = dbus.Boolean(False)
1472
1482
        with self.assertRaises(SystemExit) as e:
1473
1483
            command.IsEnabled().run(self.one_client)
1474
1484
        if isinstance(e.exception.code, int):
1475
 
            self.assertNotEqual(0, e.exception.code)
 
1485
            self.assertNotEqual(e.exception.code, 0)
1476
1486
        else:
1477
1487
            self.assertIsNotNone(e.exception.code)
1478
1488
 
1554
1564
    }
1555
1565
 
1556
1566
    def test_DumpJSON_normal(self):
1557
 
        with self.capture_stdout_to_buffer() as buffer:
1558
 
            command.DumpJSON().run(self.clients)
1559
 
        json_data = json.loads(buffer.getvalue())
1560
 
        self.assertDictEqual(self.expected_json, json_data)
1561
 
 
1562
 
    @staticmethod
1563
 
    @contextlib.contextmanager
1564
 
    def capture_stdout_to_buffer():
1565
 
        capture_buffer = io.StringIO()
1566
 
        old_stdout = sys.stdout
1567
 
        sys.stdout = capture_buffer
1568
 
        try:
1569
 
            yield capture_buffer
1570
 
        finally:
1571
 
            sys.stdout = old_stdout
 
1567
        output = command.DumpJSON().output(self.clients.values())
 
1568
        json_data = json.loads(output)
 
1569
        self.assertDictEqual(json_data, self.expected_json)
1572
1570
 
1573
1571
    def test_DumpJSON_one_client(self):
1574
 
        with self.capture_stdout_to_buffer() as buffer:
1575
 
            command.DumpJSON().run(self.one_client)
1576
 
        json_data = json.loads(buffer.getvalue())
 
1572
        output = command.DumpJSON().output(self.one_client.values())
 
1573
        json_data = json.loads(output)
1577
1574
        expected_json = {"foo": self.expected_json["foo"]}
1578
 
        self.assertDictEqual(expected_json, json_data)
 
1575
        self.assertDictEqual(json_data, expected_json)
1579
1576
 
1580
1577
    def test_PrintTable_normal(self):
1581
 
        with self.capture_stdout_to_buffer() as buffer:
1582
 
            command.PrintTable().run(self.clients)
 
1578
        output = command.PrintTable().output(self.clients.values())
1583
1579
        expected_output = "\n".join((
1584
1580
            "Name   Enabled Timeout  Last Successful Check",
1585
1581
            "foo    Yes     00:05:00 2019-02-03T00:00:00  ",
1586
1582
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
1587
 
        )) + "\n"
1588
 
        self.assertEqual(expected_output, buffer.getvalue())
 
1583
        ))
 
1584
        self.assertEqual(output, expected_output)
1589
1585
 
1590
1586
    def test_PrintTable_verbose(self):
1591
 
        with self.capture_stdout_to_buffer() as buffer:
1592
 
            command.PrintTable(verbose=True).run(self.clients)
 
1587
        output = command.PrintTable(verbose=True).output(
 
1588
            self.clients.values())
1593
1589
        columns = (
1594
1590
            (
1595
1591
                "Name   ",
1677
1673
            )
1678
1674
        )
1679
1675
        num_lines = max(len(rows) for rows in columns)
1680
 
        expected_output = ("\n".join("".join(rows[line]
1681
 
                                             for rows in columns)
1682
 
                                     for line in range(num_lines))
1683
 
                           + "\n")
1684
 
        self.assertEqual(expected_output, buffer.getvalue())
 
1676
        expected_output = "\n".join("".join(rows[line]
 
1677
                                            for rows in columns)
 
1678
                                    for line in range(num_lines))
 
1679
        self.assertEqual(output, expected_output)
1685
1680
 
1686
1681
    def test_PrintTable_one_client(self):
1687
 
        with self.capture_stdout_to_buffer() as buffer:
1688
 
            command.PrintTable().run(self.one_client)
 
1682
        output = command.PrintTable().output(self.one_client.values())
1689
1683
        expected_output = "\n".join((
1690
1684
            "Name Enabled Timeout  Last Successful Check",
1691
1685
            "foo  Yes     00:05:00 2019-02-03T00:00:00  ",
1692
 
        )) + "\n"
1693
 
        self.assertEqual(expected_output, buffer.getvalue())
 
1686
        ))
 
1687
        self.assertEqual(output, expected_output)
1694
1688
 
1695
1689
 
1696
1690
class TestPropertyCmd(TestCommand):
1711
1705
                client = self.bus.get_object(dbus_busname, clientpath)
1712
1706
                value = client.attributes[self.propname]
1713
1707
                self.assertNotIsInstance(value, self.Unique)
1714
 
                self.assertEqual(value_to_get, value)
 
1708
                self.assertEqual(value, value_to_get)
1715
1709
 
1716
1710
    class Unique(object):
1717
1711
        """Class for objects which exist only to be unique objects,