/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-13 22:21:45 UTC
  • mto: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190313222145-2icvxn9caz7zcr0c
mandos-ctl: Refactor

* mandos-ctl (PrintTableCmd.TableOfClients.__init__): Remove unused
                                                      parameter
                                                      "tableheaders".

Show diffs side-by-side

added added

removed removed

Lines of Context:
577
577
                    "LastApprovalRequest", "ApprovalDelay",
578
578
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
579
579
                    "Expires", "LastCheckerStatus")
 
580
 
580
581
    def run(self, clients, bus=None, mandos=None):
581
582
        print(self.output(clients.values()))
 
583
 
582
584
    def output(self, clients):
583
585
        raise NotImplementedError()
584
586
 
588
590
        data = {client["Name"]:
589
591
                {key: self.dbus_boolean_to_bool(client[key])
590
592
                 for key in self.all_keywords}
591
 
                for client in clients.values()}
 
593
                for client in clients}
592
594
        return json.dumps(data, indent=4, separators=(',', ': '))
 
595
 
593
596
    @staticmethod
594
597
    def dbus_boolean_to_bool(value):
595
598
        if isinstance(value, dbus.Boolean):
633
636
            "LastCheckerStatus": "Last Checker Status",
634
637
        }
635
638
 
636
 
        def __init__(self, clients, keywords, tableheaders=None):
 
639
        def __init__(self, clients, keywords):
637
640
            self.clients = clients
638
641
            self.keywords = keywords
639
 
            if tableheaders is not None:
640
 
                self.tableheaders = tableheaders
641
642
 
642
643
        def __str__(self):
643
644
            return "\n".join(self.rows())
696
697
 
697
698
class PropertyCmd(Command):
698
699
    """Abstract class for Actions for setting one client property"""
 
700
 
699
701
    def run_on_one_client(self, client, properties):
700
702
        """Set the Client's D-Bus property"""
701
703
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
707
709
        client.Set(client_dbus_interface, self.propname,
708
710
                   self.value_to_set,
709
711
                   dbus_interface=dbus.PROPERTIES_IFACE)
 
712
 
710
713
    @property
711
714
    def propname(self):
712
715
        raise NotImplementedError()
763
766
 
764
767
class SetSecretCmd(PropertyValueCmd):
765
768
    propname = "Secret"
 
769
 
766
770
    @property
767
771
    def value_to_set(self):
768
772
        return self._vts
 
773
 
769
774
    @value_to_set.setter
770
775
    def value_to_set(self, value):
771
776
        """When setting, read data from supplied file object"""
776
781
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
777
782
    """Abstract class for PropertyValueCmd taking a value argument as
778
783
a datetime.timedelta() but should store it as milliseconds."""
 
784
 
779
785
    @property
780
786
    def value_to_set(self):
781
787
        return self._vts
 
788
 
782
789
    @value_to_set.setter
783
790
    def value_to_set(self, value):
784
791
        """When setting, convert value from a datetime.timedelta"""
816
823
                         datetime.timedelta(0, 1))
817
824
        self.assertEqual(string_to_delta("PT2H"),
818
825
                         datetime.timedelta(0, 7200))
 
826
 
819
827
    def test_falls_back_to_pre_1_6_1_with_warning(self):
820
828
        # assertLogs only exists in Python 3.4
821
829
        if hasattr(self, "assertLogs"):
840
848
 
841
849
 
842
850
class Test_check_option_syntax(unittest.TestCase):
 
851
    def setUp(self):
 
852
        self.parser = argparse.ArgumentParser()
 
853
        add_command_line_options(self.parser)
 
854
 
 
855
    def test_actions_requires_client_or_all(self):
 
856
        for action, value in self.actions.items():
 
857
            options = self.parser.parse_args()
 
858
            setattr(options, action, value)
 
859
            with self.assertParseError():
 
860
                self.check_option_syntax(options)
 
861
 
843
862
    # This mostly corresponds to the definition from has_actions() in
844
863
    # check_option_syntax()
845
864
    actions = {
866
885
        "deny": True,
867
886
    }
868
887
 
869
 
    def setUp(self):
870
 
        self.parser = argparse.ArgumentParser()
