/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-12 20:37:00 UTC
  • mto: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190312203700-psenofay84b3szip
mandos-ctl: Refactor

* mandos-ctl (TestEnableCmd, TestDisableCmd): Rewrite to inherit from
                                              TestPropertyCmd instead
                                              of TestCmd.

Show diffs side-by-side

added added

removed removed

Lines of Context:
93
93
    if options.debug:
94
94
        log.setLevel(logging.DEBUG)
95
95
 
96
 
    bus = dbus.SystemBus()
97
 
 
98
 
    mandos_dbus_object = get_mandos_dbus_object(bus)
99
 
 
100
 
    mandos_serv = dbus.Interface(
101
 
        mandos_dbus_object, dbus_interface=server_dbus_interface)
 
96
    try:
 
97
        bus = dbus.SystemBus()
 
98
        log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
99
                  dbus_busname, server_dbus_path)
 
100
        mandos_dbus_objc = bus.get_object(dbus_busname,
 
101
                                          server_dbus_path)
 
102
    except dbus.exceptions.DBusException:
 
103
        log.critical("Could not connect to Mandos server")
 
104
        sys.exit(1)
 
105
 
 
106
    mandos_serv = dbus.Interface(mandos_dbus_objc,
 
107
                                 dbus_interface=server_dbus_interface)
102
108
    mandos_serv_object_manager = dbus.Interface(
103
 
        mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
104
 
 
105
 
    managed_objects = get_managed_objects(mandos_serv_object_manager)
106
 
 
107
 
    all_clients = {}
108
 
    for path, ifs_and_props in managed_objects.items():
109
 
        try:
110
 
            all_clients[path] = ifs_and_props[client_dbus_interface]
111
 
        except KeyError:
112
 
            pass
113
 
 
114
 
    # Compile dict of (clientpath: properties) to process
 
109
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
 
110
 
 
111
    # Filter out log message from dbus module
 
112
    dbus_logger = logging.getLogger("dbus.proxies")
 
113
    class NullFilter(logging.Filter):
 
114
        def filter(self, record):
 
115
            return False
 
116
    dbus_filter = NullFilter()
 
117
    try:
 
118
        dbus_logger.addFilter(dbus_filter)
 
119
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
 
120
                  server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
 
121
        mandos_clients = {path: ifs_and_props[client_dbus_interface]
 
122
                          for path, ifs_and_props in
 
123
                          mandos_serv_object_manager
 
124
                          .GetManagedObjects().items()
 
125
                          if client_dbus_interface in ifs_and_props}
 
126
    except dbus.exceptions.DBusException as e:
 
127
        log.critical("Failed to access Mandos server through D-Bus:"
 
128
                     "\n%s", e)
 
129
        sys.exit(1)
 
130
    finally:
 
131
        # restore dbus logger
 
132
        dbus_logger.removeFilter(dbus_filter)
 
133
 
 
134
    # Compile dict of (clients: properties) to process
 
135
    clients = {}
 
136
 
115
137
    if not clientnames:
116
 
        clients = all_clients
 
138
        clients = {objpath: properties
 
139
                   for objpath, properties in mandos_clients.items()}
117
140
    else:
118
 
        clients = {}
119
141
        for name in clientnames:
120
 
            for objpath, properties in all_clients.items():
 
142
            for objpath, properties in mandos_clients.items():
121
143
                if properties["Name"] == name:
122
144
                    clients[objpath] = properties
123
145
                    break
424
446
        options.remove = True
425
447
 
426
448
 
427
 
def get_mandos_dbus_object(bus):
428
 
    log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
429
 
              dbus_busname, server_dbus_path)
430
 
    with if_dbus_exception_log_with_exception_and_exit(
431
 
            "Could not connect to Mandos server: %s"):
432
 
        mandos_dbus_object = bus.get_object(dbus_busname,
433
 
                                            server_dbus_path)
434
 
    return mandos_dbus_object
435
 
 
436
 
 
437
 
@contextlib.contextmanager
438
 
def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs):
439
 
    try:
440
 
        yield
