/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-12 20:24:58 UTC
  • mto: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190312202458-14d4f7bvc61kwp2l
mandos-ctl: Refactor

* mandos-ctl (milliseconds_to_string): Move to
  "PrintTableCmd.TableOfClients.milliseconds_to_string".
  (PrintTableCmd.TableOfClients.valuetostring): Change to a class
  method to be able to call cls.milliseconds_to_string().
  (TestCmd.other_client): Change "ApprovalDuration" to a more odd
                          value to compensate for the removal of
                          "Test_milliseconds_to_string", which
                          contained a test for this value.  All users
                          changed.
  (Test_milliseconds_to_string): Remove.

Show diffs side-by-side

added

removed

Lines of Context:
93
93
    if options.debug:
94
94
        log.setLevel(logging.DEBUG)
95
95
 
96
 
    bus = dbus.SystemBus()
97
 
 
98
 
    mandos_dbus_object = get_mandos_dbus_object(bus)
99
 
 
100
 
    mandos_serv = dbus.Interface(
101
 
        mandos_dbus_object, dbus_interface=server_dbus_interface)
 
96
    try:
 
97
        bus = dbus.SystemBus()
 
98
        log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
99
                  dbus_busname, server_dbus_path)
 
100
        mandos_dbus_objc = bus.get_object(dbus_busname,
 
101
                                          server_dbus_path)
 
102
    except dbus.exceptions.DBusException:
 
103
        log.critical("Could not connect to Mandos server")
 
104
        sys.exit(1)
 
105
 
 
106
    mandos_serv = dbus.Interface(mandos_dbus_objc,
 
107
                                 dbus_interface=server_dbus_interface)