871
 
        add_command_line_options(self.parser)
872
 
 
873
888
    @contextlib.contextmanager
874
889
    def assertParseError(self):
875
890
        with self.assertRaises(SystemExit) as e:
897
912
    def check_option_syntax(self, options):
898
913
        check_option_syntax(self.parser, options)
899
914
 
900
 
    def test_actions_requires_client_or_all(self):
901
 
        for action, value in self.actions.items():
902
 
            options = self.parser.parse_args()
903
 
            setattr(options, action, value)
904
 
            with self.assertParseError():
905
 
                self.check_option_syntax(options)
906
 
 
907
915
    def test_actions_conflicts_with_verbose(self):
908
916
        for action, value in self.actions.items():
909
917
            options = self.parser.parse_args()
971
979
                self.check_option_syntax(options)
972
980
 
973
981
 
974
 
class Test_command_from_options(unittest.TestCase):
 
982
class Test_commands_from_options(unittest.TestCase):
975
983
    def setUp(self):
976
984
        self.parser = argparse.ArgumentParser()
977
985
        add_command_line_options(self.parser)
 
986
 
 
987
    def test_is_enabled(self):
 
988
        self.assert_command_from_args(["--is-enabled", "foo"],
 
989
                                      IsEnabledCmd)
 
990
 
978
991
    def assert_command_from_args(self, args, command_cls,
979
992
                                 **cmd_attrs):
