/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-06 19:02:46 UTC
  • Merged to: (237.7.594 trunk)
  • This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190306190246-5mdmymvq62qpba3y
mandos-ctl: Refactor test

* mandos-ctl (commands_from_args): Removed.
  (TestOptions.assert_command_from_args): New.

Show diffs side-by-side

added

removed

Lines of Context:
278
278
        commands which want to operate on all clients at the same time
279
279
        can override this run() method instead."""
280
280
        self.mandos = mandos
281
 
        for client in clients:
282
 
            self.run_on_one_client(client)
 
281
        for client, properties in clients.items():
 
282
            self.run_on_one_client(client, properties)
283
283
 
284
284
class PrintCmd(Command):
285
285
    """Abstract class for commands printing client details"""
295
295
 
296
296
class PropertyCmd(Command):
297
297
    """Abstract class for Actions for setting one client property"""
298
 
    def run_on_one_client(self, client):
 
298
    def run_on_one_client(self, client, properties):
299
299
        """Set the Client's D-Bus property"""
300
300
        client.Set(client_interface, self.property, self.value_to_set,
301
301
                   dbus_interface=dbus.PROPERTIES_IFACE)
323
323
        self.verbose = verbose
324
324
 
325
325
    def output(self, clients):
 
326
        default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
 
327
        keywords = default_keywords
326
328
        if self.verbose:
327
329
            keywords = self.all_keywords
328
 
        else:
329
 
            keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
330
330
        return str(self.TableOfClients(clients.values(), keywords))
331
331
 
332
332
    class TableOfClients(object):
419
419
        return value
420
420
 
421
421
class IsEnabledCmd(Command):
422
 
    def run_on_one_client(self, client):
423
 
        if self.is_enabled(client):
 
422
    def run_on_one_client(self, client, properties):
 
423
        if self.is_enabled(client, properties):
424
424
            sys.exit(0)
425
425
        sys.exit(1)
426
 
    def is_enabled(self, client):
427
 
        return client.Get(client_interface, "Enabled",
428
 
                          dbus_interface=dbus.PROPERTIES_IFACE)
 
426
    def is_enabled(self, client, properties):
 
427
        return bool(properties["Enabled"])
429
428
 
430
429
class RemoveCmd(Command):
431
 
    def run_on_one_client(self, client):
 
430
    def run_on_one_client(self, client, properties):
432
431
        self.mandos.RemoveClient(client.__dbus_object_path__)
433
432
 
434
433
class ApproveCmd(Command):
435
 
    def run_on_one_client(self, client):
 
434
    def run_on_one_client(self, client, properties):
436
435
        client.Approve(dbus.Boolean(True),
437
436
                       dbus_interface=client_interface)
438
437
 
439
438
class DenyCmd(Command):
440
 
    def run_on_one_client(self, client):
 
439
    def run_on_one_client(self, client, properties):
441
440
        client.Approve(dbus.Boolean(False),
442
441
                       dbus_interface=client_interface)
443
442
 
581
580
    parser.add_argument("client", nargs="*", help="Client name")
582
581
 
583
582
 
584
 
def commands_and_clients_from_options(options):
 
583
def commands_from_options(options):
585
584
 
586
585
    commands = []
587
586
 
595
594
        commands.append(DisableCmd())
596
595
 
597
596
    if options.bump_timeout:
598
 
        commands.append(BumpTimeoutCmd(options.bump_timeout))
 
597
        commands.append(BumpTimeoutCmd())
599
598
 
600
599
    if options.start_checker:
601
600
        commands.append(StartCheckerCmd())
652
651
    if not commands:
653
652
        commands.append(PrintTableCmd(verbose=options.verbose))
654
653
 
655
 
    return commands, options.client
 
654
    return commands
656
655
 
657
656
 
658
657
def main():
674
673
    if options.is_enabled and len(options.client) > 1:
675
674
        parser.error("--is-enabled requires exactly one client")
676
675
 
677
 
    commands, clientnames = commands_and_clients_from_options(options)
 
676
    clientnames = options.client
678
677
 
679
678
    try:
680
679
        bus = dbus.SystemBus()
727
726
                sys.exit(1)
728
727
 
729
728
    # Run all commands on clients
 
729
    commands = commands_from_options(options)
730
730
    for command in commands:
731
731
        command.run(mandos_serv, clients)
732
732
 
746
746
 
747
747
class Test_string_to_delta(unittest.TestCase):
748
748
    def test_handles_basic_rfc3339(self):
 