102
108
    mandos_serv_object_manager = dbus.Interface(
103
 
        mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
104
 
 
105
 
    managed_objects = get_managed_objects(mandos_serv_object_manager)
106
 
 
107
 
    all_clients = {}
108
 
    for path, ifs_and_props in managed_objects.items():
109
 
        try:
110
 
            all_clients[path] = ifs_and_props[client_dbus_interface]
111
 
        except KeyError:
112
 
            pass
113
 
 
114
 
    # Compile dict of (clientpath: properties) to process
 
109
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
 
110
 
 
111
    # Filter out log message from dbus module
 
112
    dbus_logger = logging.getLogger("dbus.proxies")
 
113
    class NullFilter(logging.Filter):
 
114
        def filter(self, record):
 
115
            return False
 
116
    dbus_filter = NullFilter()
 
117
    try:
 
118
        dbus_logger.addFilter(dbus_filter)
 
119
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
 
120
                  server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
 
121
        mandos_clients = {path: ifs_and_props[client_dbus_interface]
 
122
                          for path, ifs_and_props in
 
123
                          mandos_serv_object_manager
 
124
                          .GetManagedObjects().items()
 
125
                          if client_dbus_interface in ifs_and_props}
 
126
    except dbus.exceptions.DBusException as e:
 
127
        log.critical("Failed to access Mandos server through D-Bus:"
 
128
                     "\n%s", e)
 
129
        sys.exit(1)
 
130
    finally:
 
131
        # restore dbus logger
 
132
        dbus_logger.removeFilter(dbus_filter)
 
133
 
 
134
    # Compile dict of (clients: properties) to process
 
135
    clients = {}
 
136
 
115
137
    if not clientnames:
116
 
        clients = all_clients
 
138
        clients = {objpath: properties
 
139
                   for objpath, properties in mandos_clients.items()}
117
140
    else:
118
 
        clients = {}
119
141
        for name in clientnames:
120
 
            for objpath, properties in all_clients.items():
 
142
            for objpath, properties in mandos_clients.items():
121
143
                if properties["Name"] == name:
122
144
                    clients[objpath] = properties
123
145
                    break
125
147
                log.critical("Client not found on server: %r", name)
126
148
                sys.exit(1)
127
149
 
 
150
    # Run all commands on clients
128
151
    commands = commands_from_options(options)
129
 
 
130
152
    for command in commands:
131
153
        command.run(clients, bus, mandos_serv)
132
154
 
232
254
    >>> rfc3339_duration_to_delta("")
233
255
    Traceback (most recent call last):
234
256
    ...
235
 
    ValueError: Invalid RFC 3339 duration: ""
 
257
    ValueError: Invalid RFC 3339 duration: u''
236
258
    >>> # Must start with "P":
237
259
    >>> rfc3339_duration_to_delta("1D")
238
260
    Traceback (most recent call last):
239
261
    ...
240
 
    ValueError: Invalid RFC 3339 duration: "1D"
 
262
    ValueError: Invalid RFC 3339 duration: u'1D'
241
263
    >>> # Must use correct order
242
264
    >>> rfc3339_duration_to_delta("PT1S2M")
243
265
    Traceback (most recent call last):
244
266
    ...
245
 
    ValueError: Invalid RFC 3339 duration: "PT1S2M"
 
267
    ValueError: Invalid RFC 3339 duration: u'PT1S2M'
246
268
    >>> # Time needs time marker
247
269
    >>> rfc3339_duration_to_delta("P1H2S")
248
270
    Traceback (most recent call last):
249
271
    ...
250
 
    ValueError: Invalid RFC 3339 duration: "P1H2S"
 
272
    ValueError: Invalid RFC 3339 duration: u'P1H2S'
251
273
    >>> # Weeks can not be combined with anything else
252
274
    >>> rfc3339_duration_to_delta("P1D2W")
253
275
    Traceback (most recent call last):
254
276
    ...
255
 
    ValueError: Invalid RFC 3339 duration: "P1D2W"
 
277
    ValueError: Invalid RFC 3339 duration: u'P1D2W'
256
278
    >>> rfc3339_duration_to_delta("P2W2H")
257
279
    Traceback (most recent call last):
258
280
    ...
259
 
    ValueError: Invalid RFC 3339 duration: "P2W2H"
 
281
    ValueError: Invalid RFC 3339 duration: u'P2W2H'
260
282
    """
261
283
 
262
284
    # Parsing an RFC 3339 duration with regular expressions is not
333
355
                break
334
356
        else:
335
357
            # No currently valid tokens were found
336
 
            raise ValueError("Invalid RFC 3339 duration: \"{}\""
 
358
            raise ValueError("Invalid RFC 3339 duration: {!r}"
337
359
                             .format(duration))
338
360
    # End token found
339
361
    return value
424
446
        options.remove = True
425
447
 
426
448
 
427
 
def get_mandos_dbus_object(bus):
428
 
    log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
429
 
              dbus_busname, server_dbus_path)
430
 
    with if_dbus_exception_log_with_exception_and_exit(
431
 
            "Could not connect to Mandos server: %s"):
432
 
        mandos_dbus_object = bus.get_object(dbus_busname,
433
 
                                            server_dbus_path)
434
 
    return mandos_dbus_object
435
 
 
436
 
 
437
 
@contextlib.contextmanager
438
 
def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs):
439
 
    try:
440
 
        yield
441
 
    except dbus.exceptions.DBusException as e:
442
 
        log.critical(*(args + (e,)), **kwargs)
443
 
        sys.exit(1)
444
 
 
445
 
 
446
 
def get_managed_objects(object_manager):
447
 
    log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
448
 
              server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
449
 
    with if_dbus_exception_log_with_exception_and_exit(
450
 
            "Failed to access Mandos server through D-Bus:\n%s"):
451
 
        with SilenceLogger("dbus.proxies"):
452
 
            managed_objects = object_manager.GetManagedObjects()
453
 
    return managed_objects
454
 
 
455
 
 
456
 
class SilenceLogger(object):
457
 
    "Simple context manager to silence a particular logger"
458
 
    def __init__(self, loggername):
459
 
        self.logger = logging.getLogger(loggername)
460
 
 
461
 
    def __enter__(self):
462
 
        self.logger.addFilter(self.nullfilter)
463
 
        return self
464
 
 
465
 
    class NullFilter(logging.Filter):
466
 
        def filter(self, record):
467
 
            return False
468
 
 
469
 
    nullfilter = NullFilter()
470
 
 
471
 
    def __exit__(self, exc_type, exc_val, exc_tb):
472
 
        self.logger.removeFilter(self.nullfilter)
473
 
 
474
 
 
475
449
def commands_from_options(options):
476
450
 
477
451
    commands = []
594
568
        self.mandos.RemoveClient(client.__dbus_object_path__)
595
569
 
596
570
 
597
 
class OutputCmd(Command):
598
 
    """Abstract class for commands outputting client details"""
 
571
class PrintCmd(Command):
 
572
    """Abstract class for commands printing client details"""
599
573
    all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
600
574
                    "Created", "Interval", "Host", "KeyID",
601
575
                    "Fingerprint", "CheckerRunning", "LastEnabled",
603
577
                    "LastApprovalRequest", "ApprovalDelay",
604
578
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
605
579
                    "Expires", "LastCheckerStatus")
606
 
 
607
580
    def run(self, clients, bus=None, mandos=None):
608
581
        print(self.output(clients.values()))
609
 
 
610
582
    def output(self, clients):
611
583
        raise NotImplementedError()
612
584
 
613
585
 
614
 
class DumpJSONCmd(OutputCmd):
 
586
class DumpJSONCmd(PrintCmd):
615
587
    def output(self, clients):
616
588
        data = {client["Name"]:
617
589
                {key: self.dbus_boolean_to_bool(client[key])
618
590
                 for key in self.all_keywords}
619
 
                for client in clients}
 
591
                for client in clients.values()}
620
592
        return json.dumps(data, indent=4, separators=(',', ': '))
621
 
 
622
593
    @staticmethod
623
594
    def dbus_boolean_to_bool(value):
624
595
        if isinstance(value, dbus.Boolean):
626
597
        return value
627
598
 
628
599
 
629
 
class PrintTableCmd(OutputCmd):
 
600
class PrintTableCmd(PrintCmd):
630
601
    def __init__(self, verbose=False):
631
602
        self.verbose = verbose
632
603
 
662
633
            "LastCheckerStatus": "Last Checker Status",
663
634
        }
664
635
 
665
 
        def __init__(self, clients, keywords):
 
636
        def __init__(self, clients, keywords, tableheaders=None):
666
637
            self.clients = clients
667
638
            self.keywords = keywords
 
639
            if tableheaders is not None:
 
640
                self.tableheaders = tableheaders
668
641
 
669
642
        def __str__(self):
670
643
            return "\n".join(self.rows())
723
696
 
724
697
class PropertyCmd(Command):
725
698
    """Abstract class for Actions for setting one client property"""
726
 
 
727
699
    def run_on_one_client(self, client, properties):
728
700
        """Set the Client's D-Bus property"""
729
701
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
735
707
        client.Set(client_dbus_interface, self.propname,
736
708
                   self.value_to_set,
737
709
                   dbus_interface=dbus.PROPERTIES_IFACE)
738
 
 
739
710
    @property
740
711
    def propname(self):
741
712
        raise NotImplementedError()
792
763
 
793
764
class SetSecretCmd(PropertyValueCmd):
794
765
    propname = "Secret"
795
 
 
796
766
    @property
797
767
    def value_to_set(self):
798
768
        return self._vts
799
 
 
800
769
    @value_to_set.setter
801
770
    def value_to_set(self, value):
802
771
        """When setting, read data from supplied file object"""
807
776
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
808
777
    """Abstract class for PropertyValueCmd taking a value argument as
809
778
a datetime.timedelta() but should store it as milliseconds."""
810
 
 
811
779
    @property
812
780
    def value_to_set(self):
813
781
        return self._vts
814
 
 
815
782
    @value_to_set.setter
816
783
    def value_to_set(self, value):
817
784
        """When setting, convert value from a datetime.timedelta"""
839
806
 
840
807
 
841
808
 
842
 
class TestCaseWithAssertLogs(unittest.TestCase):
843
 
    """unittest.TestCase.assertLogs only exists in Python 3.4"""
844
 
 
845
 
    if not hasattr(unittest.TestCase, "assertLogs"):
846
 
        @contextlib.contextmanager
847
 
        def assertLogs(self, logger, level=logging.INFO):
848
 
            capturing_handler = self.CapturingLevelHandler(level)
849
 
            old_level = logger.level
850
 
            old_propagate = logger.propagate
851
 
            logger.addHandler(capturing_handler)
852
 
            logger.setLevel(level)
853
 
            logger.propagate = False
854
 
            try:
855
 
                yield capturing_handler.watcher
856
 
            finally:
857
 
                logger.propagate = old_propagate
858
 
                logger.removeHandler(capturing_handler)
859
 
                logger.setLevel(old_level)
860
 
            self.assertGreater(len(capturing_handler.watcher.records),
861
 
                               0)
862
 
 
863
 
        class CapturingLevelHandler(logging.Handler):
864
 
            def __init__(self, level, *args, **kwargs):
865
 
                logging.Handler.__init__(self, *args, **kwargs)
866
 
                self.watcher = self.LoggingWatcher([], [])
867
 
            def emit(self, record):
868
 
                self.watcher.records.append(record)
869
 
                self.watcher.output.append(self.format(record))
870
 
 
871
 
            LoggingWatcher = collections.namedtuple("LoggingWatcher",
872
 
                                                    ("records",
873
 
                                                     "output"))
874
 
 
875
 
 
876
 
class Test_string_to_delta(TestCaseWithAssertLogs):
 
809
class Test_string_to_delta(unittest.TestCase):
877
810
    def test_handles_basic_rfc3339(self):
878
811
        self.assertEqual(string_to_delta("PT0S"),
879
812
                         datetime.timedelta())
883
816
                         datetime.timedelta(0, 1))
