/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-18 22:29:25 UTC
  • Revision ID: teddy@recompile.se-20190318222925-jvhek84dgcfgj6g3
mandos-ctl: Refactor tests

* mandos-ctl: Where the clients names "foo" and "barbar" do not refer
              to the actual mock clients in the TestCommand class,
              change all occurrences of these names to "client1" and
              "client2" (or just "client" when only one is used) .
              Also change all test doubles to use correct terminology;
              some things called mocks are actually stubs or spies,
              and rename all true mocks to have "mock" in their names.
              Also eliminate duplicate values in tests; derive values
              from previously defined values whenever possible.

Show diffs side-by-side

added added

removed removed

Lines of Context:
61
61
 
62
62
if sys.version_info.major == 2:
63
63
    str = unicode
 
64
    import StringIO
 
65
    io.StringIO = StringIO.StringIO
64
66
 
65
67
locale.setlocale(locale.LC_ALL, "")
66
68
 
613
615
                        "Checker", "ExtendedTimeout", "Expires",
614
616
                        "LastCheckerStatus")
615
617
 
616
 
        def run(self, clients, bus=None, mandos=None):
617
 
            print(self.output(clients.values()))
618
 
 
619
 
        def output(self, clients):
620
 
            raise NotImplementedError()
621
 
 
622
618
 
623
619
    class DumpJSON(Output):
624
 
        def output(self, clients):
 
620
        def run(self, clients, bus=None, mandos=None):
625
621
            data = {client["Name"]:
626
622
                    {key: self.dbus_boolean_to_bool(client[key])
627
623
                     for key in self.all_keywords}
628
 
                    for client in clients}
629
 
            return json.dumps(data, indent=4, separators=(',', ': '))
 
624
                    for client in clients.values()}
 
625
            print(json.dumps(data, indent=4, separators=(',', ': ')))
630
626
 
631
627
        @staticmethod
632
628
        def dbus_boolean_to_bool(value):
639
635
        def __init__(self, verbose=False):
640
636
            self.verbose = verbose
641
637
 
642
 
        def output(self, clients):
 
638
        def run(self, clients, bus=None, mandos=None):
643
639
            default_keywords = ("Name", "Enabled", "Timeout",
644
640
                                "LastCheckedOK")
645
641
            keywords = default_keywords
646
642
            if self.verbose:
647
643
                keywords = self.all_keywords
648
 
            return str(self.TableOfClients(clients, keywords))
 
644
            print(self.TableOfClients(clients.values(), keywords))
649
645
 
650
646
        class TableOfClients(object):