749
        self.assertEqual(string_to_delta("PT0S"),
 
750
                         datetime.timedelta())
 
751
        self.assertEqual(string_to_delta("P0D"),
 
752
                         datetime.timedelta())
 
753
        self.assertEqual(string_to_delta("PT1S"),
 
754
                         datetime.timedelta(0, 1))
749
755
        self.assertEqual(string_to_delta("PT2H"),
750
756
                         datetime.timedelta(0, 7200))
751
757
    def test_falls_back_to_pre_1_6_1_with_warning(self):
786
792
                testcase.assertEqual(dbus_interface,
787
793
                                     dbus.PROPERTIES_IFACE)
788
794
                self.attributes[property] = value
789
 
                self.calls.append(("Set", (interface, property, value,
790
 
                                           dbus_interface)))
791
795
            def Get(self, interface, property, dbus_interface):
792
796
                testcase.assertEqual(interface, client_interface)
793
797
                testcase.assertEqual(dbus_interface,
794
798
                                     dbus.PROPERTIES_IFACE)
795
 
                self.calls.append(("Get", (interface, property,
796
 
                                           dbus_interface)))
797
799
                return self.attributes[property]
798
 
            def __getitem__(self, key):
799
 
                return self.attributes[key]
800
 
            def __setitem__(self, key, value):
801
 
                self.attributes[key] = value
