/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-03 14:04:14 UTC
  • Revision ID: teddy@recompile.se-20190303140414-4jspvi9mi83f9cpl
mandos-ctl: Refactor

* mandos-ctl (Command.run): Take a "mandos" argument and save it.
  (PrintCmd.run): Take, but ignore, a "mandos" argument.
  (RemoveCmd.__init__): Remove.
  (main): Don't pass mandos_serv to RemoveCmd constructor.  Instead,
          always pass mandos_serv to command.run().

Show diffs side-by-side

added

removed

Lines of Context:
515
515
                options.deny))
516
516
 
517
517
 
518
 
def commands_and_clients_from_options(args=None):
519
 
    if args is None:
520
 
        args=sys.argv[1:]
 
518
def main():
521
519
    parser = argparse.ArgumentParser()
522
520
    parser.add_argument("--version", action="version",
523
521
                        version="%(prog)s {}".format(version),
581
579
    parser.add_argument("--check", action="store_true",
582
580
                        help="Run self-test")
583
581
    parser.add_argument("client", nargs="*", help="Client name")
584
 
    options = parser.parse_args(args=args)
 
582
    options = parser.parse_args()
585
583
 
586
584
    if has_actions(options) and not (options.client or options.all):
587
585
        parser.error("Options require clients names or --all.")
595
593
    if options.is_enabled and len(options.client) > 1:
596
594
            parser.error("--is-enabled requires exactly one client")
597
595
 
 
596
    try:
 
597
        bus = dbus.SystemBus()
 
598
        mandos_dbus_objc = bus.get_object(busname, server_path)
 
599
    except dbus.exceptions.DBusException:
 
600
        log.critical("Could not connect to Mandos server")
 
601
        sys.exit(1)
 
602
 
 
603
    mandos_serv = dbus.Interface(mandos_dbus_objc,
 
604
                                 dbus_interface=server_interface)
 
605
    mandos_serv_object_manager = dbus.Interface(
 
606
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
 
607
 
598
608
    commands = []
599
609
 
600
610
    if options.dump_json:
664
674
    if not commands:
665
675
        commands.append(PrintTableCmd(verbose=options.verbose))
666
676
 
667
 
    return commands, options.client
668
 
 
669
 
 
670
 
def main():
671
 
    commands, clientnames = commands_and_clients_from_options()
672
 
 
673
 
    try:
674
 
        bus = dbus.SystemBus()
675
 
        mandos_dbus_objc = bus.get_object(busname, server_path)
676
 
    except dbus.exceptions.DBusException:
677
 
        log.critical("Could not connect to Mandos server")
678
 
        sys.exit(1)
679
 
 
680
 
    mandos_serv = dbus.Interface(mandos_dbus_objc,
681
 
                                 dbus_interface=server_interface)
682
 
    mandos_serv_object_manager = dbus.Interface(
683
 
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
684
 
 
685
677
    # Filter out log message from dbus module
686
678
    dbus_logger = logging.getLogger("dbus.proxies")
687
679
    class NullFilter(logging.Filter):
707
699
    # Compile dict of (clients: properties) to process
708
700
    clients = {}
709
701
 
710
 
    if not clientnames:
 
702
    if options.all or not options.client:
711
703
        clients = {bus.get_object(busname, path): properties
712
704
                   for path, properties in mandos_clients.items()}
713
705
    else:
714
 
        for name in clientnames:
 
706
        for name in options.client:
715
707
            for path, client in mandos_clients.items():
716
708
                if client["Name"] == name:
717
709
                    client_objc = bus.get_object(busname, path)
765
757
            self.assertTrue(getattr(warning_filter, "found", False))
766
758
        self.assertEqual(value, datetime.timedelta(0, 7200))
767
759
 
768
 
 
769
 
class TestCmd(unittest.TestCase):
770
 
    """Abstract class for tests of command classes"""
 
760
class Test_TableOfClients(unittest.TestCase):
771
761
    def setUp(self):
772
 
        testcase = self
773
 
        class MockClient(object):
774
 
            def __init__(self, name, **attributes):
775
 
                self.__dbus_object_path__ = "objpath_{}".format(name)
776
 
                self.attributes = attributes
777
 
                self.attributes["Name"] = name
778
 
            def Set(interface, property, value,
779
 
                    properties_interface):
780
 
                testcase.assertEqual(interface, client_interface)
781
 
                testcase.assertEqual(properties_interface,
782
 
                                     dbus.PROPERTIES_IFACE)
783
 
                self.attributes[property] = value
784
 
            def Get(interface, property, properties_interface):
785
 
                testcase.assertEqual(interface, client_interface)
786
 
                testcase.assertEqual(properties_interface,
787
 
                                     dbus.PROPERTIES_IFACE)
788
 
                return self.attributes[property]
789
 
            def __getitem__(self, key):
790
 
                return self.attributes[key]
791
 
        self.clients = collections.OrderedDict([
792
 
            ("foo",
793
 
             MockClient(
794
 
                 "foo",
795
 
                 KeyID=("92ed150794387c03ce684574b1139a65"
796
 
                        "94a34f895daaaf09fd8ea90a27cddb12"),
797
 
                 Secret=b"secret",
798
 
                 Host="foo.example.org",
799
 
                 Enabled=dbus.Boolean(True),
800
 
                 Timeout=300000,
801
 
                 LastCheckedOK="2019-02-03T00:00:00",
802
 
                 Created="2019-01-02T00:00:00",
803
 
                 Interval=120000,
804
 
                 Fingerprint=("778827225BA7DE539C5A"
805
 
                              "7CFA59CFF7CDBD9A5920"),
806
 
                 CheckerRunning=dbus.Boolean(False),
807
 
                 LastEnabled="2019-01-03T00:00:00",
808
 
                 ApprovalPending=dbus.Boolean(False),
809
 
                 ApprovedByDefault=dbus.Boolean(True),
810
 
                 LastApprovalRequest="",
811
 
                 ApprovalDelay=0,
812
 
                 ApprovalDuration=1000,
813
 
                 Checker="fping -q -- %(host)s",
814
 
                 ExtendedTimeout=900000,
815
 
                 Expires="2019-02-04T00:00:00",
816
 
                 LastCheckerStatus=0)),
817
 
            ("barbar",
818
 
             MockClient(
819
 
                 "barbar",
820
 
                 KeyID=("0558568eedd67d622f5c83b35a115f79"
821
 
                        "6ab612cff5ad227247e46c2b020f441c"),
822
 
                 Secret=b"secretbar",
823
 
                 Host="192.0.2.3",
824
 
                 Enabled=dbus.Boolean(True),
825
 
                 Timeout=300000,
826
 
                 LastCheckedOK="2019-02-04T00:00:00",
827
 
                 Created="2019-01-03T00:00:00",
828
 
                 Interval=120000,
829
 
                 Fingerprint=("3E393AEAEFB84C7E89E2"
830
 
                              "F547B3A107558FCA3A27"),
831
 
                 CheckerRunning=dbus.Boolean(True),
832
 
                 LastEnabled="2019-01-04T00:00:00",
833
 
                 ApprovalPending=dbus.Boolean(False),
834
 
                 ApprovedByDefault=dbus.Boolean(False),
835
 
                 LastApprovalRequest="2019-01-03T00:00:00",
836
 
                 ApprovalDelay=30000,
837
 
                 ApprovalDuration=1000,
838
 
                 Checker=":",
839
 
                 ExtendedTimeout=900000,
840
 
                 Expires="2019-02-05T00:00:00",
841
 
                 LastCheckerStatus=-2)),
842
 
            ])
843
 
 
844
 
class TestPrintTableCmd(TestCmd):
845
 
    def test_normal(self):
846
 
        output = PrintTableCmd().output(self.clients)
847
 
        expected_output = """
848
 
Name   Enabled Timeout  Last Successful Check
849
 
foo    Yes     00:05:00 2019-02-03T00:00:00  
850
 
barbar Yes     00:05:00 2019-02-04T00:00:00  
851
 
"""[1:-1]
852
 
        self.assertEqual(output, expected_output)
853
 
    def test_verbose(self):
854
 
        output = PrintTableCmd(verbose=True).output(self.clients)
855
 
        expected_output = """
856
 
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
857
 
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
858
 
barbar Yes     00:05:00 2019-02-04T00:00:00   2019-01-03T00:00:00 00:02:00 192.0.2.3       0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes              2019-01-04T00:00:00 No                  No                  2019-01-03T00:00:00   00:00:30       00:00:01          :                    00:15:00         2019-02-05T00:00:00 -2                 
859
 
"""[1:-1]
860
 
        self.assertEqual(output, expected_output)
861
 
    def test_one_client(self):
862
 
        output = PrintTableCmd().output({"foo": self.clients["foo"]})
863
 
        expected_output = """
864
 
Name Enabled Timeout  Last Successful Check
865
 
foo  Yes     00:05:00 2019-02-03T00:00:00  
866
 
"""[1:-1]
867
 
        self.assertEqual(output, expected_output)
 
762
        self.tableheaders = {
 
763
            "Attr1": "X",
 
764
            "AttrTwo": "Yy",
 
765
            "AttrThree": "Zzz",
 
766
            "Bool": "A D-BUS Boolean",
 
767
            "NonDbusBoolean": "A Non-D-BUS Boolean",
 
768
            "Integer": "An Integer",
 
769
            "Timeout": "Timedelta 1",
 
770
            "Interval": "Timedelta 2",
 
771
            "ApprovalDelay": "Timedelta 3",
 
772
            "ApprovalDuration": "Timedelta 4",
 
773
            "ExtendedTimeout": "Timedelta 5",
 
774
            "String": "A String",
 
775
        }
 
776
        self.keywords = ["Attr1", "AttrTwo"]
 
777
        self.clients = [
 
778
            {
 
779
                "Attr1": "x1",
 
780
                "AttrTwo": "y1",
 
781
                "AttrThree": "z1",
 
782
                "Bool": dbus.Boolean(False),
 
783
                "NonDbusBoolean": False,
 
784
                "Integer": 0,
 
785
                "Timeout": 0,
 
786
                "Interval": 1000,
 
787
                "ApprovalDelay": 2000,
 
788
                "ApprovalDuration": 3000,
 
789
                "ExtendedTimeout": 4000,
 
790
                "String": "",
 
791
            },
 
792
            {
 
793
                "Attr1": "x2",
 
794
                "AttrTwo": "y2",
 
795
                "AttrThree": "z2",
 
796
                "Bool": dbus.Boolean(True),
 
797
                "NonDbusBoolean": True,
 
798
                "Integer": 1,
 
799
                "Timeout": 93785000,
 
800
                "Interval": 93786000,
 
801
                "ApprovalDelay": 93787000,
 
802
                "ApprovalDuration": 93788000,
 
803
                "ExtendedTimeout": 93789000,
 
804
                "String": "A huge string which will not fit," * 10,
 
805
            },
 
806
        ]
 
807
    def test_short_header(self):
 
808
        text = str(TableOfClients(self.clients, self.keywords,
 
809
                                  self.tableheaders))
 
810
        expected_text = """
 
811
X  Yy
 
812
x1 y1
 
813
x2 y2
 
814
"""[1:-1]
 
815
        self.assertEqual(text, expected_text)
 
816
    def test_booleans(self):
 
817
        keywords = ["Bool", "NonDbusBoolean"]
 
818
        text = str(TableOfClients(self.clients, keywords,
 
819
                                  self.tableheaders))
 
820
        expected_text = """
 
821
A D-BUS Boolean A Non-D-BUS Boolean
 
822
No              False              
 
823
Yes             True               
 
824
"""[1:-1]
 
825
        self.assertEqual(text, expected_text)
 
826
    def test_milliseconds_detection(self):
 
827
        keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
 
828
                    "ApprovalDuration", "ExtendedTimeout"]
 
829
        text = str(TableOfClients(self.clients, keywords,
 
830
                                  self.tableheaders))
 
831
        expected_text = """
 
832
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
 
833
0          00:00:00    00:00:01    00:00:02    00:00:03    00:00:04   
 
834
1          1T02:03:05  1T02:03:06  1T02:03:07  1T02:03:08  1T02:03:09 
 
835
"""[1:-1]
 
836
        self.assertEqual(text, expected_text)
 
837
    def test_empty_and_long_string_values(self):
 
838
        keywords = ["String"]
 
839
        text = str(TableOfClients(self.clients, keywords,
 
840
                                  self.tableheaders))
 
841
        expected_text = """
 
842
A String                                                                                                                                                                                                                                                                                                                                  
 
843
                                                                                                                                                                                                                                                                                                                                          
 
844
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
 
845
"""[1:-1]
 
846
        self.assertEqual(text, expected_text)
868
847
 
869
848
 
870
849