/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-12 20:27:39 UTC
  • mto: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190312202739-sa7fhlxuqg1soimk
mandos-ctl: Refactor

* mandos-ctl (TestPrintTableCmd.test_one_client): Change format of
  "expected_output" definition to match other tests, for consistency.

Show diffs side-by-side

added added

removed removed

Lines of Context:
93
93
    if options.debug:
94
94
        log.setLevel(logging.DEBUG)
95
95
 
96
 
    bus = dbus.SystemBus()
97
 
 
98
 
    mandos_dbus_object = get_mandos_dbus_object(bus)
99
 
 
100
 
    mandos_serv = dbus.Interface(
101
 
        mandos_dbus_object, dbus_interface=server_dbus_interface)
 
96
    try:
 
97
        bus = dbus.SystemBus()
 
98
        log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
99
                  dbus_busname, server_dbus_path)
 
100
        mandos_dbus_objc = bus.get_object(dbus_busname,
 
101
                                          server_dbus_path)
 
102
    except dbus.exceptions.DBusException:
 
103
        log.critical("Could not connect to Mandos server")
 
104
        sys.exit(1)
 
105
 
 
106
    mandos_serv = dbus.Interface(mandos_dbus_objc,
 
107
                                 dbus_interface=server_dbus_interface)
102
108
    mandos_serv_object_manager = dbus.Interface(
103
 
        mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
 
109
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
104
110
 
 
111
    # Filter out log message from dbus module
 
112
    dbus_logger = logging.getLogger("dbus.proxies")
 
113
    class NullFilter(logging.Filter):
 
114
        def filter(self, record):
 
115
            return False
 
116
    dbus_filter = NullFilter()
105
117
    try:
 
118
        dbus_logger.addFilter(dbus_filter)
106
119
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
107
120
                  server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
108
 
        with SilenceLogger("dbus.proxies"):
109
 
            all_clients = {path: ifs_and_props[client_dbus_interface]
110
 
                              for path, ifs_and_props in
111
 
                              mandos_serv_object_manager
112
 
                              .GetManagedObjects().items()
113
 
                              if client_dbus_interface in ifs_and_props}
 
121
        mandos_clients = {path: ifs_and_props[client_dbus_interface]
 
122
                          for path, ifs_and_props in
 
123
                          mandos_serv_object_manager
 
124
                          .GetManagedObjects().items()
 
125
                          if client_dbus_interface in ifs_and_props}
114
126
    except dbus.exceptions.DBusException as e:
115
127
        log.critical("Failed to access Mandos server through D-Bus:"
116
128
                     "\n%s", e)
117
129
        sys.exit(1)
 
130
    finally:
 
131
        # restore dbus logger
 
132
        dbus_logger.removeFilter(dbus_filter)
118
133
 
119
134
    # Compile dict of (clients: properties) to process
120
135
    clients = {}
121
136
 
122
137
    if not clientnames:
123
 
        clients = all_clients
 
138
        clients = {objpath: properties
 
139
                   for objpath, properties in mandos_clients.items()}
124
140
    else:
125
141
        for name in clientnames:
126
 
            for objpath, properties in all_clients.items():
 
142
            for objpath, properties in mandos_clients.items():
127
143
                if properties["Name"] == name:
128
144
                    clients[objpath] = properties
129
145
                    break
430
446
        options.remove = True
431
447
 
432
448
 
433
 
def get_mandos_dbus_object(bus):
434
 
    try:
435
 
        log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
436
 
                  dbus_busname, server_dbus_path)
437
 
        mandos_dbus_object = bus.get_object(dbus_busname,
438
 
                                            server_dbus_path)
439
 
    except dbus.exceptions.DBusException:
440
 
        log.critical("Could not connect to Mandos server")
441
 
        sys.exit(1)
442
 
 
443
 
    return mandos_dbus_object
444
 
 
445
 
 
446
 
class SilenceLogger(object):
447
 
    "Simple context manager to silence a particular logger"
448
 
    def __init__(self, loggername):
