/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-03 16:54:45 UTC
  • Revision ID: teddy@recompile.se-20190303165445-vw9ch8k4o4x1syc8
mandos-ctl: Add test for DumpJSONCmd class

* mandos-ctl (TestDumpJSONCmd): New.

Show diffs side-by-side

added

removed

Lines of Context:
516
516
                options.approve,
517
517
                options.deny))
518
518
 
519
 
def add_command_line_options(parser):
 
519
 
 
520
def commands_and_clients_from_options(args=None):
 
521
    if args is None:
 
522
        args=sys.argv[1:]
 
523
    parser = argparse.ArgumentParser()
520
524
    parser.add_argument("--version", action="version",
521
525
                        version="%(prog)s {}".format(version),
522
526
                        help="show version number and exit")
579
583
    parser.add_argument("--check", action="store_true",
580
584
                        help="Run self-test")
581
585
    parser.add_argument("client", nargs="*", help="Client name")
582
 
 
583
 
 
584
 
def commands_and_clients_from_options(options):
 
586
    options = parser.parse_args(args=args)
 
587
 
 
588
    if has_actions(options) and not (options.client or options.all):
 
589
        parser.error("Options require clients names or --all.")
 
590
    if options.verbose and has_actions(options):
 
591
        parser.error("--verbose can only be used alone.")
 
592
    if options.dump_json and (options.verbose
 
593
                              or has_actions(options)):
 
594
        parser.error("--dump-json can only be used alone.")
 
595
    if options.all and not has_actions(options):
 
596
        parser.error("--all requires an action.")
 
597
    if options.is_enabled and len(options.client) > 1:
 
598
            parser.error("--is-enabled requires exactly one client")
585
599
 
586
600
    commands = []
587
601
 
656
670
 
657
671
 
658
672
def main():
659
 
    parser = argparse.ArgumentParser()
660
 
 
661
 
    add_command_line_options(parser)
662
 
 
663
 
    options = parser.parse_args()
664
 
 
665
 
    if has_actions(options) and not (options.client or options.all):
666
 
        parser.error("Options require clients names or --all.")
667
 
    if options.verbose and has_actions(options):
668
 
        parser.error("--verbose can only be used alone.")
669
 
    if options.dump_json and (options.verbose
670
 
                              or has_actions(options)):
671
 
        parser.error("--dump-json can only be used alone.")
672
 
    if options.all and not has_actions(options):
673
 
        parser.error("--all requires an action.")
674
 
    if options.is_enabled and len(options.client) > 1:
675
 
        parser.error("--is-enabled requires exactly one client")
676
 
 
677
 
    commands, clientnames = commands_and_clients_from_options(options)
 
673
    commands, clientnames = commands_and_clients_from_options()
678
674
 
679
675
    try:
680
676
        bus = dbus.SystemBus()
694
690
        def filter(self, record):
695
691
            return False
696
692
    dbus_filter = NullFilter()
 
693
    dbus_logger.addFilter(dbus_filter)
697
694
    try:
698
 
        dbus_logger.addFilter(dbus_filter)
699
 
        mandos_clients = {path: ifs_and_props[client_interface]
700
 
                          for path, ifs_and_props in
701
 
                          mandos_serv_object_manager
702
 
                          .GetManagedObjects().items()
703
 
                          if client_interface in ifs_and_props}
 
695
        try:
 
696
            mandos_clients = {path: ifs_and_props[client_interface]
 
697
                              for path, ifs_and_props in
 
698
                              mandos_serv_object_manager
 
699
                              .GetManagedObjects().items()
 
700
                              if client_interface in ifs_and_props}
 
701
        finally:
 
702
            # restore dbus logger
 
703
            dbus_logger.removeFilter(dbus_filter)
704
704
    except dbus.exceptions.DBusException as e:
705
705
        log.critical("Failed to access Mandos server through D-Bus:"
706
706
                     "\n%s", e)
707
707
        sys.exit(1)
708
 
    finally:
709
 
        # restore dbus logger
710
 
        dbus_logger.removeFilter(dbus_filter)
711
708
 