441
 
    except dbus.exceptions.DBusException as e:
442
 
        log.critical(*(args + (e,)), **kwargs)
443
 
        sys.exit(1)
444
 
 
445
 
 
446
 
def get_managed_objects(object_manager):
447
 
    log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
448
 
              server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
449
 
    with if_dbus_exception_log_with_exception_and_exit(
450
 
            "Failed to access Mandos server through D-Bus:\n%s"):
451
 
        with SilenceLogger("dbus.proxies"):
452
 
            managed_objects = object_manager.GetManagedObjects()
453
 
    return managed_objects
454
 
 
455
 
 
456
 
class SilenceLogger(object):
457
 
    "Simple context manager to silence a particular logger"
458
 
    def __init__(self, loggername):
459
 
        self.logger = logging.getLogger(loggername)
460
 
 
461
 
    def __enter__(self):
462
 
        self.logger.addFilter(self.nullfilter)
463
 
        return self
464
 
 
465
 
    class NullFilter(logging.Filter):
466
 
        def filter(self, record):
467
 
            return False
468
 
 
469
 
    nullfilter = NullFilter()
470
 
 
471
 
    def __exit__(self, exc_type, exc_val, exc_tb):
472
 
        self.logger.removeFilter(self.nullfilter)
473
 
 
474
 
 
475
449
def commands_from_options(options):
476
450
 
477
451
    commands = []
603
577
                    "LastApprovalRequest", "ApprovalDelay",
604
578
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
605
579
                    "Expires", "LastCheckerStatus")
606
 
 
607
580
    def run(self, clients, bus=None, mandos=None):
608
581
        print(self.output(clients.values()))
609
 
 
610
582
    def output(self, clients):
611
583
        raise NotImplementedError()
612
584
 
616
588
        data = {client["Name"]:
617
589
                {key: self.dbus_boolean_to_bool(client[key])
618
590
                 for key in self.all_keywords}
619
 
                for client in clients}
 
591
                for client in clients.values()}
620
592
        return json.dumps(data, indent=4, separators=(',', ': '))
621
 
 
622
593
    @staticmethod
623
594
    def dbus_boolean_to_bool(value):
624
595
        if isinstance(value, dbus.Boolean):
662
633
            "LastCheckerStatus": "Last Checker Status",
663
634
        }
664
635
 
665
 
        def __init__(self, clients, keywords):
 
636
        def __init__(self, clients, keywords, tableheaders=None):
666
637
            self.clients = clients
667
638
            self.keywords = keywords
 
639
            if tableheaders is not None:
 
640
                self.tableheaders = tableheaders
668
641
 
669
642
        def __str__(self):
670
643
            return "\n".join(self.rows())
723
696
 
724
697
class PropertyCmd(Command):
725
698
    """Abstract class for Actions for setting one client property"""
726
 
 
727
699
    def run_on_one_client(self, client, properties):
728
700
        """Set the Client's D-Bus property"""
729
701
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
735
707
        client.Set(client_dbus_interface, self.propname,
736
708
                   self.value_to_set,
737
709
                   dbus_interface=dbus.PROPERTIES_IFACE)
738
 
 
739
710
    @property
740
711
    def propname(self):
741
712
        raise NotImplementedError()
792
763
 
793
764
class SetSecretCmd(PropertyValueCmd):
794
765
    propname = "Secret"
795
 
 
796
766
    @property
797
767
    def value_to_set(self):
798
768
        return self._vts
799
 
 
800
769
    @value_to_set.setter
801
770
    def value_to_set(self, value):
802
771
        """When setting, read data from supplied file object"""
807
776
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
808
777
    """Abstract class for PropertyValueCmd taking a value argument as
809
778
a datetime.timedelta() but should store it as milliseconds."""
810
 
 
811
779
    @property
812
780
    def value_to_set(self):
813
781
        return self._vts
814
 
 
815
782
    @value_to_set.setter
816
783
    def value_to_set(self, value):
817
784
        """When setting, convert value from a datetime.timedelta"""
849
816
                         datetime.timedelta(0, 1))