884
817
        self.assertEqual(string_to_delta("PT2H"),
885
818
                         datetime.timedelta(0, 7200))
886
 
 
887
819
    def test_falls_back_to_pre_1_6_1_with_warning(self):
888
 
        with self.assertLogs(log, logging.WARNING):
889
 
            value = string_to_delta("2h")
 
820
        # assertLogs only exists in Python 3.4
 
821
        if hasattr(self, "assertLogs"):
 
822
            with self.assertLogs(log, logging.WARNING):
 
823
                value = string_to_delta("2h")
 
824
        else:
 
825
            class WarningFilter(logging.Filter):
 
826
                """Don't show, but record the presence of, warnings"""
 
827
                def filter(self, record):
 
828
                    is_warning = record.levelno >= logging.WARNING
 
829
                    self.found = is_warning or getattr(self, "found",
 
830
                                                       False)
 
831
                    return not is_warning
 
832
            warning_filter = WarningFilter()
 
833
            log.addFilter(warning_filter)
 
834
            try:
 
835
                value = string_to_delta("2h")
 
836
            finally:
 
837
                log.removeFilter(warning_filter)
 
838
            self.assertTrue(getattr(warning_filter, "found", False))
890
839
        self.assertEqual(value, datetime.timedelta(0, 7200))
891
840
 
892
841
 
893
842
class Test_check_option_syntax(unittest.TestCase):
894
 
    def setUp(self):
895
 
        self.parser = argparse.ArgumentParser()
896
 
        add_command_line_options(self.parser)
897
 
 
898
 
    def test_actions_requires_client_or_all(self):
899
 
        for action, value in self.actions.items():
900
 
            options = self.parser.parse_args()
901
 
            setattr(options, action, value)
902
 
            with self.assertParseError():
903
 
                self.check_option_syntax(options)
904
 
 
905
843
    # This mostly corresponds to the definition from has_actions() in
906
844
    # check_option_syntax()
907
845
    actions = {
928
866
        "deny": True,
929
867
    }
930
868
 
 
869
    def setUp(self):
 
870
        self.parser = argparse.ArgumentParser()
 
871
        add_command_line_options(self.parser)
 
872
 