449
 
        self.logger = logging.getLogger(loggername)
450
 
 
451
 
    def __enter__(self):
452
 
        self.logger.addFilter(self.nullfilter)
453
 
        return self
454
 
 
455
 
    class NullFilter(logging.Filter):
456
 
        def filter(self, record):
457
 
            return False
458
 
 
459
 
    nullfilter = NullFilter()
460
 
 
461
 
    def __exit__(self, exc_type, exc_val, exc_tb):
462
 
        self.logger.removeFilter(self.nullfilter)
463
 
 
464
 
 
465
449
def commands_from_options(options):
466
450
 
467
451
    commands = []
584
568
        self.mandos.RemoveClient(client.__dbus_object_path__)
585
569
 
586
570
 
587
 
class OutputCmd(Command):
588
 
    """Abstract class for commands outputting client details"""
 
571
class PrintCmd(Command):
 
572
    """Abstract class for commands printing client details"""
589
573
    all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
590
574
                    "Created", "Interval", "Host", "KeyID",
591
575
                    "Fingerprint", "CheckerRunning", "LastEnabled",
593
577
                    "LastApprovalRequest", "ApprovalDelay",
594
578
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
595
579
                    "Expires", "LastCheckerStatus")
596
 
 
597
580
    def run(self, clients, bus=None, mandos=None):
598
581
        print(self.output(clients.values()))
599
 
 
600
582
    def output(self, clients):
601
583
        raise NotImplementedError()
602
584
 
603
585
 
604
 
class DumpJSONCmd(OutputCmd):
 
586
class DumpJSONCmd(PrintCmd):
605
587
    def output(self, clients):
606
588
        data = {client["Name"]:
607
589
                {key: self.dbus_boolean_to_bool(client[key])
608
590
                 for key in self.all_keywords}
609
 
                for client in clients}
 
591
                for client in clients.values()}
610
592
        return json.dumps(data, indent=4, separators=(',', ': '))
611
 
 
612
593
    @staticmethod
613
594
    def dbus_boolean_to_bool(value):
614
595
        if isinstance(value, dbus.Boolean):
616
597
        return value
617
598
 
618
599
 
619
 
class PrintTableCmd(OutputCmd):
 
600
class PrintTableCmd(PrintCmd):
620
601
    def __init__(self, verbose=False):
621
602
        self.verbose = verbose
622
603
 
652
633
            "LastCheckerStatus": "Last Checker Status",
653
634
        }
654
635
 
655
 
        def __init__(self, clients, keywords):
 
636
        def __init__(self, clients, keywords, tableheaders=None):
656
637
            self.clients = clients
657
638
            self.keywords = keywords
 
639
            if tableheaders is not None:
 
640
                self.tableheaders = tableheaders
658
641
 
659
642
        def __str__(self):
660
643
            return "\n".join(self.rows())
713
696
 
714
697
class PropertyCmd(Command):
715
698
    """Abstract class for Actions for setting one client property"""
716
 
 
717
699
    def run_on_one_client(self, client, properties):
718
700
        """Set the Client's D-Bus property"""
719
701
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
725
707
        client.Set(client_dbus_interface, self.propname,
726
708
                   self.value_to_set,
727
709
                   dbus_interface=dbus.PROPERTIES_IFACE)
728
 
 
729
710
    @property
730
711
    def propname(self):
731
712
        raise NotImplementedError()
782
763
 
783
764
class SetSecretCmd(PropertyValueCmd):
784
765
    propname = "Secret"
785
 
 
786
766
    @property
787
767
    def value_to_set(self):
788
768
        return self._vts
789
 
 
790
769
    @value_to_set.setter
791
770
    def value_to_set(self, value):
792
771
        """When setting, read data from supplied file object"""
797
776
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
798
777
    """Abstract class for PropertyValueCmd taking a value argument as
799
778
a datetime.timedelta() but should store it as milliseconds."""
800
 
 
801
779
    @property
802
780
    def value_to_set(self):