802
 
        self.clients = collections.OrderedDict([
803
 
            ("foo",
804
 
             MockClient(
805
 
                 "foo",
806
 
                 KeyID=("92ed150794387c03ce684574b1139a65"
807
 
                        "94a34f895daaaf09fd8ea90a27cddb12"),
808
 
                 Secret=b"secret",
809
 
                 Host="foo.example.org",
810
 
                 Enabled=dbus.Boolean(True),
811
 
                 Timeout=300000,
812
 
                 LastCheckedOK="2019-02-03T00:00:00",
813
 
                 Created="2019-01-02T00:00:00",
814
 
                 Interval=120000,
815
 
                 Fingerprint=("778827225BA7DE539C5A"
816
 
                              "7CFA59CFF7CDBD9A5920"),
817
 
                 CheckerRunning=dbus.Boolean(False),
818
 
                 LastEnabled="2019-01-03T00:00:00",
819
 
                 ApprovalPending=dbus.Boolean(False),
820
 
                 ApprovedByDefault=dbus.Boolean(True),
821
 
                 LastApprovalRequest="",
822
 
                 ApprovalDelay=0,
823
 
                 ApprovalDuration=1000,
824
 
                 Checker="fping -q -- %(host)s",
825
 
                 ExtendedTimeout=900000,
826
 
                 Expires="2019-02-04T00:00:00",
827
 
                 LastCheckerStatus=0)),
828
 
            ("barbar",
829
 
             MockClient(
830
 
                 "barbar",
831
 
                 KeyID=("0558568eedd67d622f5c83b35a115f79"
832
 
                        "6ab612cff5ad227247e46c2b020f441c"),
833
 
                 Secret=b"secretbar",
834
 
                 Host="192.0.2.3",
835
 
                 Enabled=dbus.Boolean(True),
836
 
                 Timeout=300000,
837
 
                 LastCheckedOK="2019-02-04T00:00:00",
838
 
                 Created="2019-01-03T00:00:00",
839
 
                 Interval=120000,
840
 
                 Fingerprint=("3E393AEAEFB84C7E89E2"
841
 
                              "F547B3A107558FCA3A27"),
842
 
                 CheckerRunning=dbus.Boolean(True),
843
 
                 LastEnabled="2019-01-04T00:00:00",
844
 
                 ApprovalPending=dbus.Boolean(False),
845
 
                 ApprovedByDefault=dbus.Boolean(False),
846
 
                 LastApprovalRequest="2019-01-03T00:00:00",
847
 
                 ApprovalDelay=30000,
848
 
                 ApprovalDuration=1000,
849
 
                 Checker=":",
850
 
                 ExtendedTimeout=900000,
851
 
                 Expires="2019-02-05T00:00:00",
852
 
                 LastCheckerStatus=-2)),
 
800
            def Approve(self, approve, dbus_interface):
 
801
                testcase.assertEqual(dbus_interface, client_interface)
 
802
                self.calls.append(("Approve", (approve,
 
803
                                               dbus_interface)))
 
804
        self.client = MockClient(
 
805
            "foo",
 
806
            KeyID=("92ed150794387c03ce684574b1139a65"
 
807
                   "94a34f895daaaf09fd8ea90a27cddb12"),
 
808
            Secret=b"secret",
 
809
            Host="foo.example.org",
 
810
            Enabled=dbus.Boolean(True),
 
811
            Timeout=300000,
 
812
            LastCheckedOK="2019-02-03T00:00:00",
 
813
            Created="2019-01-02T00:00:00",
 
814
            Interval=120000,
 
815
            Fingerprint=("778827225BA7DE539C5A"
 
816
                         "7CFA59CFF7CDBD9A5920"),
 
817
            CheckerRunning=dbus.Boolean(False),
 
818
            LastEnabled="2019-01-03T00:00:00",
 
819
            ApprovalPending=dbus.Boolean(False),
 
820
            ApprovedByDefault=dbus.Boolean(True),
 
821
            LastApprovalRequest="",
 
822
            ApprovalDelay=0,
 
823
            ApprovalDuration=1000,
 
824
            Checker="fping -q -- %(host)s",
 
825
            ExtendedTimeout=900000,
 
826
            Expires="2019-02-04T00:00:00",
 
827
            LastCheckerStatus=0)
 
828
        self.other_client = MockClient(
 
829
            "barbar",
 
830
            KeyID=("0558568eedd67d622f5c83b35a115f79"
 
831
                   "6ab612cff5ad227247e46c2b020f441c"),
 
832
            Secret=b"secretbar",
 
833
            Host="192.0.2.3",
 
834
            Enabled=dbus.Boolean(True),
 
835
            Timeout=300000,
 
836
            LastCheckedOK="2019-02-04T00:00:00",
 
837
            Created="2019-01-03T00:00:00",
 
838
            Interval=120000,
 
839
            Fingerprint=("3E393AEAEFB84C7E89E2"
 
840
                         "F547B3A107558FCA3A27"),
 
841
            CheckerRunning=dbus.Boolean(True),
 
842
            LastEnabled="2019-01-04T00:00:00",
 
843
            ApprovalPending=dbus.Boolean(False),
 
844
            ApprovedByDefault=dbus.Boolean(False),
 
845
            LastApprovalRequest="2019-01-03T00:00:00",
 
846
            ApprovalDelay=30000,
 
847
            ApprovalDuration=1000,
 
848
            Checker=":",
 
849
            ExtendedTimeout=900000,
 
850
            Expires="2019-02-05T00:00:00",
 
851
            LastCheckerStatus=-2)
 
852
        self.clients =  collections.OrderedDict(
 
853
            [
 
854
                (self.client, self.client.attributes),
 
855
                (self.other_client, self.other_client.attributes),
853
856
            ])
 
857
        self.one_client = {self.client: self.client.attributes}
854
858
 
855
859
class TestPrintTableCmd(TestCmd):
856
860
    def test_normal(self):
870
874
"""[1:-1]
871
875
        self.assertEqual(output, expected_output)
872
876
    def test_one_client(self):
873
 
        output = PrintTableCmd().output({"foo": self.clients["foo"]})
 
877
        output = PrintTableCmd().output(self.one_client)
874
878
        expected_output = """
875
879
Name Enabled Timeout  Last Successful Check
876
880
foo  Yes     00:05:00 2019-02-03T00:00:00  
934
938
        json_data = json.loads(DumpJSONCmd().output(self.clients))
935
939
        self.assertDictEqual(json_data, self.expected_json)
936
940
    def test_one_client(self):
937
 
        clients = {"foo": self.clients["foo"]}
 
941
        clients = self.one_client
938
942
        json_data = json.loads(DumpJSONCmd().output(clients))
939
943
        expected_json = {"foo": self.expected_json["foo"]}
940
944
        self.assertDictEqual(json_data, expected_json)
941
945
 
942
946
class TestIsEnabledCmd(TestCmd):
943
947
    def test_is_enabled(self):
944
 
        self.assertTrue(all(IsEnabledCmd().is_enabled(client)
945
 
                            for client in self.clients.values()))
946
 
    def test_is_enabled_does_get_attribute(self):
947
 
        client = self.clients["foo"]
948
 
        self.assertTrue(IsEnabledCmd().is_enabled(client))
949
 
        self.assertListEqual(client.calls,
950
 
                             [("Get",
951
 
                               ("se.recompile.Mandos.Client",
952
 
                                "Enabled",
953
 
                                "org.freedesktop.DBus.Properties"))])
 
948
        self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
 
949
                            for client, properties in self.clients.items()))
954
950
    def test_is_enabled_run_exits_successfully(self):
955
 
        client = self.clients["foo"]