931
873
    @contextlib.contextmanager
932
874
    def assertParseError(self):
933
875
        with self.assertRaises(SystemExit) as e:
934
 
            with self.redirect_stderr_to_devnull():
 
876
            with self.temporarily_suppress_stderr():
935
877
                yield
936
878
        # Exit code from argparse is guaranteed to be "2".  Reference:
937
879
        # https://docs.python.org/3/library
940
882
 
941
883
    @staticmethod
942
884
    @contextlib.contextmanager
943
 
    def redirect_stderr_to_devnull():
 
885
    def temporarily_suppress_stderr():
944
886
        null = os.open(os.path.devnull, os.O_RDWR)
945
887
        stderrcopy = os.dup(sys.stderr.fileno())
946
888
        os.dup2(null, sys.stderr.fileno())
955
897
    def check_option_syntax(self, options):
956
898
        check_option_syntax(self.parser, options)
957
899
 
958
 
    def test_actions_all_conflicts_with_verbose(self):
 
900
    def test_actions_requires_client_or_all(self):
959
901
        for action, value in self.actions.items():
960
902
            options = self.parser.parse_args()
961
903
            setattr(options, action, value)
962
 
            options.all = True
963
 
            options.verbose = True
964
904
            with self.assertParseError():
965
905
                self.check_option_syntax(options)
966
906
 
967
 
    def test_actions_with_client_conflicts_with_verbose(self):
 
907
    def test_actions_conflicts_with_verbose(self):
968
908
        for action, value in self.actions.items():
969
909
            options = self.parser.parse_args()
970
910
            setattr(options, action, value)
971
911
            options.verbose = True
972
 
            options.client = ["foo"]
973
912
            with self.assertParseError():
974
913
                self.check_option_syntax(options)
975
914
 
1001
940
            options.all = True
1002
941
            self.check_option_syntax(options)
1003
942
 
1004
 
    def test_any_action_is_ok_with_one_client(self):
1005
 
        for action, value in self.actions.items():
1006
 
            options = self.parser.parse_args()
1007
 
            setattr(options, action, value)
1008
 
            options.client = ["foo"]
 
943
    def test_is_enabled_fails_without_client(self):
 
944
        options = self.parser.parse_args()
 
945
        options.is_enabled = True
 
946
        with self.assertParseError():
1009
947
            self.check_option_syntax(options)
1010
948
 
1011
 
    def test_one_client_with_all_actions_except_is_enabled(self):
 
949
    def test_is_enabled_works_with_one_client(self):
1012
950
        options = self.parser.parse_args()
1013
 
        for action, value in self.actions.items():
1014
 
            if action == "is_enabled":
1015
 
                continue
1016
 
            setattr(options, action, value)
 
951
        options.is_enabled = True
1017
952
        options.client = ["foo"]
1018
953
        self.check_option_syntax(options)
1019
954
 
1020
 
    def test_two_clients_with_all_actions_except_is_enabled(self):
1021
 
        options = self.parser.parse_args()
1022
 
        for action, value in self.actions.items():
1023
 
            if action == "is_enabled":
1024
 
                continue
1025
 
            setattr(options, action, value)
1026
 
        options.client = ["foo", "barbar"]
1027
 
        self.check_option_syntax(options)
1028
 
 
1029
 
    def test_two_clients_are_ok_with_actions_except_is_enabled(self):
1030
 
        for action, value in self.actions.items():
1031
 
            if action == "is_enabled":
1032
 
                continue
1033
 
            options = self.parser.parse_args()
1034
 
            setattr(options, action, value)
1035
 
            options.client = ["foo", "barbar"]
1036
 
            self.check_option_syntax(options)
1037
 
 
1038
 
    def test_is_enabled_fails_without_client(self):
1039
 
        options = self.parser.parse_args()
1040
 
        options.is_enabled = True
1041
 
        with self.assertParseError():
1042
 
            self.check_option_syntax(options)
1043
 
 
1044
955
    def test_is_enabled_fails_with_two_clients(self):
1045
956
        options = self.parser.parse_args()
1046
957
        options.is_enabled = True
1060
971
                self.check_option_syntax(options)
1061
972
 
1062
973
 
1063
 
class Test_get_mandos_dbus_object(TestCaseWithAssertLogs):
1064
 
    def test_calls_and_returns_get_object_on_bus(self):
1065
 
        class MockBus(object):
1066
 
            called = False
1067
 
            def get_object(mockbus_self, busname, dbus_path):
1068
 
                # Note that "self" is still the testcase instance,
1069
 
                # this MockBus instance is in "mockbus_self".
1070
 
                self.assertEqual(busname, dbus_busname)
1071
 
                self.assertEqual(dbus_path, server_dbus_path)
1072
 
                mockbus_self.called = True
1073
 
                return mockbus_self
1074
 
 
1075
 
        mockbus = get_mandos_dbus_object(bus=MockBus())
1076
 
        self.assertIsInstance(mockbus, MockBus)
1077
 
        self.assertTrue(mockbus.called)
1078
 
 
1079
 
    def test_logs_and_exits_on_dbus_error(self):
1080
 
        class MockBusFailing(object):
1081
 
            def get_object(self, busname, dbus_path):
1082
 
                raise dbus.exceptions.DBusException("Test")
1083
 
 
1084
 
        with self.assertLogs(log, logging.CRITICAL):