803
781
        return self._vts
804
 
 
805
782
    @value_to_set.setter
806
783
    def value_to_set(self, value):
807
784
        """When setting, convert value from a datetime.timedelta"""
839
816
                         datetime.timedelta(0, 1))
840
817
        self.assertEqual(string_to_delta("PT2H"),
841
818
                         datetime.timedelta(0, 7200))
842
 
 
843
819
    def test_falls_back_to_pre_1_6_1_with_warning(self):
844
820
        # assertLogs only exists in Python 3.4
845
821
        if hasattr(self, "assertLogs"):
864
840
 
865
841
 
866
842
class Test_check_option_syntax(unittest.TestCase):
867
 
    def setUp(self):
868
 
        self.parser = argparse.ArgumentParser()
869
 
        add_command_line_options(self.parser)
870
 
 
871
 
    def test_actions_requires_client_or_all(self):
872
 
        for action, value in self.actions.items():
873
 
            options = self.parser.parse_args()
874
 
            setattr(options, action, value)
875
 
            with self.assertParseError():
876
 
                self.check_option_syntax(options)
877
 
 
878
843
    # This mostly corresponds to the definition from has_actions() in
879
844
    # check_option_syntax()
880
845
    actions = {
901
866
        "deny": True,
902
867
    }
903
868
 
 
869
    def setUp(self):
 
870
        self.parser = argparse.ArgumentParser()
 
871
        add_command_line_options(self.parser)
 
872
 
904
873
    @contextlib.contextmanager
905
874
    def assertParseError(self):
906
875
        with self.assertRaises(SystemExit) as e:
928
897
    def check_option_syntax(self, options):
929
898
        check_option_syntax(self.parser, options)
930
899
 
 
900
    def test_actions_requires_client_or_all(self):
 
901
        for action, value in self.actions.items():
 
902
            options = self.parser.parse_args()
 
903
            setattr(options, action, value)
 
904
            with self.assertParseError():
 
905
                self.check_option_syntax(options)
 
906
 
931
907
    def test_actions_conflicts_with_verbose(self):
932
908
        for action, value in self.actions.items():
933
909
            options = self.parser.parse_args()
995
971
                self.check_option_syntax(options)
996
972
 
997
973
 
998
 
class Test_get_mandos_dbus_object(unittest.TestCase):
999
 
    def test_calls_and_returns_get_object_on_bus(self):
1000
 
        class MockBus(object):
1001
 
            called = False
1002
 
            def get_object(mockbus_self, busname, dbus_path):
1003
 
                # Note that "self" is still the testcase instance,
1004
 
                # this MockBus instance is in "mockbus_self".
1005
 
                self.assertEqual(busname, dbus_busname)
1006
 
                self.assertEqual(dbus_path, server_dbus_path)
1007
 
                mockbus_self.called = True
1008
 
                return mockbus_self
1009
 
 
1010
 
        mockbus = get_mandos_dbus_object(bus=MockBus())
1011
 
        self.assertIsInstance(mockbus, MockBus)
1012
 
        self.assertTrue(mockbus.called)
1013
 
 
1014
 
    def test_logs_and_exits_on_dbus_error(self):
1015
 
        class MockBusFailing(object):
1016
 
            def get_object(self, busname, dbus_path):
1017
 
                raise dbus.exceptions.DBusException("Test")
1018
 
 
1019
 
        # assertLogs only exists in Python 3.4
1020
 
        if hasattr(self, "assertLogs"):
1021
 
            with self.assertLogs(log, logging.CRITICAL):
1022
 
                with self.assertRaises(SystemExit) as e:
1023
 
                    bus = get_mandos_dbus_object(bus=MockBus())
1024
 
        else:
1025
 
            critical_filter = self.CriticalFilter()
1026
 
            log.addFilter(critical_filter)
1027
 
            try:
1028
 
                with self.assertRaises(SystemExit) as e:
1029
 
                    get_mandos_dbus_object(bus=MockBusFailing())
1030
 
            finally:
1031
 
                log.removeFilter(critical_filter)
1032
 
            self.assertTrue(critical_filter.found)
1033
 
        if isinstance(e.exception.code, int):
1034
 
            self.assertNotEqual(e.exception.code, 0)
1035
 
        else:
1036
 
            self.assertIsNotNone(e.exception.code)
1037
 
 
1038
 
    class CriticalFilter(logging.Filter):
1039
 
        """Don't show, but register, critical messages"""
1040
 
        found = False
1041
 
        def filter(self, record):
1042
 
            is_critical = record.levelno >= logging.CRITICAL
1043
 
            self.found = is_critical or self.found
1044
 
            return not is_critical
1045
 
 
1046
 
 
1047
 
class Test_SilenceLogger(unittest.TestCase):
1048
 
    loggername = "mandos-ctl.Test_SilenceLogger"
1049
 
    log = logging.getLogger(loggername)
1050
 
    log.propagate = False
1051
 
    log.addHandler(logging.NullHandler())
1052
 
 
1053
 
    def setUp(self):
1054
 
        self.counting_filter = self.CountingFilter()
1055
 
 
1056
 
    class CountingFilter(logging.Filter):
1057
 
        "Count number of records"
1058
 
        count = 0
1059
 
        def filter(self, record):
1060
 
            self.count += 1
1061
 
            return True
1062
 
 
1063
 
    def test_should_filter_records_only_when_active(self):
1064
 
        try:
1065
 
            with SilenceLogger(self.loggername):
1066
 
                self.log.addFilter(self.counting_filter)
1067
 
                self.log.info("Filtered log message 1")
1068
 
            self.log.info("Non-filtered message 2")
1069
 
            self.log.info("Non-filtered message 3")
1070
 
        finally:
1071
 
            self.log.removeFilter(self.counting_filter)
1072
 
        self.assertEqual(self.counting_filter.count, 2)
1073
 
 
1074
 
 
1075
 
class Test_commands_from_options(unittest.TestCase):
 
974
class Test_command_from_options(unittest.TestCase):
1076
975
    def setUp(self):
1077
976
        self.parser = argparse.ArgumentParser()
1078
977
        add_command_line_options(self.parser)
1079
 
 
1080
 
    def test_is_enabled(self):
1081
 
        self.assert_command_from_args(["--is-enabled", "foo"],
1082
 
                                      IsEnabledCmd)
1083
 
 
1084
978
    def assert_command_from_args(self, args, command_cls,
1085
979
                                 **cmd_attrs):