712
709
    # Compile dict of (clients: properties) to process
713
710
    clients = {}
780
777
                self.__dbus_object_path__ = "objpath_{}".format(name)
781
778
                self.attributes = attributes
782
779
                self.attributes["Name"] = name
783
 
                self.calls = []
784
 
            def Set(self, interface, property, value, dbus_interface):
 
780
            def Set(interface, property, value,
 
781
                    properties_interface):
785
782
                testcase.assertEqual(interface, client_interface)
786
 
                testcase.assertEqual(dbus_interface,
 
783
                testcase.assertEqual(properties_interface,
787
784
                                     dbus.PROPERTIES_IFACE)
788
785
                self.attributes[property] = value
789
 
                self.calls.append(("Set", (interface, property, value,
790
 
                                           dbus_interface)))
791
 
            def Get(self, interface, property, dbus_interface):
 
786
            def Get(interface, property, properties_interface):
792
787
                testcase.assertEqual(interface, client_interface)
793
 
                testcase.assertEqual(dbus_interface,
 
788
                testcase.assertEqual(properties_interface,
794
789
                                     dbus.PROPERTIES_IFACE)
795
 
                self.calls.append(("Get", (interface, property,
796
 
                                           dbus_interface)))
797
790
                return self.attributes[property]
798
791
            def __getitem__(self, key):
799
792
                return self.attributes[key]
800
 
            def __setitem__(self, key, value):
801
 
                self.attributes[key] = value
802
793
        self.clients = collections.OrderedDict([
803
794
            ("foo",
804
795
             MockClient(
939
930
        expected_json = {"foo": self.expected_json["foo"]}
940
931
        self.assertDictEqual(json_data, expected_json)
941
932
 
942
 
class TestIsEnabledCmd(TestCmd):
943
 
    def test_is_enabled(self):
944
 
        self.assertTrue(all(IsEnabledCmd().is_enabled(client)
945
 
                            for client in self.clients.values()))
946
 
    def test_is_enabled_does_get_attribute(self):
947
 
        client = self.clients["foo"]
948
 
        self.assertTrue(IsEnabledCmd().is_enabled(client))
949
 
        self.assertListEqual(client.calls,
950
 
                             [("Get",
951
 
                               ("se.recompile.Mandos.Client",
952
 
                                "Enabled",
953
 
                                "org.freedesktop.DBus.Properties"))])
954
 
    def test_is_enabled_run_exits_successfully(self):
955
 
        client = self.clients["foo"]
956
 
        with self.assertRaises(SystemExit) as e:
957
 
            IsEnabledCmd().run(None, [client])
958
 
        if e.exception.code is not None:
959
 
            self.assertEqual(e.exception.code, 0)
960
 
        else:
961
 
            self.assertIsNone(e.exception.code)
962
 
    def test_is_enabled_run_exits_with_failure(self):
963
 
        client = self.clients["foo"]
964
 
        client["Enabled"] = dbus.Boolean(False)
965
 
        with self.assertRaises(SystemExit) as e:
966
 
            IsEnabledCmd().run(None, [client])
967
 
        if isinstance(e.exception.code, int):
968
 
            self.assertNotEqual(e.exception.code, 0)
969
 
        else:
970
 
            self.assertIsNotNone(e.exception.code)
971
 
 
972
 
 
973
 
class TestRemoveCmd(TestCmd):
974
 
    def test_remove(self):
975
 
        client = self.clients["foo"]
976
 
        class MockMandos(object):
977
 
            def __init__(self):
978
 
                self.calls = []
979
 
            def RemoveClient(self, dbus_path):
980
 
                self.calls.append(("RemoveClient", (dbus_path,)))
981
 
        mandos = MockMandos()
982
 
        RemoveCmd().run(mandos, [client])
983
 
        self.assertEqual(len(mandos.calls), 1)
984
 
        self.assertListEqual(mandos.calls,
985
 
                             [("RemoveClient",
986
 
                               (client.__dbus_object_path__,))])
987
 
 
988
 
 
989
933
 
990
934
def should_only_run_tests():
991
935
    parser = argparse.ArgumentParser(add_help=False)