1085
 
            with self.assertRaises(SystemExit) as e:
1086
 
                bus = get_mandos_dbus_object(bus=MockBusFailing())
1087
 
 
1088
 
        if isinstance(e.exception.code, int):
1089
 
            self.assertNotEqual(e.exception.code, 0)
1090
 
        else:
1091
 
            self.assertIsNotNone(e.exception.code)
1092
 
 
1093
 
 
1094
 
class Test_get_managed_objects(TestCaseWithAssertLogs):
1095
 
    def test_calls_and_returns_GetManagedObjects(self):
1096
 
        managed_objects = {"/clients/foo": { "Name": "foo"}}
1097
 
        class MockObjectManager(object):
1098
 
            def GetManagedObjects(self):
1099
 
                return managed_objects
1100
 
        retval = get_managed_objects(MockObjectManager())
1101
 
        self.assertDictEqual(managed_objects, retval)
1102
 
 
1103
 
    def test_logs_and_exits_on_dbus_error(self):
1104
 
        dbus_logger = logging.getLogger("dbus.proxies")
1105
 
 
1106
 
        class MockObjectManagerFailing(object):
1107
 
            def GetManagedObjects(self):
1108
 
                dbus_logger.error("Test")
1109
 
                raise dbus.exceptions.DBusException("Test")
1110
 
 
1111
 
        class CountingHandler(logging.Handler):
1112
 
            count = 0
1113
 
            def emit(self, record):
1114
 
                self.count += 1
1115
 
 
1116
 
        counting_handler = CountingHandler()
1117
 
 
1118
 
        dbus_logger.addHandler(counting_handler)
1119
 
 
1120
 
        try:
1121
 
            with self.assertLogs(log, logging.CRITICAL) as watcher:
1122
 
                with self.assertRaises(SystemExit) as e:
1123
 
                    get_managed_objects(MockObjectManagerFailing())
1124
 
        finally:
1125
 
            dbus_logger.removeFilter(counting_handler)
1126
 
 
1127
 
        # Make sure the dbus logger was suppressed
1128
 
        self.assertEqual(counting_handler.count, 0)
1129
 
 
1130
 
        # Test that the dbus_logger still works
1131
 
        with self.assertLogs(dbus_logger, logging.ERROR):
1132
 
            dbus_logger.error("Test")
1133
 
 
1134
 
        if isinstance(e.exception.code, int):
1135
 
            self.assertNotEqual(e.exception.code, 0)
1136
 
        else:
1137
 
            self.assertIsNotNone(e.exception.code)
1138
 
 
1139
 
 
1140
 
class Test_commands_from_options(unittest.TestCase):
 
974
class Test_command_from_options(unittest.TestCase):
1141
975
    def setUp(self):
1142
976
        self.parser = argparse.ArgumentParser()
1143
977
        add_command_line_options(self.parser)
1144
 
 
1145
 
    def test_is_enabled(self):
1146
 
        self.assert_command_from_args(["--is-enabled", "foo"],
1147
 
                                      IsEnabledCmd)
1148
 
 
1149
978
    def assert_command_from_args(self, args, command_cls,
1150
979
                                 **cmd_attrs):