956
951
        with self.assertRaises(SystemExit) as e:
957
 
            IsEnabledCmd().run(None, [client])
 
952
            IsEnabledCmd().run(None, self.one_client)
958
953
        if e.exception.code is not None:
959
954
            self.assertEqual(e.exception.code, 0)
960
955
        else:
961
956
            self.assertIsNone(e.exception.code)
962
957
    def test_is_enabled_run_exits_with_failure(self):
963
 
        client = self.clients["foo"]
964
 
        client["Enabled"] = dbus.Boolean(False)
 
958
        self.client.attributes["Enabled"] = dbus.Boolean(False)
965
959
        with self.assertRaises(SystemExit) as e:
966
 
            IsEnabledCmd().run(None, [client])
 
960
            IsEnabledCmd().run(None, self.one_client)
967
961
        if isinstance(e.exception.code, int):
968
962
            self.assertNotEqual(e.exception.code, 0)
969
963
        else:
970
964
            self.assertIsNotNone(e.exception.code)
971
965
 
972
 
 
973
966
class TestRemoveCmd(TestCmd):
974
967
    def test_remove(self):
975
 
        client = self.clients["foo"]
976
968
        class MockMandos(object):
977
969
            def __init__(self):
978
970
                self.calls = []
979
971
            def RemoveClient(self, dbus_path):
980
972
                self.calls.append(("RemoveClient", (dbus_path,)))
981
973
        mandos = MockMandos()
982
 
        RemoveCmd().run(mandos, [client])
983
 
        self.assertEqual(len(mandos.calls), 1)
984
 
        self.assertListEqual(mandos.calls,
985
 
                             [("RemoveClient",
986
 
                               (client.__dbus_object_path__,))])
 
974
        super(TestRemoveCmd, self).setUp()
 
975
        RemoveCmd().run(mandos, self.clients)
 
976
        self.assertEqual(len(mandos.calls), 2)
 
977
        for client in self.clients:
 
978
            self.assertIn(("RemoveClient",
 
979
                           (client.__dbus_object_path__,)),
 
980
                          mandos.calls)
 
981
 
 
982
class TestApproveCmd(TestCmd):
 
983
    def test_approve(self):
 
984
        ApproveCmd().run(None, self.clients)
 
985
        for client in self.clients:
 
986
            self.assertIn(("Approve", (True, client_interface)),
 
987
                          client.calls)
 
988
 
 
989
class TestDenyCmd(TestCmd):
 
990
    def test_deny(self):
 
991
        DenyCmd().run(None, self.clients)
 
992
        for client in self.clients:
 
993
            self.assertIn(("Approve", (False, client_interface)),
 
994
                          client.calls)
 
995
 
 
996
class TestEnableCmd(TestCmd):
 
997
    def test_enable(self):
 
998
        for client in self.clients:
 
999
            client.attributes["Enabled"] = False
 
1000
 
 
1001
        EnableCmd().run(None, self.clients)
 
1002
 
 
1003
        for client in self.clients:
 
1004
            self.assertTrue(client.attributes["Enabled"])
 
1005
 
 
1006
class TestDisableCmd(TestCmd):
 
1007
    def test_disable(self):
 
1008
        DisableCmd().run(None, self.clients)
 
1009
 
 
1010
        for client in self.clients:
 
1011
            self.assertFalse(client.attributes["Enabled"])
 
1012
 
 
1013
class Unique(object):
 
1014
    """Class for objects which exist only to be unique objects, since
 
1015
unittest.mock.sentinel only exists in Python 3.3"""
 
1016
 
 
1017
class TestPropertyCmd(TestCmd):
 
1018
    """Abstract class for tests of PropertyCmd classes"""
 
1019
    def runTest(self):
 
1020
        if not hasattr(self, "command"):
 
1021
            return
 
1022
        values_to_get = getattr(self, "values_to_get",
 
1023
                                self.values_to_set)
 
1024
        for value_to_set, value_to_get in zip(self.values_to_set,
 
1025
                                              values_to_get):
 
1026
            for client in self.clients:
 
1027
                old_value = client.attributes[self.property]
 
1028
                self.assertNotIsInstance(old_value, Unique)
 
1029
                client.attributes[self.property] = Unique()
 
1030
            self.run_command(value_to_set, self.clients)
 
1031
            for client in self.clients:
 
1032
                value = client.attributes[self.property]
 
1033
                self.assertNotIsInstance(value, Unique)
 
1034
                self.assertEqual(value, value_to_get)
 