1086
980
        """Assert that parsing ARGS should result in an instance of
1093
987
        self.assertIsInstance(command, command_cls)
1094
988
        for key, value in cmd_attrs.items():
1095
989
            self.assertEqual(getattr(command, key), value)
1096
 
 
1097
 
    def test_is_enabled_short(self):
1098
 
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1099
 
 
1100
 
    def test_approve(self):
1101
 
        self.assert_command_from_args(["--approve", "foo"],
1102
 
                                      ApproveCmd)
1103
 
 
1104
 
    def test_approve_short(self):
1105
 
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1106
 
 
1107
 
    def test_deny(self):
1108
 
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1109
 
 
1110
 
    def test_deny_short(self):
1111
 
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
1112
 
 
1113
 
    def test_remove(self):
1114
 
        self.assert_command_from_args(["--remove", "foo"],
1115
 
                                      RemoveCmd)
1116
 
 
1117
 
    def test_deny_before_remove(self):
1118
 
        options = self.parser.parse_args(["--deny", "--remove",
1119
 
                                          "foo"])
1120
 
        check_option_syntax(self.parser, options)
1121
 
        commands = commands_from_options(options)
1122
 
        self.assertEqual(len(commands), 2)
1123
 
        self.assertIsInstance(commands[0], DenyCmd)
1124
 
        self.assertIsInstance(commands[1], RemoveCmd)
1125
 
 
1126
 
    def test_deny_before_remove_reversed(self):
1127
 
        options = self.parser.parse_args(["--remove", "--deny",
1128
 
                                          "--all"])
1129
 
        check_option_syntax(self.parser, options)
1130
 
        commands = commands_from_options(options)
1131
 
        self.assertEqual(len(commands), 2)
1132
 
        self.assertIsInstance(commands[0], DenyCmd)
1133
 
        self.assertIsInstance(commands[1], RemoveCmd)
1134
 
 
1135
 
    def test_remove_short(self):
1136
 
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1137
 
 
1138
 
    def test_dump_json(self):
1139
 
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
990
    def test_print_table(self):
 
991
        self.assert_command_from_args([], PrintTableCmd,
 
992
                                      verbose=False)
 
993
 
 
994
    def test_print_table_verbose(self):
 
995
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
 
996
                                      verbose=True)
 
997
 
 
998
    def test_print_table_verbose_short(self):
 
999
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1000
                                      verbose=True)
1140
1001
 
1141
1002
    def test_enable(self):
1142
1003
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1166
1027
        self.assert_command_from_args(["--stop-checker", "foo"],
1167
1028
                                      StopCheckerCmd)
1168
1029
 
1169
 
    def test_approve_by_default(self):
1170
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
1171
 
                                      ApproveByDefaultCmd)
 
1030
    def test_remove(self):
 
1031
        self.assert_command_from_args(["--remove", "foo"],
 
1032
                                      RemoveCmd)
1172
1033
 
1173
 
    def test_deny_by_default(self):
1174
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
1175
 
                                      DenyByDefaultCmd)
 
1034
    def test_remove_short(self):
 
1035
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1176
1036
 
1177
1037
    def test_checker(self):
1178
1038
        self.assert_command_from_args(["--checker", ":", "foo"],
1186
1046
        self.assert_command_from_args(["-c", ":", "foo"],
1187
1047
                                      SetCheckerCmd, value_to_set=":")
1188
1048
 
1189
 
    def test_host(self):
1190
 
        self.assert_command_from_args(["--host", "foo.example.org",
1191
 
                                       "foo"], SetHostCmd,
1192
 
                                      value_to_set="foo.example.org")
1193
 
 
1194
 
    def test_host_short(self):
1195
 
        self.assert_command_from_args(["-H", "foo.example.org",
1196
 
                                       "foo"], SetHostCmd,
1197
 
                                      value_to_set="foo.example.org")
1198
 
 
1199
 
    def test_secret_devnull(self):
1200
 
        self.assert_command_from_args(["--secret", os.path.devnull,
1201
 
                                       "foo"], SetSecretCmd,
1202
 
                                      value_to_set=b"")
1203
 
 
1204
 
    def test_secret_tempfile(self):
1205
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1206
 
            value = b"secret\0xyzzy\nbar"
1207
 
            f.write(value)
1208
 
            f.seek(0)
1209
 
            self.assert_command_from_args(["--secret", f.name,
1210
 
                                           "foo"], SetSecretCmd,
1211
 
                                          value_to_set=value)
1212
 
 
1213
 
    def test_secret_devnull_short(self):
1214
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1215
 
                                      SetSecretCmd, value_to_set=b"")
1216
 
 
1217
 
    def test_secret_tempfile_short(self):
1218
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1219
 
            value = b"secret\0xyzzy\nbar"
1220
 
            f.write(value)
1221
 
            f.seek(0)
1222
 
            self.assert_command_from_args(["-s", f.name, "foo"],
1223
 
                                          SetSecretCmd,
1224
 
                                          value_to_set=value)
1225
 
 
1226
1049
    def test_timeout(self):
1227
1050
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1228
1051
                                      SetTimeoutCmd,
1249
1072
                                      SetIntervalCmd,
1250
1073
                                      value_to_set=120000)
1251
1074
 
 
1075
    def test_approve_by_default(self):
 
1076
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1077
                                      ApproveByDefaultCmd)
 
1078
 
 
1079
    def test_deny_by_default(self):
 
1080
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1081
                                      DenyByDefaultCmd)
 
1082
 
1252
1083
    def test_approval_delay(self):
1253
1084
        self.assert_command_from_args(["--approval-delay", "PT30S",
1254
1085
                                       "foo"], SetApprovalDelayCmd,
1259
1090
                                       "foo"], SetApprovalDurationCmd,
1260
1091
                                      value_to_set=1000)
1261
1092
 
1262
 
    def test_print_table(self):
1263
 
        self.assert_command_from_args([], PrintTableCmd,
1264
 
                                      verbose=False)
1265
 
 
1266
 
    def test_print_table_verbose(self):
1267
 
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1268
 
                                      verbose=True)
1269
 
 
1270
 
    def test_print_table_verbose_short(self):
1271
 
        self.assert_command_from_args(["-v"], PrintTableCmd,
1272
 
                                      verbose=True)
 
1093
    def test_host(self):
 
1094
        self.assert_command_from_args(["--host", "foo.example.org",
 
1095
                                       "foo"], SetHostCmd,
 
1096
                                      value_to_set="foo.example.org")
 
1097
 
 
1098
    def test_host_short(self):
 
1099
        self.assert_command_from_args(["-H", "foo.example.org",
 
1100
                                       "foo"], SetHostCmd,
 
1101
                                      value_to_set="foo.example.org")
 
1102
 
 
1103
    def test_secret_devnull(self):
 
1104
        self.assert_command_from_args(["--secret", os.path.devnull,
 
1105
                                       "foo"], SetSecretCmd,
 
1106
                                      value_to_set=b"")
 
1107
 
 
1108
    def test_secret_tempfile(self):
 
1109
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1110
            value = b"secret\0xyzzy\nbar"
 
1111
            f.write(value)
 
1112
            f.seek(0)
 
1113
            self.assert_command_from_args(["--secret", f.name,
 
1114
                                           "foo"], SetSecretCmd,
 
1115
                                          value_to_set=value)
 
1116
 
 
1117
    def test_secret_devnull_short(self):
 
1118
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1119
                                      SetSecretCmd, value_to_set=b"")
 
1120
 
 
1121
    def test_secret_tempfile_short(self):
 
1122
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1123
            value = b"secret\0xyzzy\nbar"
 
1124
            f.write(value)
 
1125
            f.seek(0)
 
1126
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1127
                                          SetSecretCmd,
 
1128
                                          value_to_set=value)
 
1129
 
 
1130
    def test_approve(self):
 
1131
        self.assert_command_from_args(["--approve", "foo"],
 
1132
                                      ApproveCmd)
 
1133
 
 
1134
    def test_approve_short(self):
 
1135
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1136
 
 
1137
    def test_deny(self):
 
1138
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
 
1139
 
 
1140
    def test_deny_short(self):
 
1141
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1142
 
 
1143
    def test_dump_json(self):
 
1144
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
1145
 
 
1146
    def test_is_enabled(self):
 
1147
        self.assert_command_from_args(["--is-enabled", "foo"],
 
1148
                                      IsEnabledCmd)
 
1149
 
 
1150
    def test_is_enabled_short(self):
 
1151
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1152
 
 
1153
    def test_deny_before_remove(self):
 
1154
        options = self.parser.parse_args(["--deny", "--remove",
 
1155
                                          "foo"])
 
1156
        check_option_syntax(self.parser, options)
 
1157
        commands = commands_from_options(options)
 
1158
        self.assertEqual(len(commands), 2)
 
1159
        self.assertIsInstance(commands[0], DenyCmd)
 
1160
        self.assertIsInstance(commands[1], RemoveCmd)
 
1161
 
 
1162
    def test_deny_before_remove_reversed(self):
 
1163
        options = self.parser.parse_args(["--remove", "--deny",
 
1164
                                          "--all"])
 
1165
        check_option_syntax(self.parser, options)
 
1166
        commands = commands_from_options(options)
 
1167
        self.assertEqual(len(commands), 2)
 
1168
        self.assertIsInstance(commands[0], DenyCmd)
 
1169
        self.assertIsInstance(commands[1], RemoveCmd)
1273
1170
 
1274
1171
 
1275
1172
class TestCmd(unittest.TestCase):
1276
1173
    """Abstract class for tests of command classes"""
1277
 
 
1278
1174
    def setUp(self):
1279
1175
        testcase = self
1280
1176
        class MockClient(object):
1352
1248
                ("/clients/barbar", self.other_client.attributes),
1353
1249
            ])
1354
1250
        self.one_client = {"/clients/foo": self.client.attributes}
1355
 
 
1356
1251
    @property
1357
1252
    def bus(self):
1358
1253
        class Bus(object):
1360
1255
            def get_object(client_bus_name, path):
1361
1256
                self.assertEqual(client_bus_name, dbus_busname)
1362
1257
                return {
1363
 
                    # Note: "self" here is the TestCmd instance, not
1364
 
                    # the Bus instance, since this is a static method!
1365
1258
                    "/clients/foo": self.client,
1366
1259
                    "/clients/barbar": self.other_client,
1367
1260
                }[path]
1374
1267
                                                      properties)
1375
1268
                            for client, properties
1376
1269
                            in self.clients.items()))
1377
 
 
1378
1270
    def test_is_enabled_run_exits_successfully(self):
1379
1271
        with self.assertRaises(SystemExit) as e:
1380
1272
            IsEnabledCmd().run(self.one_client)
1382
1274
            self.assertEqual(e.exception.code, 0)
1383
1275
        else:
1384
1276
            self.assertIsNone(e.exception.code)
1385
 
 
1386
1277
    def test_is_enabled_run_exits_with_failure(self):
1387
1278
        self.client.attributes["Enabled"] = dbus.Boolean(False)
1388
1279
        with self.assertRaises(SystemExit) as e:
1410
1301
            self.assertIn(("Approve", (False, client_dbus_interface)),
1411
1302
                          client.calls)
1412
1303
 
1413
 
 
1414
1304
class TestRemoveCmd(TestCmd):
1415
1305
    def test_remove(self):
1416
1306
        class MockMandos(object):
1480
1370
            },
1481
1371
        }
1482
1372
        return super(TestDumpJSONCmd, self).setUp()
1483
 
 
1484
1373
    def test_normal(self):
1485
 
        output = DumpJSONCmd().output(self.clients.values())
1486
 
        json_data = json.loads(output)
 
1374
        json_data = json.loads(DumpJSONCmd().output(self.clients))
1487
1375
        self.assertDictEqual(json_data, self.expected_json)
1488
 
 
1489
1376
    def test_one_client(self):
1490
 
        output = DumpJSONCmd().output(self.one_client.values())
1491
 
        json_data = json.loads(output)
 
1377
        clients = self.one_client
 
1378
        json_data = json.loads(DumpJSONCmd().output(clients))
1492
1379
        expected_json = {"foo": self.expected_json["foo"]}
1493
1380
        self.assertDictEqual(json_data, expected_json)
1494
1381
 
1502
1389
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
1503
1390
        ))
1504
1391
        self.assertEqual(output, expected_output)
1505
 
 
1506
1392
    def test_verbose(self):
1507
1393
        output = PrintTableCmd(verbose=True).output(
1508
1394
            self.clients.values())
1597
1483
                                            for rows in columns)
1598
1484
                                    for line in range(num_lines))
1599
1485
        self.assertEqual(output, expected_output)
1600
 
 
1601
1486
    def test_one_client(self):
1602
1487
        output = PrintTableCmd().output(self.one_client.values())
1603
1488
        expected_output = "\n".join((
1607
1492
        self.assertEqual(output, expected_output)
1608
1493
 
1609
1494
 
 
1495
class Unique(object):
 
1496
    """Class for objects which exist only to be unique objects, since
 