1151
980
        """Assert that parsing ARGS should result in an instance of
1158
987
        self.assertIsInstance(command, command_cls)
1159
988
        for key, value in cmd_attrs.items():
1160
989
            self.assertEqual(getattr(command, key), value)
1161
 
 
1162
 
    def test_is_enabled_short(self):
1163
 
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1164
 
 
1165
 
    def test_approve(self):
1166
 
        self.assert_command_from_args(["--approve", "foo"],
1167
 
                                      ApproveCmd)
1168
 
 
1169
 
    def test_approve_short(self):
1170
 
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1171
 
 
1172
 
    def test_deny(self):
1173
 
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1174
 
 
1175
 
    def test_deny_short(self):
1176
 
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
1177
 
 
1178
 
    def test_remove(self):
1179
 
        self.assert_command_from_args(["--remove", "foo"],
1180
 
                                      RemoveCmd)
1181
 
 
1182
 
    def test_deny_before_remove(self):
1183
 
        options = self.parser.parse_args(["--deny", "--remove",
1184
 
                                          "foo"])
1185
 
        check_option_syntax(self.parser, options)
1186
 
        commands = commands_from_options(options)
1187
 
        self.assertEqual(len(commands), 2)
1188
 
        self.assertIsInstance(commands[0], DenyCmd)
1189
 
        self.assertIsInstance(commands[1], RemoveCmd)
1190
 
 
1191
 
    def test_deny_before_remove_reversed(self):
1192
 
        options = self.parser.parse_args(["--remove", "--deny",
1193
 
                                          "--all"])
1194
 
        check_option_syntax(self.parser, options)
1195
 
        commands = commands_from_options(options)
1196
 
        self.assertEqual(len(commands), 2)
1197
 
        self.assertIsInstance(commands[0], DenyCmd)
1198
 
        self.assertIsInstance(commands[1], RemoveCmd)
1199
 
 
1200
 
    def test_remove_short(self):
1201
 
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1202
 
 
1203
 
    def test_dump_json(self):
1204
 
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
990
    def test_print_table(self):
 
991
        self.assert_command_from_args([], PrintTableCmd,
 
992
                                      verbose=False)
 
993
 
 
994
    def test_print_table_verbose(self):
 
995
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
 
996
                                      verbose=True)
 
997
 
 
998
    def test_print_table_verbose_short(self):
 
999
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1000
                                      verbose=True)
1205
1001
 
1206
1002
    def test_enable(self):
1207
1003
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1231
1027
        self.assert_command_from_args(["--stop-checker", "foo"],
1232
1028
                                      StopCheckerCmd)
1233
1029
 
1234
 
    def test_approve_by_default(self):
1235
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
1236
 
                                      ApproveByDefaultCmd)
 
1030
    def test_remove(self):
 
1031
        self.assert_command_from_args(["--remove", "foo"],
 
1032
                                      RemoveCmd)
1237
1033
 
1238
 
    def test_deny_by_default(self):
1239
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
1240
 
                                      DenyByDefaultCmd)
 
1034
    def test_remove_short(self):
 
1035
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1241
1036
 
1242
1037
    def test_checker(self):
1243
1038
        self.assert_command_from_args(["--checker", ":", "foo"],
1251
1046
        self.assert_command_from_args(["-c", ":", "foo"],
1252
1047
                                      SetCheckerCmd, value_to_set=":")
1253
1048
 
1254
 
    def test_host(self):
1255
 
        self.assert_command_from_args(["--host", "foo.example.org",
1256
 
                                       "foo"], SetHostCmd,
1257
 
                                      value_to_set="foo.example.org")
1258
 
 
1259
 
    def test_host_short(self):
1260
 
        self.assert_command_from_args(["-H", "foo.example.org",
1261
 
                                       "foo"], SetHostCmd,
1262
 
                                      value_to_set="foo.example.org")
1263
 
 
1264
 
    def test_secret_devnull(self):
1265
 
        self.assert_command_from_args(["--secret", os.path.devnull,
1266
 
                                       "foo"], SetSecretCmd,
1267
 
                                      value_to_set=b"")
1268
 
 
1269
 
    def test_secret_tempfile(self):
1270
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1271
 
            value = b"secret\0xyzzy\nbar"
1272
 
            f.write(value)
1273
 
            f.seek(0)
1274
 
            self.assert_command_from_args(["--secret", f.name,
1275
 
                                           "foo"], SetSecretCmd,
1276
 
                                          value_to_set=value)
1277
 
 
1278
 
    def test_secret_devnull_short(self):
1279
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1280
 
                                      SetSecretCmd, value_to_set=b"")
1281
 
 
1282
 
    def test_secret_tempfile_short(self):
1283
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1284
 
            value = b"secret\0xyzzy\nbar"
1285
 
            f.write(value)
1286
 
            f.seek(0)
1287
 
            self.assert_command_from_args(["-s", f.name, "foo"],
1288
 
                                          SetSecretCmd,
1289
 
                                          value_to_set=value)
1290
 
 
1291
1049
    def test_timeout(self):
1292
1050
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1293
1051
                                      SetTimeoutCmd,
1314
1072
                                      SetIntervalCmd,
1315
1073
                                      value_to_set=120000)
1316
1074
 
 
1075
    def test_approve_by_default(self):
 
1076
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1077
                                      ApproveByDefaultCmd)
 
1078
 
 
1079
    def test_deny_by_default(self):
 
1080
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1081
                                      DenyByDefaultCmd)
 
1082
 
1317
1083
    def test_approval_delay(self):
1318
1084
        self.assert_command_from_args(["--approval-delay", "PT30S",
1319
1085
                                       "foo"], SetApprovalDelayCmd,
1324
1090
                                       "foo"], SetApprovalDurationCmd,
1325
1091
                                      value_to_set=1000)
1326
1092
 
1327
 
    def test_print_table(self):
1328
 
        self.assert_command_from_args([], PrintTableCmd,
1329
 
                                      verbose=False)
1330
 
 
1331
 
    def test_print_table_verbose(self):
1332
 
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1333
 
                                      verbose=True)
1334
 
 
1335
 
    def test_print_table_verbose_short(self):
1336
 
        self.assert_command_from_args(["-v"], PrintTableCmd,
1337
 
                                      verbose=True)
 
1093
    def test_host(self):
 
1094
        self.assert_command_from_args(["--host", "foo.example.org",
 
1095
                                       "foo"], SetHostCmd,
 
1096
                                      value_to_set="foo.example.org")
 
1097
 
 
1098
    def test_host_short(self):
 
1099
        self.assert_command_from_args(["-H", "foo.example.org",
 
1100
                                       "foo"], SetHostCmd,
 
1101
                                      value_to_set="foo.example.org")
 
1102
 
 
1103
    def test_secret_devnull(self):
 
1104
        self.assert_command_from_args(["--secret", os.path.devnull,
 
1105
                                       "foo"], SetSecretCmd,
 
1106
                                      value_to_set=b"")
 
1107
 
 
1108
    def test_secret_tempfile(self):
 
1109
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1110
            value = b"secret\0xyzzy\nbar"
 
1111
            f.write(value)
 
1112
            f.seek(0)
 
1113
            self.assert_command_from_args(["--secret", f.name,
 
1114
                                           "foo"], SetSecretCmd,
 
1115
                                          value_to_set=value)
 
1116
 
 
1117
    def test_secret_devnull_short(self):
 
1118
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1119
                                      SetSecretCmd, value_to_set=b"")
 
1120
 
 
1121
    def test_secret_tempfile_short(self):
 
1122
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1123
            value = b"secret\0xyzzy\nbar"
 
1124
            f.write(value)
 
1125
            f.seek(0)
 
1126
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1127
                                          SetSecretCmd,
 
1128
                                          value_to_set=value)
 
1129
 
 
1130
    def test_approve(self):
 
1131
        self.assert_command_from_args(["--approve", "foo"],
 
1132
                                      ApproveCmd)
 
1133
 
 
1134
    def test_approve_short(self):
 
1135
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1136
 
 
1137
    def test_deny(self):
 
1138
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
 
1139
 
 
1140
    def test_deny_short(self):
 
1141
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1142
 
 
1143
    def test_dump_json(self):
 
1144
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
1145
 
 
1146
    def test_is_enabled(self):
 
1147
        self.assert_command_from_args(["--is-enabled", "foo"],
 
1148
                                      IsEnabledCmd)
 
1149
 
 
1150
    def test_is_enabled_short(self):
 
1151
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1152
 
 
1153
    def test_deny_before_remove(self):
 
1154
        options = self.parser.parse_args(["--deny", "--remove",
 
1155
                                          "foo"])
 
1156
        check_option_syntax(self.parser, options)
 
1157
        commands = commands_from_options(options)
 
1158
        self.assertEqual(len(commands), 2)
 
1159
        self.assertIsInstance(commands[0], DenyCmd)
 
1160
        self.assertIsInstance(commands[1], RemoveCmd)
 
1161
 
 
1162
    def test_deny_before_remove_reversed(self):
 
1163
        options = self.parser.parse_args(["--remove", "--deny",
 
1164
                                          "--all"])
 
1165
        check_option_syntax(self.parser, options)
 
1166
        commands = commands_from_options(options)
 
1167
        self.assertEqual(len(commands), 2)
 
1168
        self.assertIsInstance(commands[0], DenyCmd)
 
1169
        self.assertIsInstance(commands[1], RemoveCmd)
1338
1170
 
1339
1171
 
1340
1172
class TestCmd(unittest.TestCase):
1341
1173
    """Abstract class for tests of command classes"""
1342
 
 
1343
1174
    def setUp(self):
1344
1175
        testcase = self
1345
1176
        class MockClient(object):
1417
1248
                ("/clients/barbar", self.other_client.attributes),
1418
1249
            ])
1419
1250
        self.one_client = {"/clients/foo": self.client.attributes}
1420
 
 
1421
1251
    @property
1422
1252
    def bus(self):
1423
1253
        class Bus(object):
1425
1255
            def get_object(client_bus_name, path):
1426
1256
                self.assertEqual(client_bus_name, dbus_busname)
1427
1257
                return {
1428
 
                    # Note: "self" here is the TestCmd instance, not
1429
 
                    # the Bus instance, since this is a static method!
1430
1258
                    "/clients/foo": self.client,
1431
1259
                    "/clients/barbar": self.other_client,
1432
1260
                }[path]
1439
1267
                                                      properties)
1440
1268
                            for client, properties
1441
1269
                            in self.clients.items()))
1442
 
 
1443
1270
    def test_is_enabled_run_exits_successfully(self):
1444
1271
        with self.assertRaises(SystemExit) as e:
1445
1272
            IsEnabledCmd().run(self.one_client)
1447
1274
            self.assertEqual(e.exception.code, 0)
1448
1275
        else:
1449
1276
            self.assertIsNone(e.exception.code)
1450
 
 
1451
1277
    def test_is_enabled_run_exits_with_failure(self):
1452
1278
        self.client.attributes["Enabled"] = dbus.Boolean(False)
1453
1279
        with self.assertRaises(SystemExit) as e:
1475
1301
            self.assertIn(("Approve", (False, client_dbus_interface)),
1476
1302
                          client.calls)
1477
1303
 
1478
 
 
1479
1304
class TestRemoveCmd(TestCmd):
1480
1305
    def test_remove(self):
1481
1306
        class MockMandos(object):
1545
1370
            },
1546
1371
        }
1547
1372
        return super(TestDumpJSONCmd, self).setUp()
1548
 
 
1549
1373
    def test_normal(self):
1550
 
        output = DumpJSONCmd().output(self.clients.values())
1551
 
        json_data = json.loads(output)
 
1374
        json_data = json.loads(DumpJSONCmd().output(self.clients))
1552
1375
        self.assertDictEqual(json_data, self.expected_json)
1553
 
 
1554
1376
    def test_one_client(self):
1555
 
        output = DumpJSONCmd().output(self.one_client.values())
1556
 
        json_data = json.loads(output)
 
1377
        clients = self.one_client
 
1378
        json_data = json.loads(DumpJSONCmd().output(clients))
1557
1379
        expected_json = {"foo": self.expected_json["foo"]}
1558
1380
        self.assertDictEqual(json_data, expected_json)
1559
1381
 
1567
1389
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
1568
1390
        ))
1569
1391
        self.assertEqual(output, expected_output)
1570
 
 
1571
1392
    def test_verbose(self):
1572
1393
        output = PrintTableCmd(verbose=True).output(
1573
1394
            self.clients.values())
1662
1483
                                            for rows in columns)
1663
1484
                                    for line in range(num_lines))
1664
1485
        self.assertEqual(output, expected_output)
1665
 
 
1666
1486
    def test_one_client(self):
1667
1487
        output = PrintTableCmd().output(self.one_client.values())
1668
 
        expected_output = "\n".join((
1669
 
            "Name Enabled Timeout  Last Successful Check",
1670
 
            "foo  Yes     00:05:00 2019-02-03T00:00:00  ",
1671
 
        ))
 
1488
        expected_output = """
 