850
817
        self.assertEqual(string_to_delta("PT2H"),
851
818
                         datetime.timedelta(0, 7200))
852
 
 
853
819
    def test_falls_back_to_pre_1_6_1_with_warning(self):
854
820
        # assertLogs only exists in Python 3.4
855
821
        if hasattr(self, "assertLogs"):
874
840
 
875
841
 
876
842
class Test_check_option_syntax(unittest.TestCase):
877
 
    def setUp(self):
878
 
        self.parser = argparse.ArgumentParser()
879
 
        add_command_line_options(self.parser)
880
 
 
881
 
    def test_actions_requires_client_or_all(self):
882
 
        for action, value in self.actions.items():
883
 
            options = self.parser.parse_args()
884
 
            setattr(options, action, value)
885
 
            with self.assertParseError():
886
 
                self.check_option_syntax(options)
887
 
 
888
843
    # This mostly corresponds to the definition from has_actions() in
889
844
    # check_option_syntax()
890
845
    actions = {
911
866
        "deny": True,
912
867
    }
913
868
 
 
869
    def setUp(self):
 
870
        self.parser = argparse.ArgumentParser()
 
871
        add_command_line_options(self.parser)
 
872
 
914
873
    @contextlib.contextmanager
915
874
    def assertParseError(self):
916
875
        with self.assertRaises(SystemExit) as e:
938
897
    def check_option_syntax(self, options):
939
898
        check_option_syntax(self.parser, options)
940
899
 
 
900
    def test_actions_requires_client_or_all(self):
 
901
        for action, value in self.actions.items():
 
902
            options = self.parser.parse_args()
 
903
            setattr(options, action, value)
 
904
            with self.assertParseError():
 
905
                self.check_option_syntax(options)
 
906
 
941
907
    def test_actions_conflicts_with_verbose(self):
942
908
        for action, value in self.actions.items():
943
909
            options = self.parser.parse_args()
1005
971
                self.check_option_syntax(options)
1006
972
 
1007
973
 
1008
 
class Test_get_mandos_dbus_object(unittest.TestCase):
1009
 
    def test_calls_and_returns_get_object_on_bus(self):
1010
 
        class MockBus(object):
1011
 
            called = False
1012
 
            def get_object(mockbus_self, busname, dbus_path):
1013
 
                # Note that "self" is still the testcase instance,
1014
 
                # this MockBus instance is in "mockbus_self".
1015
 
                self.assertEqual(busname, dbus_busname)
1016
 
                self.assertEqual(dbus_path, server_dbus_path)
1017
 
                mockbus_self.called = True
1018
 
                return mockbus_self
1019
 
 
1020
 
        mockbus = get_mandos_dbus_object(bus=MockBus())
1021
 
        self.assertIsInstance(mockbus, MockBus)
1022
 
        self.assertTrue(mockbus.called)
1023
 
 
1024
 
    def test_logs_and_exits_on_dbus_error(self):
1025
 
        class MockBusFailing(object):
1026
 
            def get_object(self, busname, dbus_path):
1027
 
                raise dbus.exceptions.DBusException("Test")
1028
 
 
1029
 
        # assertLogs only exists in Python 3.4
1030
 
        if hasattr(self, "assertLogs"):
1031
 
            with self.assertLogs(log, logging.CRITICAL):
1032
 
                with self.assertRaises(SystemExit) as e:
1033
 
                    bus = get_mandos_dbus_object(bus=MockBus())
1034
 
        else:
1035
 
            critical_filter = self.CriticalFilter()
1036
 
            log.addFilter(critical_filter)
1037
 
            try:
1038
 
                with self.assertRaises(SystemExit) as e:
1039
 
                    get_mandos_dbus_object(bus=MockBusFailing())
1040
 
            finally:
1041
 
                log.removeFilter(critical_filter)
1042
 
            self.assertTrue(critical_filter.found)
1043
 
        if isinstance(e.exception.code, int):
1044
 
            self.assertNotEqual(e.exception.code, 0)
1045
 
        else:
1046
 
            self.assertIsNotNone(e.exception.code)
1047
 
 
1048
 
    class CriticalFilter(logging.Filter):
1049
 
        """Don't show, but register, critical messages"""
1050
 
        found = False
1051
 
        def filter(self, record):
1052
 
            is_critical = record.levelno >= logging.CRITICAL
1053
 
            self.found = is_critical or self.found
1054
 
            return not is_critical
1055
 
 
1056
 
 
1057
 
class Test_get_managed_objects(unittest.TestCase):
1058
 
    def test_calls_and_returns_GetManagedObjects(self):
1059
 
        managed_objects = {"/clients/foo": { "Name": "foo"}}
1060
 
        class MockObjectManager(object):
1061
 
            @staticmethod
1062
 
            def GetManagedObjects():
1063
 
                return managed_objects
1064
 
        retval = get_managed_objects(MockObjectManager())
1065
 
        self.assertDictEqual(managed_objects, retval)
1066
 
 
1067
 
    def test_logs_and_exits_on_dbus_error(self):
1068
 
        class MockObjectManagerFailing(object):
1069
 
            @staticmethod
1070
 
            def GetManagedObjects():
1071
 
                raise dbus.exceptions.DBusException("Test")
1072
 
 
1073
 
        if hasattr(self, "assertLogs"):
1074
 
            with self.assertLogs(log, logging.CRITICAL):
1075
 
                with self.assertRaises(SystemExit):
1076
 
                    get_managed_objects(MockObjectManagerFailing())
1077
 
        else:
1078
 
            critical_filter = self.CriticalFilter()
1079
 
            log.addFilter(critical_filter)
1080
 
            try:
1081
 
                with self.assertRaises(SystemExit) as e:
1082
 
                    get_managed_objects(MockObjectManagerFailing())
1083
 
            finally:
1084
 
                log.removeFilter(critical_filter)
1085
 
            self.assertTrue(critical_filter.found)
1086
 
        if isinstance(e.exception.code, int):
1087
 
            self.assertNotEqual(e.exception.code, 0)
1088
 
        else:
1089
 
            self.assertIsNotNone(e.exception.code)
1090
 
 
1091
 
    class CriticalFilter(logging.Filter):
1092
 
        """Don't show, but register, critical messages"""
1093
 
        found = False
1094
 
        def filter(self, record):
1095
 
            is_critical = record.levelno >= logging.CRITICAL
1096
 
            self.found = is_critical or self.found
1097
 
            return not is_critical
1098
 
 
1099
 
 
1100
 
class Test_SilenceLogger(unittest.TestCase):
1101
 
    loggername = "mandos-ctl.Test_SilenceLogger"
1102
 
    log = logging.getLogger(loggername)
1103
 
    log.propagate = False
1104
 
    log.addHandler(logging.NullHandler())
1105
 
 
1106
 
    def setUp(self):
1107
 
        self.counting_filter = self.CountingFilter()
1108
 
 
1109
 
    class CountingFilter(logging.Filter):
1110
 
        "Count number of records"
1111
 
        count = 0
1112
 
        def filter(self, record):
1113
 
            self.count += 1
1114
 
            return True
1115
 
 
1116
 
    def test_should_filter_records_only_when_active(self):
1117
 
        try:
1118
 
            with SilenceLogger(self.loggername):
1119
 
                self.log.addFilter(self.counting_filter)
1120
 
                self.log.info("Filtered log message 1")
1121
 
            self.log.info("Non-filtered message 2")
1122
 
            self.log.info("Non-filtered message 3")
1123
 
        finally:
1124
 
            self.log.removeFilter(self.counting_filter)
1125
 
        self.assertEqual(self.counting_filter.count, 2)
1126
 
 
1127
 
 
1128
 
class Test_commands_from_options(unittest.TestCase):
 
974
class Test_command_from_options(unittest.TestCase):
1129
975
    def setUp(self):
1130
976
        self.parser = argparse.ArgumentParser()
1131
977
        add_command_line_options(self.parser)
1132
 
 
1133
 
    def test_is_enabled(self):