980
993
        """Assert that parsing ARGS should result in an instance of
987
1000
        self.assertIsInstance(command, command_cls)
988
1001
        for key, value in cmd_attrs.items():
989
1002
            self.assertEqual(getattr(command, key), value)
990
 
    def test_print_table(self):
991
 
        self.assert_command_from_args([], PrintTableCmd,
992
 
                                      verbose=False)
993
 
 
994
 
    def test_print_table_verbose(self):
995
 
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
996
 
                                      verbose=True)
997
 
 
998
 
    def test_print_table_verbose_short(self):
999
 
        self.assert_command_from_args(["-v"], PrintTableCmd,
1000
 
                                      verbose=True)
 
1003
 
 
1004
    def test_is_enabled_short(self):
 
1005
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1006
 
 
1007
    def test_approve(self):
 
1008
        self.assert_command_from_args(["--approve", "foo"],
 
1009
                                      ApproveCmd)
 
1010
 
 
1011
    def test_approve_short(self):
 
1012
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1013
 
 
1014
    def test_deny(self):
 
1015
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
 
1016
 
 
1017
    def test_deny_short(self):
 
1018
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1019
 
 
1020
    def test_remove(self):
 
1021
        self.assert_command_from_args(["--remove", "foo"],
 
1022
                                      RemoveCmd)
 
1023
 
 
1024
    def test_deny_before_remove(self):
 
1025
        options = self.parser.parse_args(["--deny", "--remove",
 
1026
                                          "foo"])
 
1027
        check_option_syntax(self.parser, options)
 
1028
        commands = commands_from_options(options)
 
1029
        self.assertEqual(len(commands), 2)
 
1030
        self.assertIsInstance(commands[0], DenyCmd)
 
1031
        self.assertIsInstance(commands[1], RemoveCmd)
 
1032
 
 
1033
    def test_deny_before_remove_reversed(self):
 
1034
        options = self.parser.parse_args(["--remove", "--deny",
 
1035
                                          "--all"])
 
1036
        check_option_syntax(self.parser, options)
 
1037
        commands = commands_from_options(options)
 
1038
        self.assertEqual(len(commands), 2)
 
1039
        self.assertIsInstance(commands[0], DenyCmd)
 
1040
        self.assertIsInstance(commands[1], RemoveCmd)
 
1041
 
 
1042
    def test_remove_short(self):
 
1043
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
 
1044
 
 
1045
    def test_dump_json(self):
 
1046
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1001
1047
 
1002
1048
    def test_enable(self):
1003
1049
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1027
1073
        self.assert_command_from_args(["--stop-checker", "foo"],
1028
1074
                                      StopCheckerCmd)
1029
1075
 
1030
 
    def test_remove(self):
1031
 
        self.assert_command_from_args(["--remove", "foo"],
1032
 
                                      RemoveCmd)
 
1076
    def test_approve_by_default(self):
 
1077
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1078
                                      ApproveByDefaultCmd)
1033
1079
 
1034
 
    def test_remove_short(self):
1035
 
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
 
1080
    def test_deny_by_default(self):
 
1081
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1082
                                      DenyByDefaultCmd)
1036
1083
 
1037
1084
    def test_checker(self):
1038
1085
        self.assert_command_from_args(["--checker", ":", "foo"],
1046
1093
        self.assert_command_from_args(["-c", ":", "foo"],
1047
1094
                                      SetCheckerCmd, value_to_set=":")
1048
1095
 
 
1096
    def test_host(self):
 
1097
        self.assert_command_from_args(["--host", "foo.example.org",
 
1098
                                       "foo"], SetHostCmd,
 
1099
                                      value_to_set="foo.example.org")
 
1100
 
 
1101
    def test_host_short(self):
 
1102
        self.assert_command_from_args(["-H", "foo.example.org",
 
1103
                                       "foo"], SetHostCmd,
 
1104
                                      value_to_set="foo.example.org")
 
1105
 
 
1106
    def test_secret_devnull(self):
 
1107
        self.assert_command_from_args(["--secret", os.path.devnull,
 
1108
                                       "foo"], SetSecretCmd,
 
1109
                                      value_to_set=b"")
 
1110
 
 
1111
    def test_secret_tempfile(self):
 
1112
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1113
            value = b"secret\0xyzzy\nbar"
 
1114
            f.write(value)
 
1115
            f.seek(0)
 
1116
            self.assert_command_from_args(["--secret", f.name,
 
1117
                                           "foo"], SetSecretCmd,
 
1118
                                          value_to_set=value)
 
1119
 
 
1120
    def test_secret_devnull_short(self):
 
1121
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1122
                                      SetSecretCmd, value_to_set=b"")
 
1123
 
 
1124
    def test_secret_tempfile_short(self):
 
1125
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1126
            value = b"secret\0xyzzy\nbar"
 
1127
            f.write(value)
 
1128
            f.seek(0)
 
1129
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1130
                                          SetSecretCmd,
 
1131
                                          value_to_set=value)
 
1132
 
1049
1133
    def test_timeout(self):
1050
1134
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1051
1135
                                      SetTimeoutCmd,
1072
1156
                                      SetIntervalCmd,
1073
1157
                                      value_to_set=120000)
1074
1158
 
1075
 
    def test_approve_by_default(self):
1076
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
1077
 
                                      ApproveByDefaultCmd)
1078
 
 
1079
 
    def test_deny_by_default(self):
1080
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
1081
 
                                      DenyByDefaultCmd)
1082
 
 
1083
1159
    def test_approval_delay(self):
1084
1160
        self.assert_command_from_args(["--approval-delay", "PT30S",
1085
1161
                                       "foo"], SetApprovalDelayCmd,
1090
1166
                                       "foo"], SetApprovalDurationCmd,
1091
1167
                                      value_to_set=1000)
1092
1168
 
1093
 
    def test_host(self):
1094
 
        self.assert_command_from_args(["--host", "foo.example.org",
1095
 
                                       "foo"], SetHostCmd,
1096
 
                                      value_to_set="foo.example.org")
1097
 
 
1098
 
    def test_host_short(self):
1099
 
        self.assert_command_from_args(["-H", "foo.example.org",
1100
 
                                       "foo"], SetHostCmd,
1101
 
                                      value_to_set="foo.example.org")
1102
 
 
1103
 
    def test_secret_devnull(self):
1104
 
        self.assert_command_from_args(["--secret", os.path.devnull,
1105
 
                                       "foo"], SetSecretCmd,
1106
 
                                      value_to_set=b"")
1107
 
 
1108
 
    def test_secret_tempfile(self):
1109
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1110
 
            value = b"secret\0xyzzy\nbar"
1111
 
            f.write(value)
1112
 
            f.seek(0)
1113
 
            self.assert_command_from_args(["--secret", f.name,
1114
 
                                           "foo"], SetSecretCmd,
1115
 
                                          value_to_set=value)
1116
 
 
1117
 
    def test_secret_devnull_short(self):
1118
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1119
 
                                      SetSecretCmd, value_to_set=b"")
1120
 
 
1121
 
    def test_secret_tempfile_short(self):
1122
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1123
 
            value = b"secret\0xyzzy\nbar"
1124
 
            f.write(value)
1125
 
            f.seek(0)
1126
 
            self.assert_command_from_args(["-s", f.name, "foo"],
1127
 
                                          SetSecretCmd,
1128
 
                                          value_to_set=value)
1129
 
 
1130
 
    def test_approve(self):
1131
 
        self.assert_command_from_args(["--approve", "foo"],
1132
 
                                      ApproveCmd)
1133
 
 
1134
 
    def test_approve_short(self):
1135
 
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1136
 
 
1137
 
    def test_deny(self):
1138
 
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1139
 
 
1140
 
    def test_deny_short(self):
1141
 
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
1142
 
 
1143
 
    def test_dump_json(self):
1144
 
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1145
 
 
1146
 
    def test_is_enabled(self):
1147
 
        self.assert_command_from_args(["--is-enabled", "foo"],
1148
 
                                      IsEnabledCmd)
1149
 
 
1150
 
    def test_is_enabled_short(self):
1151
 
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1152
 
 
1153
 
    def test_deny_before_remove(self):
1154
 
        options = self.parser.parse_args(["--deny", "--remove",
1155
 
                                          "foo"])
1156
 
        check_option_syntax(self.parser, options)
1157
 
        commands = commands_from_options(options)
1158
 
        self.assertEqual(len(commands), 2)
1159
 
        self.assertIsInstance(commands[0], DenyCmd)
1160
 
        self.assertIsInstance(commands[1], RemoveCmd)
1161
 
 
1162
 
    def test_deny_before_remove_reversed(self):
1163
 
        options = self.parser.parse_args(["--remove", "--deny",
1164
 
                                          "--all"])
1165
 
        check_option_syntax(self.parser, options)
1166
 
        commands = commands_from_options(options)
1167
 
        self.assertEqual(len(commands), 2)
1168
 
        self.assertIsInstance(commands[0], DenyCmd)
1169
 
        self.assertIsInstance(commands[1], RemoveCmd)
 
1169
    def test_print_table(self):
 
1170
        self.assert_command_from_args([], PrintTableCmd,
 
1171
                                      verbose=False)
 
1172
 
 
1173
    def test_print_table_verbose(self):
 
1174
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
 
1175
                                      verbose=True)
 
1176
 
 
1177
    def test_print_table_verbose_short(self):
 
1178
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1179
                                      verbose=True)
1170
1180
 
1171
1181
 
1172
1182
class TestCmd(unittest.TestCase):
1173
1183
    """Abstract class for tests of command classes"""
 
1184
 
1174
1185
    def setUp(self):
1175
1186
        testcase = self
1176
1187
        class MockClient(object):
1248
1259
                ("/clients/barbar", self.other_client.attributes),
1249
1260
            ])
1250
1261
        self.one_client = {"/clients/foo": self.client.attributes}
 
1262
 
1251
1263
    @property
1252
1264
    def bus(self):
1253
1265
        class Bus(object):
1255
1267
            def get_object(client_bus_name, path):
1256
1268
                self.assertEqual(client_bus_name, dbus_busname)
1257
1269
                return {
 
1270
                    # Note: "self" here is the TestCmd instance, not
 
1271
                    # the Bus instance, since this is a static method!
1258
1272
                    "/clients/foo": self.client,
1259
1273
                    "/clients/barbar": self.other_client,
1260
1274
                }[path]
1267
1281
                                                      properties)
1268
1282
                            for client, properties
1269
1283
                            in self.clients.items()))
 
1284
 
1270
1285
    def test_is_enabled_run_exits_successfully(self):
1271
1286
        with self.assertRaises(SystemExit) as e:
1272
1287
            IsEnabledCmd().run(self.one_client)
1274
1289
            self.assertEqual(e.exception.code, 0)
1275
1290
        else:
1276
1291
            self.assertIsNone(e.exception.code)
 
1292
 
1277
1293
    def test_is_enabled_run_exits_with_failure(self):
1278
1294
        self.client.attributes["Enabled"] = dbus.Boolean(False)
1279
1295
        with self.assertRaises(SystemExit) as e:
1301
1317
            self.assertIn(("Approve", (False, client_dbus_interface)),
1302
1318
                          client.calls)
1303
1319
 
 
1320
 
1304
1321
class TestRemoveCmd(TestCmd):
1305
1322
    def test_remove(self):
1306
1323
        class MockMandos(object):
1370
1387
            },
1371
1388
        }
1372
1389
        return super(TestDumpJSONCmd, self).setUp()
 
1390
 
1373
1391
    def test_normal(self):
1374
 
        json_data = json.loads(DumpJSONCmd().output(self.clients))
 
1392
        output = DumpJSONCmd().output(self.clients.values())
 
1393
        json_data = json.loads(output)
1375
1394
        self.assertDictEqual(json_data, self.expected_json)
 
1395
 
1376
1396
    def test_one_client(self):
1377
 
        clients = self.one_client
1378
 
        json_data = json.loads(DumpJSONCmd().output(clients))
 
1397
        output = DumpJSONCmd().output(self.one_client.values())
 
1398
        json_data = json.loads(output)
1379
1399
        expected_json = {"foo": self.expected_json["foo"]}
1380
1400
        self.assertDictEqual(json_data, expected_json)
1381
1401
 
1389
1409
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
1390
1410
        ))
1391
1411
        self.assertEqual(output, expected_output)
 
1412
 
1392
1413
    def test_verbose(self):
1393
1414
        output = PrintTableCmd(verbose=True).output(
1394
1415
            self.clients.values())
1483
1504
                                            for rows in columns)
1484
1505
                                    for line in range(num_lines))
1485
1506
        self.assertEqual(output, expected_output)
 
1507
 
1486
1508
    def test_one_client(self):
1487
1509
        output = PrintTableCmd().output(self.one_client.values())
1488
1510
        expected_output = "\n".join((
1492
1514
        self.assertEqual(output, expected_output)
1493
1515
 
1494
1516
 
1495
 
class Unique(object):
1496
 
    """Class for objects which exist only to be unique objects, since