1035
    def run_command(self, value, clients):
 
1036
        self.command().run(None, clients)
 
1037
 
 
1038
class TestBumpTimeoutCmd(TestPropertyCmd):
 
1039
    command = BumpTimeoutCmd
 
1040
    property = "LastCheckedOK"
 
1041
    values_to_set = [""]
 
1042
 
 
1043
class TestStartCheckerCmd(TestPropertyCmd):
 
1044
    command = StartCheckerCmd
 
1045
    property = "CheckerRunning"
 
1046
    values_to_set = [dbus.Boolean(True)]
 
1047
 
 
1048
class TestStopCheckerCmd(TestPropertyCmd):
 
1049
    command = StopCheckerCmd
 
1050
    property = "CheckerRunning"
 
1051
    values_to_set = [dbus.Boolean(False)]
 
1052
 
 
1053
class TestApproveByDefaultCmd(TestPropertyCmd):
 
1054
    command = ApproveByDefaultCmd
 
1055
    property = "ApprovedByDefault"
 
1056
    values_to_set = [dbus.Boolean(True)]
 
1057
 
 
1058
class TestDenyByDefaultCmd(TestPropertyCmd):
 
1059
    command = DenyByDefaultCmd
 
1060
    property = "ApprovedByDefault"
 
1061
    values_to_set = [dbus.Boolean(False)]
 
1062
 
 
1063
class TestValueArgumentPropertyCmd(TestPropertyCmd):
 
1064
    """Abstract class for tests of PropertyCmd classes using the
 
1065
ValueArgumentMixIn"""
 
1066
    def runTest(self):
 
1067
        if type(self) is TestValueArgumentPropertyCmd:
 
1068
            return
 
1069
        return super(TestValueArgumentPropertyCmd, self).runTest()
 
1070
    def run_command(self, value, clients):
 
1071
        self.command(value).run(None, clients)
 
1072
 
 
1073
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
 
1074
    command = SetCheckerCmd
 
1075
    property = "Checker"
 
1076
    values_to_set = ["", ":", "fping -q -- %s"]
 
1077
 
 
1078
class TestSetHostCmd(TestValueArgumentPropertyCmd):
 
1079
    command = SetHostCmd
 
1080
    property = "Host"
 
1081
    values_to_set = ["192.0.2.3", "foo.example.org"]
 
1082
 
 
1083
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
 
1084
    command = SetSecretCmd
 
1085
    property = "Secret"
 
1086
    values_to_set = [b"", b"secret"]
 
1087
 
 
1088
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
 
1089
    command = SetTimeoutCmd
 
1090
    property = "Timeout"
 
1091
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1092
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1093
 
 
1094
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
 
1095
    command = SetExtendedTimeoutCmd
 
1096
    property = "ExtendedTimeout"
 
1097
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1098
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1099
 
 
1100
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
 
1101
    command = SetIntervalCmd
 
1102
    property = "Interval"
 
1103
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1104
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1105
 
 
1106
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
 
1107
    command = SetApprovalDelayCmd
 
1108
    property = "ApprovalDelay"
 
1109
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1110
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1111
 
 
1112
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
 
1113
    command = SetApprovalDurationCmd
 
1114
    property = "ApprovalDuration"
 
1115
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1116
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1117
 
 
1118
class TestOptions(unittest.TestCase):
 
1119
    def setUp(self):
 
1120
        self.parser = argparse.ArgumentParser()
 
1121
        add_command_line_options(self.parser)
 
1122
    def assert_command_from_args(self, args, command_cls, **cmd_attrs):
 
1123
        """Assert that parsing ARGS should result in an instance of
 
1124
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
 
1125
        options = self.parser.parse_args(args)
 
1126
        commands = commands_from_options(options)
 
1127
        self.assertEqual(len(commands), 1)
 
1128
        command = commands[0]
 
1129
        self.assertIsInstance(command, command_cls)
 
1130
        for key, value in cmd_attrs.items():
 
1131
            self.assertEqual(getattr(command, key), value)
 
1132
    def test_default_is_show_table(self):
 
1133
        self.assert_command_from_args([], PrintTableCmd,
 
1134
                                      verbose=False)
 
1135
    def test_show_table_verbose(self):
 
1136
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
 
1137
                                      verbose=True)
 
1138
    def test_enable(self):
 
1139
        self.assert_command_from_args(["--enable"], EnableCmd)
 
1140
    def test_disable(self):
 
1141
        self.assert_command_from_args(["--disable"], DisableCmd)
987
1142
 
988
1143
 
989
1144