1497
unittest.mock.sentinel only exists in Python 3.3"""
 
1498
 
 
1499
 
1610
1500
class TestPropertyCmd(TestCmd):
1611
1501
    """Abstract class for tests of PropertyCmd classes"""
1612
1502
    def runTest(self):
1619
1509
            for clientpath in self.clients:
1620
1510
                client = self.bus.get_object(dbus_busname, clientpath)
1621
1511
                old_value = client.attributes[self.propname]
1622
 
                self.assertNotIsInstance(old_value, self.Unique)
1623
 
                client.attributes[self.propname] = self.Unique()
 
1512
                self.assertNotIsInstance(old_value, Unique)
 
1513
                client.attributes[self.propname] = Unique()
1624
1514
            self.run_command(value_to_set, self.clients)
1625
1515
            for clientpath in self.clients:
1626
1516
                client = self.bus.get_object(dbus_busname, clientpath)
1627
1517
                value = client.attributes[self.propname]
1628
 
                self.assertNotIsInstance(value, self.Unique)
 
1518
                self.assertNotIsInstance(value, Unique)
1629
1519
                self.assertEqual(value, value_to_get)
1630
 
 
1631
 
    class Unique(object):
1632
 
        """Class for objects which exist only to be unique objects,
1633
 
since unittest.mock.sentinel only exists in Python 3.3"""
1634
 
 
1635
1520
    def run_command(self, value, clients):
1636
1521
        self.command().run(clients, self.bus)
1637
1522
 
1638
1523
 
1639
 
class TestEnableCmd(TestPropertyCmd):
1640
 
    command = EnableCmd
1641
 
    propname = "Enabled"
1642
 
    values_to_set = [dbus.Boolean(True)]
1643
 
 
1644
 
 
1645
 
class TestDisableCmd(TestPropertyCmd):
1646
 
    command = DisableCmd
1647
 
    propname = "Enabled"
1648
 
    values_to_set = [dbus.Boolean(False)]
 
1524
class TestEnableCmd(TestCmd):
 
1525
    def test_enable(self):
 
1526
        for clientpath in self.clients:
 
1527
            client = self.bus.get_object(dbus_busname, clientpath)
 
1528
            client.attributes["Enabled"] = False
 
1529
 
 
1530
        EnableCmd().run(self.clients, self.bus)
 
1531
 
 
1532
        for clientpath in self.clients:
 
1533
            client = self.bus.get_object(dbus_busname, clientpath)
 
1534
            self.assertTrue(client.attributes["Enabled"])
 
1535
 
 
1536
 
 
1537
class TestDisableCmd(TestCmd):
 
1538
    def test_disable(self):
 
1539
        DisableCmd().run(self.clients, self.bus)
 
1540
        for clientpath in self.clients:
 
1541
            client = self.bus.get_object(dbus_busname, clientpath)
 
1542
            self.assertFalse(client.attributes["Enabled"])
1649
1543
 
1650
1544
 
1651
1545
class TestBumpTimeoutCmd(TestPropertyCmd):
1680
1574
 
1681
1575
class TestPropertyValueCmd(TestPropertyCmd):
1682
1576
    """Abstract class for tests of PropertyValueCmd classes"""
1683
 
 
1684
1577
    def runTest(self):
1685
1578
        if type(self) is TestPropertyValueCmd:
1686
1579
            return
1687
1580
        return super(TestPropertyValueCmd, self).runTest()
1688
 
 
1689
1581
    def run_command(self, value, clients):
1690
1582
        self.command(value).run(clients, self.bus)
1691
1583
 
1783
1675
    return tests
1784
1676
 
1785
1677
if __name__ == "__main__":
1786
 
    try:
1787
 
        if should_only_run_tests():
1788
 
            # Call using ./tdd-python-script --check [--verbose]
1789
 
            unittest.main()
1790
 
        else:
1791
 
            main()
1792
 
    finally:
1793
 
        logging.shutdown()
 
1678
    if should_only_run_tests():
 
1679
        # Call using ./tdd-python-script --check [--verbose]
 
1680
        unittest.main()
 
1681
    else:
 
1682
        main()