1134
 
        self.assert_command_from_args(["--is-enabled", "foo"],
1135
 
                                      IsEnabledCmd)
1136
 
 
1137
978
    def assert_command_from_args(self, args, command_cls,
1138
979
                                 **cmd_attrs):
1139
980
        """Assert that parsing ARGS should result in an instance of
1146
987
        self.assertIsInstance(command, command_cls)
1147
988
        for key, value in cmd_attrs.items():
1148
989
            self.assertEqual(getattr(command, key), value)
1149
 
 
1150
 
    def test_is_enabled_short(self):
1151
 
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1152
 
 
1153
 
    def test_approve(self):
1154
 
        self.assert_command_from_args(["--approve", "foo"],
1155
 
                                      ApproveCmd)
1156
 
 
1157
 
    def test_approve_short(self):
1158
 
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1159
 
 
1160
 
    def test_deny(self):
1161
 
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1162
 
 
1163
 
    def test_deny_short(self):
1164
 
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
1165
 
 
1166
 
    def test_remove(self):
1167
 
        self.assert_command_from_args(["--remove", "foo"],
1168
 
                                      RemoveCmd)
1169
 
 
1170
 
    def test_deny_before_remove(self):
1171
 
        options = self.parser.parse_args(["--deny", "--remove",
1172
 
                                          "foo"])
1173
 
        check_option_syntax(self.parser, options)
1174
 
        commands = commands_from_options(options)
1175
 
        self.assertEqual(len(commands), 2)
1176
 
        self.assertIsInstance(commands[0], DenyCmd)
1177
 
        self.assertIsInstance(commands[1], RemoveCmd)
1178
 
 
1179
 
    def test_deny_before_remove_reversed(self):
1180
 
        options = self.parser.parse_args(["--remove", "--deny",
1181
 
                                          "--all"])
1182
 
        check_option_syntax(self.parser, options)
1183
 
        commands = commands_from_options(options)
1184
 
        self.assertEqual(len(commands), 2)
1185
 
        self.assertIsInstance(commands[0], DenyCmd)
1186
 
        self.assertIsInstance(commands[1], RemoveCmd)
1187
 
 
1188
 
    def test_remove_short(self):
1189
 
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1190
 
 
1191
 
    def test_dump_json(self):
1192
 
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
990
    def test_print_table(self):
 
991
        self.assert_command_from_args([], PrintTableCmd,
 
992
                                      verbose=False)
 
993
 
 
994
    def test_print_table_verbose(self):
 
995
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
 
996
                                      verbose=True)
 
997
 
 
998
    def test_print_table_verbose_short(self):
 
999
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1000
                                      verbose=True)
1193
1001
 
1194
1002
    def test_enable(self):
1195
1003
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1219
1027
        self.assert_command_from_args(["--stop-checker", "foo"],
1220
1028
                                      StopCheckerCmd)
1221
1029
 
1222
 
    def test_approve_by_default(self):
1223
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
1224
 
                                      ApproveByDefaultCmd)
 
1030
    def test_remove(self):
 
1031
        self.assert_command_from_args(["--remove", "foo"],
 
1032
                                      RemoveCmd)
1225
1033
 
1226
 
    def test_deny_by_default(self):
1227
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
1228
 
                                      DenyByDefaultCmd)
 
1034
    def test_remove_short(self):
 
1035
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1229
1036
 
1230
1037
    def test_checker(self):
1231
1038
        self.assert_command_from_args(["--checker", ":", "foo"],
1239
1046
        self.assert_command_from_args(["-c", ":", "foo"],
1240
1047
                                      SetCheckerCmd, value_to_set=":")
1241
1048
 
1242
 
    def test_host(self):
1243
 
        self.assert_command_from_args(["--host", "foo.example.org",
1244
 
                                       "foo"], SetHostCmd,
1245
 
                                      value_to_set="foo.example.org")
1246
 
 
1247
 
    def test_host_short(self):
1248
 
        self.assert_command_from_args(["-H", "foo.example.org",
1249
 
                                       "foo"], SetHostCmd,
1250
 
                                      value_to_set="foo.example.org")
1251
 
 
1252
 
    def test_secret_devnull(self):
1253
 
        self.assert_command_from_args(["--secret", os.path.devnull,
1254
 
                                       "foo"], SetSecretCmd,
1255
 
                                      value_to_set=b"")
1256
 
 
1257
 
    def test_secret_tempfile(self):
1258
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1259
 
            value = b"secret\0xyzzy\nbar"
1260
 
            f.write(value)
1261
 
            f.seek(0)
1262
 
            self.assert_command_from_args(["--secret", f.name,
1263
 
                                           "foo"], SetSecretCmd,
1264
 
                                          value_to_set=value)
1265
 
 
1266
 
    def test_secret_devnull_short(self):
1267
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1268
 
                                      SetSecretCmd, value_to_set=b"")
1269
 
 
1270
 
    def test_secret_tempfile_short(self):
1271
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1272
 
            value = b"secret\0xyzzy\nbar"
1273
 
            f.write(value)
1274
 
            f.seek(0)
1275
 
            self.assert_command_from_args(["-s", f.name, "foo"],
1276
 
                                          SetSecretCmd,
1277
 
                                          value_to_set=value)
1278
 
 
1279
1049
    def test_timeout(self):
1280
1050
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1281
1051
                                      SetTimeoutCmd,
1302
1072
                                      SetIntervalCmd,
1303
1073
                                      value_to_set=120000)
1304
1074
 
 
1075
    def test_approve_by_default(self):
 
1076
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1077
                                      ApproveByDefaultCmd)
 
1078
 
 
1079
    def test_deny_by_default(self):
 
1080
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1081
                                      DenyByDefaultCmd)
 
1082
 
1305
1083
    def test_approval_delay(self):
1306
1084
        self.assert_command_from_args(["--approval-delay", "PT30S",
1307
1085
                                       "foo"], SetApprovalDelayCmd,
1312
1090
                                       "foo"], SetApprovalDurationCmd,
1313
1091
                                      value_to_set=1000)
1314
1092
 
1315
 
    def test_print_table(self):
1316
 
        self.assert_command_from_args([], PrintTableCmd,
1317
 
                                      verbose=False)
1318
 
 
1319
 
    def test_print_table_verbose(self):
1320
 
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1321
 
                                      verbose=True)
1322
 
 
1323
 
    def test_print_table_verbose_short(self):
1324
 
        self.assert_command_from_args(["-v"], PrintTableCmd,
1325
 
                                      verbose=True)
 
1093
    def test_host(self):
 
1094
        self.assert_command_from_args(["--host", "foo.example.org",
 
1095
                                       "foo"], SetHostCmd,
 
1096
                                      value_to_set="foo.example.org")
 
1097
 
 
1098
    def test_host_short(self):
 
1099
        self.assert_command_from_args(["-H", "foo.example.org",
 
1100
                                       "foo"], SetHostCmd,
 
1101
                                      value_to_set="foo.example.org")
 
1102
 
 
1103
    def test_secret_devnull(self):
 
1104
        self.assert_command_from_args(["--secret", os.path.devnull,
 
1105
                                       "foo"], SetSecretCmd,
 
1106
                                      value_to_set=b"")
 
1107
 
 
1108
    def test_secret_tempfile(self):
 
1109
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1110
            value = b"secret\0xyzzy\nbar"
 
1111
            f.write(value)
 
1112
            f.seek(0)
 
1113
            self.assert_command_from_args(["--secret", f.name,
 
1114
                                           "foo"], SetSecretCmd,
 
1115
                                          value_to_set=value)
 
1116
 
 
1117
    def test_secret_devnull_short(self):
 
1118
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1119
                                      SetSecretCmd, value_to_set=b"")
 
1120
 
 
1121
    def test_secret_tempfile_short(self):
 
1122
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1123
            value = b"secret\0xyzzy\nbar"
 
1124
            f.write(value)
 
1125
            f.seek(0)
 
1126
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1127
                                          SetSecretCmd,
 
1128
                                          value_to_set=value)
 
1129
 
 
1130
    def test_approve(self):
 
1131
        self.assert_command_from_args(["--approve", "foo"],
 
1132
                                      ApproveCmd)
 
1133
 
 
1134
    def test_approve_short(self):
 
1135
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1136
 
 
1137
    def test_deny(self):
 
1138
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
 
1139
 
 
1140
    def test_deny_short(self):
 
1141
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1142
 
 
1143
    def test_dump_json(self):
 
1144
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
1145
 
 
1146
    def test_is_enabled(self):
 
1147
        self.assert_command_from_args(["--is-enabled", "foo"],
 
1148
                                      IsEnabledCmd)
 
1149
 
 
1150
    def test_is_enabled_short(self):
 
1151
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1152
 
 
1153
    def test_deny_before_remove(self):
 
1154
        options = self.parser.parse_args(["--deny", "--remove",
 
1155
                                          "foo"])
 
1156
        check_option_syntax(self.parser, options)
 
1157
        commands = commands_from_options(options)
 
1158
        self.assertEqual(len(commands), 2)
 
1159
        self.assertIsInstance(commands[0], DenyCmd)
 
1160
        self.assertIsInstance(commands[1], RemoveCmd)
 
1161
 
 
1162
    def test_deny_before_remove_reversed(self):
 
1163
        options = self.parser.parse_args(["--remove", "--deny",
 
1164
                                          "--all"])
 
1165
        check_option_syntax(self.parser, options)
 
1166
        commands = commands_from_options(options)
 
1167
        self.assertEqual(len(commands), 2)
 
1168
        self.assertIsInstance(commands[0], DenyCmd)
 
1169
        self.assertIsInstance(commands[1], RemoveCmd)
1326
1170
 
1327
1171
 
1328
1172
class TestCmd(unittest.TestCase):
1329
1173
    """Abstract class for tests of command classes"""
1330
 
 
1331
1174
    def setUp(self):
1332
1175
        testcase = self
1333
1176
        class MockClient(object):
1405
1248
                ("/clients/barbar", self.other_client.attributes),
1406
1249
            ])
1407
1250
        self.one_client = {"/clients/foo": self.client.attributes}
1408
 
 
1409
1251
    @property
1410
1252
    def bus(self):
1411
1253
        class Bus(object):
1413
1255
            def get_object(client_bus_name, path):
1414
1256
                self.assertEqual(client_bus_name, dbus_busname)
1415
1257
                return {
1416
 
                    # Note: "self" here is the TestCmd instance, not
1417
 
                    # the Bus instance, since this is a static method!
1418
1258
                    "/clients/foo": self.client,
1419
1259
                    "/clients/barbar": self.other_client,
1420
1260
                }[path]
1427
1267
                                                      properties)
1428
1268
                            for client, properties
1429
1269
                            in self.clients.items()))
1430
 
 
1431
1270
    def test_is_enabled_run_exits_successfully(self):
1432
1271
        with self.assertRaises(SystemExit) as e:
1433
1272
            IsEnabledCmd().run(self.one_client)
1435
1274
            self.assertEqual(e.exception.code, 0)
1436
1275
        else:
1437
1276
            self.assertIsNone(e.exception.code)
1438
 
 
1439
1277
    def test_is_enabled_run_exits_with_failure(self):
1440
1278
        self.client.attributes["Enabled"] = dbus.Boolean(False)
1441
1279
        with self.assertRaises(SystemExit) as e:
1463
1301
            self.assertIn(("Approve", (False, client_dbus_interface)),
1464
1302
                          client.calls)
1465
1303
 
1466
 
 
1467
1304
class TestRemoveCmd(TestCmd):
1468
1305
    def test_remove(self):
1469
1306
        class MockMandos(object):
1533
1370
            },
1534
1371
        }
1535
1372
        return super(TestDumpJSONCmd, self).setUp()
1536
 
 
1537
1373
    def test_normal(self):
1538
 
        output = DumpJSONCmd().output(self.clients.values())
1539
 
        json_data = json.loads(output)
 
1374
        json_data = json.loads(DumpJSONCmd().output(self.clients))
1540
1375
        self.assertDictEqual(json_data, self.expected_json)
1541
 
 
1542
1376
    def test_one_client(self):
1543
 
        output = DumpJSONCmd().output(self.one_client.values())
1544
 
        json_data = json.loads(output)
 
1377
        clients = self.one_client
 
1378
        json_data = json.loads(DumpJSONCmd().output(clients))
1545
1379
        expected_json = {"foo": self.expected_json["foo"]}
1546
1380
        self.assertDictEqual(json_data, expected_json)
1547
1381
 
1555
1389
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
1556
1390
        ))
1557
1391
        self.assertEqual(output, expected_output)
1558
 
 
1559
1392
    def test_verbose(self):
1560
1393
        output = PrintTableCmd(verbose=True).output(
1561
1394
            self.clients.values())
1650
1483
                                            for rows in columns)
1651
1484
                                    for line in range(num_lines))
1652
1485
        self.assertEqual(output, expected_output)
1653
 
 
1654
1486
    def test_one_client(self):
1655
1487
        output = PrintTableCmd().output(self.one_client.values())
1656
1488
        expected_output = "\n".join((
1660
1492
        self.assertEqual(output, expected_output)
1661
1493
 
1662
1494
 
 
1495
class Unique(object):
 
1496
    """Class for objects which exist only to be unique objects, since
 