651
647
            tableheaders = {
886
882
 
887
883
 
888
884
class Test_string_to_delta(TestCaseWithAssertLogs):
889
 
    def test_handles_basic_rfc3339(self):
890
 
        self.assertEqual(string_to_delta("PT0S"),
891
 
                         datetime.timedelta())
892
 
        self.assertEqual(string_to_delta("P0D"),
893
 
                         datetime.timedelta())
894
 
        self.assertEqual(string_to_delta("PT1S"),
895
 
                         datetime.timedelta(0, 1))
896
 
        self.assertEqual(string_to_delta("PT2H"),
897
 
                         datetime.timedelta(0, 7200))
 
885
    # Just test basic RFC 3339 functionality here, the doc string for
 
886
    # rfc3339_duration_to_delta() already has more comprehensive
 
887
    # tests, which is run by doctest.
 
888
 
 
889
    def test_rfc3339_zero_seconds(self):
 
890
        self.assertEqual(datetime.timedelta(),
 
891
                         string_to_delta("PT0S"))
 
892
 
 
893
    def test_rfc3339_zero_days(self):
 
894
        self.assertEqual(datetime.timedelta(), string_to_delta("P0D"))
 
895
 
 
896
    def test_rfc3339_one_second(self):
 
897
        self.assertEqual(datetime.timedelta(0, 1),
 
898
                         string_to_delta("PT1S"))
 
899
 
 
900
    def test_rfc3339_two_hours(self):
 
901
        self.assertEqual(datetime.timedelta(0, 7200),
 
902
                         string_to_delta("PT2H"))
898
903
 
899
904
    def test_falls_back_to_pre_1_6_1_with_warning(self):
900
905
        with self.assertLogs(log, logging.WARNING):
901
906
            value = string_to_delta("2h")
902
 
        self.assertEqual(value, datetime.timedelta(0, 7200))
 
907
        self.assertEqual(datetime.timedelta(0, 7200), value)
903
908
 
904
909
 
905
910
class Test_check_option_syntax(unittest.TestCase):
948
953
        # Exit code from argparse is guaranteed to be "2".  Reference:
949
954
        # https://docs.python.org/3/library
950
955
        # /argparse.html#exiting-methods
951
 
        self.assertEqual(e.exception.code, 2)
 
956
        self.assertEqual(2, e.exception.code)
952
957
 
953
958
    @staticmethod
954
959
    @contextlib.contextmanager
955
960
    def redirect_stderr_to_devnull():
956
 
        null = os.open(os.path.devnull, os.O_RDWR)
957
 
        stderrcopy = os.dup(sys.stderr.fileno())
958
 
        os.dup2(null, sys.stderr.fileno())
959
 
        os.close(null)
960
 
        try:
961
 
            yield
962
 
        finally:
963
 
            # restore stderr
964
 
            os.dup2(stderrcopy, sys.stderr.fileno())
965
 
            os.close(stderrcopy)
 
961
        old_stderr = sys.stderr
 
962
        with contextlib.closing(open(os.devnull, "w")) as null:
 
963
            sys.stderr = null
 
964
            try:
 
965
                yield
 
966
            finally:
 
967
                sys.stderr = old_stderr
966
968
 
967
969
    def check_option_syntax(self, options):
968
970
        check_option_syntax(self.parser, options)
981
983
            options = self.parser.parse_args()
982
984
            setattr(options, action, value)
983
985
            options.verbose = True
984
 
            options.client = ["foo"]
 
986
            options.client = ["client"]
985
987
            with self.assertParseError():
986
988
                self.check_option_syntax(options)
987
989
 
1017
1019
        for action, value in self.actions.items():
1018
1020
            options = self.parser.parse_args()
1019
1021
            setattr(options, action, value)
1020
 
            options.client = ["foo"]
 
1022
            options.client = ["client"]
1021
1023
            self.check_option_syntax(options)
1022
1024
 
1023
1025
    def test_one_client_with_all_actions_except_is_enabled(self):
1026
1028
            if action == "is_enabled":
1027
1029
                continue
1028
1030
            setattr(options, action, value)
1029
 
        options.client = ["foo"]
 
1031
        options.client = ["client"]
1030
1032
        self.check_option_syntax(options)
1031
1033
 
1032
1034
    def test_two_clients_with_all_actions_except_is_enabled(self):
1035
1037
            if action == "is_enabled":
1036
1038
                continue
1037
1039
            setattr(options, action, value)
1038
 
        options.client = ["foo", "barbar"]
 
1040
        options.client = ["client1", "client2"]
1039
1041
        self.check_option_syntax(options)
1040
1042
 
1041
1043
    def test_two_clients_are_ok_with_actions_except_is_enabled(self):
1044
1046
                continue
1045
1047
            options = self.parser.parse_args()
1046
1048
            setattr(options, action, value)
1047
 
            options.client = ["foo", "barbar"]
 
1049
            options.client = ["client1", "client2"]
1048
1050
            self.check_option_syntax(options)
1049
1051
 
1050
1052
    def test_is_enabled_fails_without_client(self):
1056
1058
    def test_is_enabled_fails_with_two_clients(self):
1057
1059
        options = self.parser.parse_args()
1058
1060
        options.is_enabled = True
1059
 
        options.client = ["foo", "barbar"]
 
1061
        options.client = ["client1", "client2"]
1060
1062
        with self.assertParseError():
1061
1063
            self.check_option_syntax(options)
1062
1064
 
1079
1081
            def get_object(mockbus_self, busname, dbus_path):
1080
1082
                # Note that "self" is still the testcase instance,
1081
1083
                # this MockBus instance is in "mockbus_self".
1082
 
                self.assertEqual(busname, dbus_busname)
1083
 
                self.assertEqual(dbus_path, server_dbus_path)
 
1084
                self.assertEqual(dbus_busname, busname)
 
1085
                self.assertEqual(server_dbus_path, dbus_path)
1084
1086
                mockbus_self.called = True
1085
1087
                return mockbus_self
1086
1088
 
1089
1091
        self.assertTrue(mockbus.called)
1090
1092
 
1091
1093
    def test_logs_and_exits_on_dbus_error(self):
1092
 
        class MockBusFailing(object):
 
1094
        class FailingBusStub(object):
1093
1095
            def get_object(self, busname, dbus_path):
1094
1096
                raise dbus.exceptions.DBusException("Test")
1095
1097
 
1096
1098
        with self.assertLogs(log, logging.CRITICAL):
1097
1099
            with self.assertRaises(SystemExit) as e:
1098
 
                bus = get_mandos_dbus_object(bus=MockBusFailing())
 
1100
                bus = get_mandos_dbus_object(bus=FailingBusStub())
1099
1101
 
1100
1102
        if isinstance(e.exception.code, int):
1101
 
            self.assertNotEqual(e.exception.code, 0)
 
1103
            self.assertNotEqual(0, e.exception.code)
1102
1104
        else:
1103
1105
            self.assertIsNotNone(e.exception.code)
1104
1106
 
1105
1107
 
1106
1108
class Test_get_managed_objects(TestCaseWithAssertLogs):
1107
1109
    def test_calls_and_returns_GetManagedObjects(self):
1108
 
        managed_objects = {"/clients/foo": { "Name": "foo"}}
1109
 
        class MockObjectManager(object):
 
1110
        managed_objects = {"/clients/client": { "Name": "client"}}
 
1111
        class ObjectManagerStub(object):
1110
1112
            def GetManagedObjects(self):
1111
1113
                return managed_objects
1112
 
        retval = get_managed_objects(MockObjectManager())
 
1114
        retval = get_managed_objects(ObjectManagerStub())
1113
1115
        self.assertDictEqual(managed_objects, retval)
1114
1116
 
1115
1117
    def test_logs_and_exits_on_dbus_error(self):
1116
1118
        dbus_logger = logging.getLogger("dbus.proxies")
1117
1119
 
1118
 
        class MockObjectManagerFailing(object):
 
1120
        class ObjectManagerFailingStub(object):
1119
1121
            def GetManagedObjects(self):
1120
1122
                dbus_logger.error("Test")
1121
1123
                raise dbus.exceptions.DBusException("Test")
1132
1134
        try:
1133
1135
            with self.assertLogs(log, logging.CRITICAL) as watcher:
1134
1136
                with self.assertRaises(SystemExit) as e:
1135
 
                    get_managed_objects(MockObjectManagerFailing())
 
1137
                    get_managed_objects(ObjectManagerFailingStub())
1136
1138
        finally:
1137
1139
            dbus_logger.removeFilter(counting_handler)
1138
1140
 
1139
1141
        # Make sure the dbus logger was suppressed
1140
 
        self.assertEqual(counting_handler.count, 0)
 
1142
        self.assertEqual(0, counting_handler.count)
1141
1143
 
1142
1144
        # Test that the dbus_logger still works
1143
1145
        with self.assertLogs(dbus_logger, logging.ERROR):
1144
1146
            dbus_logger.error("Test")
1145
1147
 
1146
1148
        if isinstance(e.exception.code, int):
1147
 
            self.assertNotEqual(e.exception.code, 0)
 
1149
            self.assertNotEqual(0, e.exception.code)
1148
1150
        else:
1149
1151
            self.assertIsNotNone(e.exception.code)
1150
1152
 
1155
1157
        add_command_line_options(self.parser)
1156
1158
 
1157
1159
    def test_is_enabled(self):
1158
 
        self.assert_command_from_args(["--is-enabled", "foo"],
 
1160
        self.assert_command_from_args(["--is-enabled", "client"],
1159
1161
                                      command.IsEnabled)
1160
1162
 
1161
1163
    def assert_command_from_args(self, args, command_cls,
1165
1167
        options = self.parser.parse_args(args)
1166
1168
        check_option_syntax(self.parser, options)
1167
1169
        commands = commands_from_options(options)
1168
 
        self.assertEqual(len(commands), 1)
 
1170
        self.assertEqual(1, len(commands))
1169
1171
        command = commands[0]
1170
1172
        self.assertIsInstance(command, command_cls)
1171
1173
        for key, value in cmd_attrs.items():
1172
 
            self.assertEqual(getattr(command, key), value)
 
1174
            self.assertEqual(value, getattr(command, key))
1173
1175
 
1174
1176
    def test_is_enabled_short(self):
1175
 
        self.assert_command_from_args(["-V", "foo"],
 
1177
        self.assert_command_from_args(["-V", "client"],
1176
1178
                                      command.IsEnabled)
1177
1179
 
1178
1180
    def test_approve(self):
1179
 
        self.assert_command_from_args(["--approve", "foo"],
 
1181
        self.assert_command_from_args(["--approve", "client"],
1180
1182
                                      command.Approve)
1181
1183
 
1182
1184
    def test_approve_short(self):
1183
 
        self.assert_command_from_args(["-A", "foo"], command.Approve)
 
1185
        self.assert_command_from_args(["-A", "client"],
 
1186
                                      command.Approve)
1184
1187
 
1185
1188
    def test_deny(self):
1186
 
        self.assert_command_from_args(["--deny", "foo"], command.Deny)
 
1189
        self.assert_command_from_args(["--deny", "client"],
 
1190
                                      command.Deny)
1187
1191
 
1188
1192
    def test_deny_short(self):
1189
 
        self.assert_command_from_args(["-D", "foo"], command.Deny)
 
1193
        self.assert_command_from_args(["-D", "client"], command.Deny)
1190
1194
 
1191
1195
    def test_remove(self):
1192
 
        self.assert_command_from_args(["--remove", "foo"],
 
1196
        self.assert_command_from_args(["--remove", "client"],
1193
1197
                                      command.Remove)
1194
1198
 
1195
1199
    def test_deny_before_remove(self):
1196
1200
        options = self.parser.parse_args(["--deny", "--remove",
1197
 
                                          "foo"])
 
1201
                                          "client"])
1198
1202
        check_option_syntax(self.parser, options)
1199
1203
        commands = commands_from_options(options)
1200
 
        self.assertEqual(len(commands), 2)
 
1204
        self.assertEqual(2, len(commands))
1201
1205
        self.assertIsInstance(commands[0], command.Deny)
1202
1206
        self.assertIsInstance(commands[1], command.Remove)
1203
1207
 
1206
1210
                                          "--all"])
1207
1211
        check_option_syntax(self.parser, options)
1208
1212
        commands = commands_from_options(options)
1209
 
        self.assertEqual(len(commands), 2)
 
1213
        self.assertEqual(2, len(commands))
1210
1214
        self.assertIsInstance(commands[0], command.Deny)
1211
1215
        self.assertIsInstance(commands[1], command.Remove)
1212
1216
 
1213
1217
    def test_remove_short(self):
1214
 
        self.assert_command_from_args(["-r", "foo"], command.Remove)
 
1218
        self.assert_command_from_args(["-r", "client"],
 
1219
                                      command.Remove)
1215
1220
 
1216
1221
    def test_dump_json(self):
1217
1222
        self.assert_command_from_args(["--dump-json"],
1218
1223
                                      command.DumpJSON)
1219
1224
 
1220
1225
    def test_enable(self):
1221
 
        self.assert_command_from_args(["--enable", "foo"],
 
1226
        self.assert_command_from_args(["--enable", "client"],
1222
1227
                                      command.Enable)
1223
1228
 
1224
1229
    def test_enable_short(self):
1225
 
        self.assert_command_from_args(["-e", "foo"], command.Enable)
 
1230
        self.assert_command_from_args(["-e", "client"],
 
1231
                                      command.Enable)
1226
1232
 
1227
1233
    def test_disable(self):
1228
 
        self.assert_command_from_args(["--disable", "foo"],
 
1234
        self.assert_command_from_args(["--disable", "client"],
1229
1235
                                      command.Disable)
1230
1236
 
1231
1237
    def test_disable_short(self):
1232
 
        self.assert_command_from_args(["-d", "foo"], command.Disable)
 
1238
        self.assert_command_from_args(["-d", "client"],
 
1239
                                      command.Disable)
1233
1240
 
1234
1241
    def test_bump_timeout(self):
1235
 
        self.assert_command_from_args(["--bump-timeout", "foo"],
 
1242
        self.assert_command_from_args(["--bump-timeout", "client"],
1236
1243
                                      command.BumpTimeout)
1237
1244
 
1238
1245
    def test_bump_timeout_short(self):
1239
 
        self.assert_command_from_args(["-b", "foo"],
 
1246
        self.assert_command_from_args(["-b", "client"],
1240
1247
                                      command.BumpTimeout)
1241
1248
 
1242
1249
    def test_start_checker(self):
1243
 
        self.assert_command_from_args(["--start-checker", "foo"],
 
1250
        self.assert_command_from_args(["--start-checker", "client"],
1244
1251
                                      command.StartChecker)
1245
1252
 
1246
1253
    def test_stop_checker(self):
1247
 
        self.assert_command_from_args(["--stop-checker", "foo"],
 
1254
        self.assert_command_from_args(["--stop-checker", "client"],
1248
1255
                                      command.StopChecker)
1249
1256
 
1250
1257
    def test_approve_by_default(self):
1251
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1258
        self.assert_command_from_args(["--approve-by-default",
 
1259
                                       "client"],
1252
1260
                                      command.ApproveByDefault)
1253
1261
 
1254
1262
    def test_deny_by_default(self):
1255
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1263
        self.assert_command_from_args(["--deny-by-default", "client"],
1256
1264
                                      command.DenyByDefault)
1257
1265
 
1258
1266
    def test_checker(self):
1259
 
        self.assert_command_from_args(["--checker", ":", "foo"],
 
1267
        self.assert_command_from_args(["--checker", ":", "client"],
1260
1268
                                      command.SetChecker,
1261
1269
                                      value_to_set=":")
1262
1270
 
1263
1271
    def test_checker_empty(self):
1264
 
        self.assert_command_from_args(["--checker", "", "foo"],
 
1272
        self.assert_command_from_args(["--checker", "", "client"],
1265
1273
                                      command.SetChecker,
1266
1274
                                      value_to_set="")
1267
1275
 
1268
1276
    def test_checker_short(self):
1269
 
        self.assert_command_from_args(["-c", ":", "foo"],
 
1277
        self.assert_command_from_args(["-c", ":", "client"],
1270
1278
                                      command.SetChecker,
1271
1279
                                      value_to_set=":")
1272
1280
 
1273
1281
    def test_host(self):
1274
 
        self.assert_command_from_args(["--host", "foo.example.org",
1275
 
                                       "foo"], command.SetHost,
1276
 
                                      value_to_set="foo.example.org")
 
1282
        self.assert_command_from_args(
 
1283
            ["--host", "client.example.org", "client"],
 
1284
            command.SetHost, value_to_set="client.example.org")
1277
1285
 
1278
1286
    def test_host_short(self):
1279
 
        self.assert_command_from_args(["-H", "foo.example.org",
1280
 
                                       "foo"], command.SetHost,
1281
 
                                      value_to_set="foo.example.org")
 
1287
        self.assert_command_from_args(
 
1288
            ["-H", "client.example.org", "client"], command.SetHost,
 
1289
            value_to_set="client.example.org")
1282
1290
 
1283
1291
    def test_secret_devnull(self):
1284
1292
        self.assert_command_from_args(["--secret", os.path.devnull,
1285
 
                                       "foo"], command.SetSecret,
 
1293
                                       "client"], command.SetSecret,
1286
1294
                                      value_to_set=b"")
1287
1295
 
1288
1296
    def test_secret_tempfile(self):
1291
1299
            f.write(value)
1292
1300
            f.seek(0)
1293
1301
            self.assert_command_from_args(["--secret", f.name,
1294
 
                                           "foo"], command.SetSecret,
 
1302
                                           "client"],
 
1303
                                          command.SetSecret,
1295
1304
                                          value_to_set=value)
1296
1305
 
1297
1306
    def test_secret_devnull_short(self):
1298
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1299
 
                                      command.SetSecret,
 
1307
        self.assert_command_from_args(["-s", os.path.devnull,
 
1308
                                       "client"], command.SetSecret,
1300
1309
                                      value_to_set=b"")
1301
1310
 
1302
1311
    def test_secret_tempfile_short(self):
1304
1313
            value = b"secret\0xyzzy\nbar"
1305
1314
            f.write(value)
1306
1315
            f.seek(0)
1307
 
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1316
            self.assert_command_from_args(["-s", f.name, "client"],
1308
1317
                                          command.SetSecret,
1309
1318
                                          value_to_set=value)
1310
1319
 
1311
1320
    def test_timeout(self):
1312
 
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
 
1321
        self.assert_command_from_args(["--timeout", "PT5M", "client"],
1313
1322
                                      command.SetTimeout,
1314
1323
                                      value_to_set=300000)
1315
1324
 
1316
1325
    def test_timeout_short(self):
1317
 
        self.assert_command_from_args(["-t", "PT5M", "foo"],
 
1326
        self.assert_command_from_args(["-t", "PT5M", "client"],
1318
1327
                                      command.SetTimeout,
1319
1328
                                      value_to_set=300000)
1320
1329
 
1321
1330
    def test_extended_timeout(self):
1322
1331
        self.assert_command_from_args(["--extended-timeout", "PT15M",
1323
 
                                       "foo"],
 
1332
                                       "client"],
1324
1333
                                      command.SetExtendedTimeout,
1325
1334
                                      value_to_set=900000)
1326
1335
 
1327
1336
    def test_interval(self):
1328
 
        self.assert_command_from_args(["--interval", "PT2M", "foo"],
1329
 
                                      command.SetInterval,
 
1337
        self.assert_command_from_args(["--interval", "PT2M",
 
1338
                                       "client"], command.SetInterval,
1330
1339
                                      value_to_set=120000)
1331
1340
 
1332
1341
    def test_interval_short(self):
1333
 
        self.assert_command_from_args(["-i", "PT2M", "foo"],
 
1342
        self.assert_command_from_args(["-i", "PT2M", "client"],
1334
1343
                                      command.SetInterval,
1335
1344
                                      value_to_set=120000)
1336
1345
 
1337
1346
    def test_approval_delay(self):
1338
1347
        self.assert_command_from_args(["--approval-delay", "PT30S",
1339
 
                                       "foo"],
 
1348
                                       "client"],
1340
1349
                                      command.SetApprovalDelay,
1341
1350
                                      value_to_set=30000)
1342
1351
 
1343
1352
    def test_approval_duration(self):
1344
1353
        self.assert_command_from_args(["--approval-duration", "PT1S",
1345
 
                                       "foo"],
 
1354
                                       "client"],
1346
1355
                                      command.SetApprovalDuration,
1347
1356
                                      value_to_set=1000)
1348
1357
 
1372
1381
                self.attributes["Name"] = name
1373
1382
                self.calls = []
1374
1383
            def Set(self, interface, propname, value, dbus_interface):
1375
 
                testcase.assertEqual(interface, client_dbus_interface)
1376
 
                testcase.assertEqual(dbus_interface,
1377
 
                                     dbus.PROPERTIES_IFACE)
 
1384
                testcase.assertEqual(client_dbus_interface, interface)
 
1385
                testcase.assertEqual(dbus.PROPERTIES_IFACE,
 
1386
                                     dbus_interface)
1378
1387
                self.attributes[propname] = value
1379
 
            def Get(self, interface, propname, dbus_interface):
1380
 
                testcase.assertEqual(interface, client_dbus_interface)
1381
 
                testcase.assertEqual(dbus_interface,
1382
 
                                     dbus.PROPERTIES_IFACE)
1383
 
                return self.attributes[propname]
1384
1388
            def Approve(self, approve, dbus_interface):
1385
 
                testcase.assertEqual(dbus_interface,
1386
 
                                     client_dbus_interface)
 
1389
                testcase.assertEqual(client_dbus_interface,
 
1390
                                     dbus_interface)
1387
1391
                self.calls.append(("Approve", (approve,
1388
1392
                                               dbus_interface)))
1389
1393
        self.client = MockClient(
1436
1440
            LastCheckerStatus=-2)
1437
1441
        self.clients =  collections.OrderedDict(
1438
1442
            [
1439
 
                ("/clients/foo", self.client.attributes),
1440
 
                ("/clients/barbar", self.other_client.attributes),
 
1443
                (self.client.__dbus_object_path__,
 
1444
                 self.client.attributes),
 
1445
                (self.other_client.__dbus_object_path__,
 
1446
                 self.other_client.attributes),
1441
1447
            ])
1442
 
        self.one_client = {"/clients/foo": self.client.attributes}
 
1448
        self.one_client = {self.client.__dbus_object_path__:
 
1449
                           self.client.attributes}
1443
1450
 
1444
1451
    @property
1445
1452
    def bus(self):
1446
 
        class Bus(object):
 
1453
        class MockBus(object):
1447
1454
            @staticmethod
1448
1455
            def get_object(client_bus_name, path):
1449
 
                self.assertEqual(client_bus_name, dbus_busname)
1450
 
                return {
1451
 
                    # Note: "self" here is the TestCmd instance, not
1452
 
                    # the Bus instance, since this is a static method!
1453
 
                    "/clients/foo": self.client,
1454
 
                    "/clients/barbar": self.other_client,
1455
 
                }[path]
1456
 
        return Bus()
 
1456
                self.assertEqual(dbus_busname, client_bus_name)
 
1457
                # Note: "self" here is the TestCmd instance, not the
 
1458
                # MockBus instance, since this is a static method!
 
1459
                if path == self.client.__dbus_object_path__:
 
1460
                    return self.client
 
1461
                elif path == self.other_client.__dbus_object_path__:
 
1462
                    return self.other_client
 
1463
        return MockBus()
1457
1464
 
1458
1465
 
1459
1466
class TestBaseCommands(TestCommand):
1460
1467
 
1461
 
    def test_is_enabled(self):
1462
 
        self.assertTrue(all(command.IsEnabled().is_enabled(client,
1463
 
                                                      properties)
1464
 
                            for client, properties
1465
 
                            in self.clients.items()))
1466
 
 
1467
 
    def test_is_enabled_run_exits_successfully(self):
 
1468
    def test_IsEnabled_exits_successfully(self):
1468
1469
        with self.assertRaises(SystemExit) as e:
1469
1470
            command.IsEnabled().run(self.one_client)
1470
1471
        if e.exception.code is not None:
1471
 
            self.assertEqual(e.exception.code, 0)
 
1472
            self.assertEqual(0, e.exception.code)
1472
1473
        else:
1473
1474
            self.assertIsNone(e.exception.code)
1474
1475
 
1475
 
    def test_is_enabled_run_exits_with_failure(self):
 
1476
    def test_IsEnabled_exits_with_failure(self):
1476
1477
        self.client.attributes["Enabled"] = dbus.Boolean(False)
1477
1478
        with self.assertRaises(SystemExit) as e:
1478
1479
            command.IsEnabled().run(self.one_client)
1479
1480
        if isinstance(e.exception.code, int):
1480
 
            self.assertNotEqual(e.exception.code, 0)
 
1481
            self.assertNotEqual(0, e.exception.code)
1481
1482
        else:
1482
1483
            self.assertIsNotNone(e.exception.code)
1483
1484
 
1484
 
    def test_approve(self):
 
1485
    def test_Approve(self):
1485
1486
        command.Approve().run(self.clients, self.bus)
1486
1487
        for clientpath in self.clients:
1487
1488
            client = self.bus.get_object(dbus_busname, clientpath)
1488
1489
            self.assertIn(("Approve", (True, client_dbus_interface)),
1489
1490
                          client.calls)
1490
1491
 
1491
 
    def test_deny(self):
 
1492
    def test_Deny(self):
1492
1493
        command.Deny().run(self.clients, self.bus)
1493
1494
        for clientpath in self.clients:
1494
1495
            client = self.bus.get_object(dbus_busname, clientpath)
1495
1496
            self.assertIn(("Approve", (False, client_dbus_interface)),
1496
1497
                          client.calls)
1497
1498
 
1498
 
    def test_remove(self):
1499
 
        class MockMandos(object):
 
1499
    def test_Remove(self):
 
1500
        class MandosSpy(object):
1500
1501
            def __init__(self):
1501
1502
                self.calls = []
1502
1503
            def RemoveClient(self, dbus_path):
1503
1504
                self.calls.append(("RemoveClient", (dbus_path,)))
1504
 
        mandos = MockMandos()
 
1505
        mandos = MandosSpy()
1505
1506
        command.Remove().run(self.clients, self.bus, mandos)
1506
 
        self.assertEqual(len(mandos.calls), 2)
1507
1507
        for clientpath in self.clients:
1508
1508
            self.assertIn(("RemoveClient", (clientpath,)),
1509
1509
                          mandos.calls)
1560
1560
    }
1561
1561
 
1562
1562
    def test_DumpJSON_normal(self):
1563
 
        output = command.DumpJSON().output(self.clients.values())
1564
 
        json_data = json.loads(output)
1565
 
        self.assertDictEqual(json_data, self.expected_json)
 
1563
        with self.capture_stdout_to_buffer() as buffer:
 
1564
            command.DumpJSON().run(self.clients)
 
1565
        json_data = json.loads(buffer.getvalue())
 
1566
        self.assertDictEqual(self.expected_json, json_data)
 
1567
 
 
1568
    @staticmethod
 
1569
    @contextlib.contextmanager
 
1570
    def capture_stdout_to_buffer():
 
1571
        capture_buffer = io.StringIO()
 
1572
        old_stdout = sys.stdout
 
1573
        sys.stdout = capture_buffer
 
1574
        try:
 
1575
            yield capture_buffer
 
1576
        finally:
 
1577
            sys.stdout = old_stdout
1566
1578
 
1567
1579
    def test_DumpJSON_one_client(self):
1568
 
        output = command.DumpJSON().output(self.one_client.values())
1569
 
        json_data = json.loads(output)
 
1580
        with self.capture_stdout_to_buffer() as buffer:
 
1581
            command.DumpJSON().run(self.one_client)
 
1582
        json_data = json.loads(buffer.getvalue())
1570
1583
        expected_json = {"foo": self.expected_json["foo"]}
1571
 
        self.assertDictEqual(json_data, expected_json)
 
1584
        self.assertDictEqual(expected_json, json_data)
1572
1585
 
1573
1586
    def test_PrintTable_normal(self):
1574
 
        output = command.PrintTable().output(self.clients.values())
 
1587
        with self.capture_stdout_to_buffer() as buffer:
 
1588
            command.PrintTable().run(self.clients)
1575
1589
        expected_output = "\n".join((
1576
1590
            "Name   Enabled Timeout  Last Successful Check",
1577
1591
            "foo    Yes     00:05:00 2019-02-03T00:00:00  ",
1578
1592
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
1579
 
        ))
1580
 
        self.assertEqual(output, expected_output)
 
1593
        )) + "\n"
 
1594
        self.assertEqual(expected_output, buffer.getvalue())
1581
1595
 
1582
1596
    def test_PrintTable_verbose(self):
1583
 
        output = command.PrintTable(verbose=True).output(
1584
 
            self.clients.values())
 
1597
        with self.capture_stdout_to_buffer() as buffer:
 
1598
            command.PrintTable(verbose=True).run(self.clients)
1585
1599
        columns = (
1586
1600
            (
1587
1601
                "Name   ",
1669
1683
            )
1670
1684
        )
1671
1685
        num_lines = max(len(rows) for rows in columns)
1672
 
        expected_output = "\n".join("".join(rows[line]
1673
 
                                            for rows in columns)
1674
 
                                    for line in range(num_lines))
1675
 
        self.assertEqual(output, expected_output)
 
1686
        expected_output = ("\n".join("".join(rows[line]
 
1687
                                             for rows in columns)
 
1688
                                     for line in range(num_lines))
 
1689
                           + "\n")
 
1690
        self.assertEqual(expected_output, buffer.getvalue())
1676
1691
 
1677
1692
    def test_PrintTable_one_client(self):
1678
 
        output = command.PrintTable().output(self.one_client.values())
 
1693
        with self.capture_stdout_to_buffer() as buffer:
 
1694
            command.PrintTable().run(self.one_client)
1679
1695
        expected_output = "\n".join((
1680
1696
            "Name Enabled Timeout  Last Successful Check",
1681
1697
            "foo  Yes     00:05:00 2019-02-03T00:00:00  ",
1682
 
        ))
1683
 
        self.assertEqual(output, expected_output)
 
1698
        )) + "\n"
 
1699
        self.assertEqual(expected_output, buffer.getvalue())
1684
1700
 
1685
1701
 
1686
1702
class TestPropertyCmd(TestCommand):
1695
1711
            for clientpath in self.clients:
1696
1712
                client = self.bus.get_object(dbus_busname, clientpath)
1697
1713
                old_value = client.attributes[self.propname]
1698
 
                self.assertNotIsInstance(old_value, self.Unique)
1699
1714
                client.attributes[self.propname] = self.Unique()
1700
1715
            self.run_command(value_to_set, self.clients)
1701
1716
            for clientpath in self.clients:
1702
1717
                client = self.bus.get_object(dbus_busname, clientpath)
1703
1718
                value = client.attributes[self.propname]
1704
1719
                self.assertNotIsInstance(value, self.Unique)
1705
 
                self.assertEqual(value, value_to_get)
 
1720
                self.assertEqual(value_to_get, value)
1706
1721
 
1707
1722
    class Unique(object):
1708
1723
        """Class for objects which exist only to be unique objects,
1775
1790
class TestSetHostCmd(TestPropertyValueCmd):
1776
1791
    command = command.SetHost
1777
1792
    propname = "Host"
1778
 
    values_to_set = ["192.0.2.3", "foo.example.org"]
 
1793
    values_to_set = ["192.0.2.3", "client.example.org"]
1779
1794
 
1780
1795
 
1781
1796
class TestSetSecretCmd(TestPropertyValueCmd):
1783
1798
    propname = "Secret"
1784
1799
    values_to_set = [io.BytesIO(b""),
1785
1800
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1786
 
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
 
1801
    values_to_get = [f.getvalue() for f in values_to_set]
1787
1802
 
1788
1803
 
1789
1804
class TestSetTimeoutCmd(TestPropertyValueCmd):
1794
1809
                     datetime.timedelta(seconds=1),
1795
1810
                     datetime.timedelta(weeks=1),
1796
1811
                     datetime.timedelta(weeks=52)]
1797
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1812
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1798
1813
 
1799
1814
 
1800
1815
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1805
1820
                     datetime.timedelta(seconds=1),
1806
1821
                     datetime.timedelta(weeks=1),
1807
1822
                     datetime.timedelta(weeks=52)]
1808
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1823
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1809
1824
 
1810
1825
 
1811
1826
class TestSetIntervalCmd(TestPropertyValueCmd):
1816
1831
                     datetime.timedelta(seconds=1),
1817
1832
                     datetime.timedelta(weeks=1),
1818
1833
                     datetime.timedelta(weeks=52)]
1819
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1834
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1820
1835
 
1821
1836
 
1822
1837
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1827
1842
                     datetime.timedelta(seconds=1),
1828
1843
                     datetime.timedelta(weeks=1),
1829
1844
                     datetime.timedelta(weeks=52)]
1830
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1845
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1831
1846
 
1832
1847
 
1833
1848
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1838
1853
                     datetime.timedelta(seconds=1),
1839
1854
                     datetime.timedelta(weeks=1),
1840
1855
                     datetime.timedelta(weeks=52)]
1841
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1856
    values_to_get = [dt.total_seconds()*1000 for dt in values_to_set]
1842
1857
 
1843
1858
 
1844
1859