1489
Name Enabled Timeout  Last Successful Check
 
1490
foo  Yes     00:05:00 2019-02-03T00:00:00  
 
1491
"""[1:-1]
1672
1492
        self.assertEqual(output, expected_output)
1673
1493
 
1674
1494
 
 
1495
class Unique(object):
    """Class whose instances exist only to be unique objects.

    unittest.mock.sentinel would serve the same purpose, but it is
    only available in Python 3.3 and later.
    """
 
1498
 
 
1499
 
1675
1500
class TestPropertyCmd(TestCmd):
1676
1501
    """Abstract class for tests of PropertyCmd classes"""
1677
1502
    def runTest(self):
1684
1509
            for clientpath in self.clients:
1685
1510
                client = self.bus.get_object(dbus_busname, clientpath)
1686
1511
                old_value = client.attributes[self.propname]
1687
 
                self.assertNotIsInstance(old_value, self.Unique)
1688
 
                client.attributes[self.propname] = self.Unique()
 
1512
                self.assertNotIsInstance(old_value, Unique)
 
1513
                client.attributes[self.propname] = Unique()
1689
1514
            self.run_command(value_to_set, self.clients)
1690
1515
            for clientpath in self.clients:
1691
1516
                client = self.bus.get_object(dbus_busname, clientpath)
1692
1517
                value = client.attributes[self.propname]
1693
 
                self.assertNotIsInstance(value, self.Unique)
 
1518
                self.assertNotIsInstance(value, Unique)
1694
1519
                self.assertEqual(value, value_to_get)
1695
 
 
1696
 
    class Unique(object):
1697
 
        """Class for objects which exist only to be unique objects,