1497
unittest.mock.sentinel only exists in Python 3.3"""
 
1498
 
 
1499
 
1663
1500
class TestPropertyCmd(TestCmd):
1664
1501
    """Abstract class for tests of PropertyCmd classes"""
1665
1502
    def runTest(self):
1672
1509
            for clientpath in self.clients:
1673
1510
                client = self.bus.get_object(dbus_busname, clientpath)
1674
1511
                old_value = client.attributes[self.propname]
1675
 
                self.assertNotIsInstance(old_value, self.Unique)
1676
 
                client.attributes[self.propname] = self.Unique()
 
1512
                self.assertNotIsInstance(old_value, Unique)
 
1513
                client.attributes[self.propname] = Unique()
1677
1514
            self.run_command(value_to_set, self.clients)
1678
1515
            for clientpath in self.clients:
1679
1516
                client = self.bus.get_object(dbus_busname, clientpath)
1680
1517
                value = client.attributes[self.propname]
1681
 
                self.assertNotIsInstance(value, self.Unique)
 
1518
                self.assertNotIsInstance(value, Unique)
1682
1519
                self.assertEqual(value, value_to_get)
1683
 
 
1684
 
    class Unique(object):
1685
 
        """Class for objects which exist only to be unique objects,
1686
 
since unittest.mock.sentinel only exists in Python 3.3"""
1687
 
 
1688
1520
    def run_command(self, value, clients):
1689
1521
        self.command().run(clients, self.bus)
1690
1522
 
1733
1565
 
1734
1566
class TestPropertyValueCmd(TestPropertyCmd):
1735
1567
    """Abstract class for tests of PropertyValueCmd classes"""
1736
 
 
1737
1568
    def runTest(self):
1738
1569
        if type(self) is TestPropertyValueCmd:
1739
1570
            return
1740
1571
        return super(TestPropertyValueCmd, self).runTest()
1741
 
 
1742
1572
    def run_command(self, value, clients):
1743
1573
        self.command(value).run(clients, self.bus)
1744
1574
 
1836
1666
    return tests
1837
1667
 
1838
1668
if __name__ == "__main__":
1839
 
    try:
1840
 
        if should_only_run_tests():
1841
 
            # Call using ./tdd-python-script --check [--verbose]
1842
 
            unittest.main()
1843
 
        else:
1844
 
            main()
1845
 
    finally:
1846
 
        logging.shutdown()
 
1669
    if should_only_run_tests():
 
1670
        # Call using ./tdd-python-script --check [--verbose]
 
1671
        unittest.main()
 
1672
    else:
 
1673
        main()