1497
 
unittest.mock.sentinel only exists in Python 3.3"""
1498
 
 
1499
 
 
1500
1517
class TestPropertyCmd(TestCmd):
1501
1518
    """Abstract class for tests of PropertyCmd classes"""
1502
1519
    def runTest(self):
1509
1526
            for clientpath in self.clients:
1510
1527
                client = self.bus.get_object(dbus_busname, clientpath)
1511
1528
                old_value = client.attributes[self.propname]
1512
 
                self.assertNotIsInstance(old_value, Unique)
1513
 
                client.attributes[self.propname] = Unique()
 
1529
                self.assertNotIsInstance(old_value, self.Unique)
 
1530
                client.attributes[self.propname] = self.Unique()
1514
1531
            self.run_command(value_to_set, self.clients)
1515
1532
            for clientpath in self.clients:
1516
1533
                client = self.bus.get_object(dbus_busname, clientpath)
1517
1534
                value = client.attributes[self.propname]
1518
 
                self.assertNotIsInstance(value, Unique)
 
1535
                self.assertNotIsInstance(value, self.Unique)
1519
1536
                self.assertEqual(value, value_to_get)
 
1537
 
 
1538
    class Unique(object):
 
1539
        """Class for objects which exist only to be unique objects,
 
1540
since unittest.mock.sentinel only exists in Python 3.3"""
 
1541
 
1520
1542
    def run_command(self, value, clients):
1521
1543
        self.command().run(clients, self.bus)
1522
1544
 
1565
1587
 
1566
1588
class TestPropertyValueCmd(TestPropertyCmd):
1567
1589
    """Abstract class for tests of PropertyValueCmd classes"""
 
1590
 
1568
1591
    def runTest(self):
1569
1592
        if type(self) is TestPropertyValueCmd:
1570
1593
            return
1571
1594
        return super(TestPropertyValueCmd, self).runTest()
 
1595
 
1572
1596
    def run_command(self, value, clients):
1573
1597
        self.command(value).run(clients, self.bus)
1574
1598