1698
 
since unittest.mock.sentinel only exists in Python 3.3"""
1699
 
 
1700
1520
    def run_command(self, value, clients):
1701
1521
        self.command().run(clients, self.bus)
1702
1522
 
1703
1523
 
1704
 
class TestEnableCmd(TestPropertyCmd):
    """Settings for running the inherited test on EnableCmd"""
    # Name of the client attribute which is inspected
    propname = "Enabled"
    # Command class under test
    command = EnableCmd
    # Value(s) the attribute is expected to take
    values_to_set = [dbus.Boolean(True)]
1708
 
 
1709
 
 
1710
 
class TestDisableCmd(TestPropertyCmd):
    """Settings for running the inherited test on DisableCmd"""
    # Name of the client attribute which is inspected
    propname = "Enabled"
    # Command class under test
    command = DisableCmd
    # Value(s) the attribute is expected to take
    values_to_set = [dbus.Boolean(False)]
 
1524
class TestEnableCmd(TestCmd):
    """Test that EnableCmd sets "Enabled" on every client"""

    def test_enable(self):
        # Start with every client disabled, so that the assertion at
        # the end can only pass if the command changed the attribute.
        for path in self.clients:
            mock_client = self.bus.get_object(dbus_busname, path)
            mock_client.attributes["Enabled"] = False

        cmd = EnableCmd()
        cmd.run(self.clients, self.bus)

        for path in self.clients:
            mock_client = self.bus.get_object(dbus_busname, path)
            enabled = mock_client.attributes["Enabled"]
            self.assertTrue(enabled)
 
1535
 
 
1536
 
 
1537
class TestDisableCmd(TestCmd):
    """Test that DisableCmd clears "Enabled" on every client"""

    def test_disable(self):
        cmd = DisableCmd()
        cmd.run(self.clients, self.bus)
        # After the command has run, no client may still be enabled.
        for path in self.clients:
            mock_client = self.bus.get_object(dbus_busname, path)
            enabled = mock_client.attributes["Enabled"]
            self.assertFalse(enabled)
1714
1543
 
1715
1544
 
1716
1545
class TestBumpTimeoutCmd(TestPropertyCmd):
1745
1574
 
1746
1575
class TestPropertyValueCmd(TestPropertyCmd):
    """Abstract class for tests of PropertyValueCmd classes"""

    def runTest(self):
        # The exact type is compared on purpose: only this abstract
        # class itself is skipped, while subclasses run the inherited
        # test.
        is_abstract = type(self) is TestPropertyValueCmd
        if not is_abstract:
            return super(TestPropertyValueCmd, self).runTest()

    def run_command(self, value, clients):
        # The value is passed to the command's constructor.
        cmd = self.command(value)
        cmd.run(clients, self.bus)
1756
1583
 
1848
1675
    return tests
1849
1676
 
1850
1677
if __name__ == "__main__":
1851
 
    try:
1852
 
        if should_only_run_tests():
1853
 
            # Call using ./tdd-python-script --check [--verbose]
1854
 
            unittest.main()
1855
 
        else:
1856
 
            main()
1857
 
    finally:
1858
 
        logging.shutdown()
 
1678
    if should_only_run_tests():
 
1679
        # Call using ./tdd-python-script --check [--verbose]
 
1680
        unittest.main()
 
1681
    else:
 
1682
        main()