/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-03 18:11:39 UTC
  • Revision ID: teddy@recompile.se-20190303181139-f8r50k8bgirs3d5f
mandos-ctl: Add test for IsEnabledCmd class

* mandos-ctl (TestCmd.setUp.MockClient.Set,
              TestCmd.setUp.MockClient.Get): Fix bugs.
  (TestCmd.setUp.MockClient.__setitem__): New.
  (TestIsEnabledCmd): New.

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
import json
43
43
import unittest
44
44
import logging
45
 
import io
46
45
 
47
46
import dbus
48
47
 
279
278
        commands which want to operate on all clients at the same time
280
279
        can override this run() method instead."""
281
280
        self.mandos = mandos
282
 
        for client, properties in clients.items():
283
 
            self.run_on_one_client(client, properties)
 
281
        for client in clients:
 
282
            self.run_on_one_client(client)
284
283
 
285
284
class PrintCmd(Command):
286
285
    """Abstract class for commands printing client details"""
296
295
 
297
296
class PropertyCmd(Command):
298
297
    """Abstract class for Actions for setting one client property"""
299
 
    def run_on_one_client(self, client, properties):
 
298
    def run_on_one_client(self, client):
300
299
        """Set the Client's D-Bus property"""
301
300
        client.Set(client_interface, self.property, self.value_to_set,
302
301
                   dbus_interface=dbus.PROPERTIES_IFACE)
324
323
        self.verbose = verbose
325
324
 
326
325
    def output(self, clients):
327
 
        default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
328
 
        keywords = default_keywords
329
326
        if self.verbose:
330
327
            keywords = self.all_keywords
 
328
        else:
 
329
            keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
331
330
        return str(self.TableOfClients(clients.values(), keywords))
332
331
 
333
332
    class TableOfClients(object):
420
419
        return value
421
420
 
422
421
class IsEnabledCmd(Command):
423
 
    def run_on_one_client(self, client, properties):
424
 
        if self.is_enabled(client, properties):
 
422
    def run_on_one_client(self, client):
 
423
        if self.is_enabled(client):
425
424
            sys.exit(0)
426
425
        sys.exit(1)
427
 
    def is_enabled(self, client, properties):
428
 
        return bool(properties["Enabled"])
 
426
    def is_enabled(self, client):
 
427
        return client.Get(client_interface, "Enabled",
 
428
                          dbus_interface=dbus.PROPERTIES_IFACE)
429
429
 
430
430
class RemoveCmd(Command):
431
 
    def run_on_one_client(self, client, properties):
 
431
    def run_on_one_client(self, client):
432
432
        self.mandos.RemoveClient(client.__dbus_object_path__)
433
433
 
434
434
class ApproveCmd(Command):
435
 
    def run_on_one_client(self, client, properties):
 
435
    def run_on_one_client(self, client):
436
436
        client.Approve(dbus.Boolean(True),
437
437
                       dbus_interface=client_interface)
438
438
 
439
439
class DenyCmd(Command):
440
 
    def run_on_one_client(self, client, properties):
 
440
    def run_on_one_client(self, client):
441
441
        client.Approve(dbus.Boolean(False),
442
442
                       dbus_interface=client_interface)
443
443
 
476
476
    property = "Host"
477
477
 
478
478
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
479
 
    @property
480
 
    def value_to_set(self):
481
 
        return self._vts
482
 
    @value_to_set.setter
483
 
    def value_to_set(self, value):
484
 
        """When setting, read data from supplied file object"""
485
 
        self._vts = value.read()
486
 
        value.close()
487
479
    property = "Secret"
488
480
 
489
481
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
524
516
                options.approve,
525
517
                options.deny))
526
518
 
527
 
def add_command_line_options(parser):
 
519
 
 
520
def commands_and_clients_from_options(args=None):
 
521
    if args is None:
 
522
        args=sys.argv[1:]
 
523
    parser = argparse.ArgumentParser()
528
524
    parser.add_argument("--version", action="version",
529
525
                        version="%(prog)s {}".format(version),
530
526
                        help="show version number and exit")
587
583
    parser.add_argument("--check", action="store_true",
588
584
                        help="Run self-test")
589
585
    parser.add_argument("client", nargs="*", help="Client name")
590
 
 
591
 
 
592
 
def commands_from_options(options):
 
586
    options = parser.parse_args(args=args)
 
587
 
 
588
    if has_actions(options) and not (options.client or options.all):
 
589
        parser.error("Options require clients names or --all.")
 
590
    if options.verbose and has_actions(options):
 
591
        parser.error("--verbose can only be used alone.")
 
592
    if options.dump_json and (options.verbose
 
593
                              or has_actions(options)):
 
594
        parser.error("--dump-json can only be used alone.")
 
595
    if options.all and not has_actions(options):
 
596
        parser.error("--all requires an action.")
 
597
    if options.is_enabled and len(options.client) > 1:
 
598
            parser.error("--is-enabled requires exactly one client")
593
599
 
594
600
    commands = []
595
601
 
603
609
        commands.append(DisableCmd())
604
610
 
605
611
    if options.bump_timeout:
606
 
        commands.append(BumpTimeoutCmd())
 
612
        commands.append(BumpTimeoutCmd(options.bump_timeout))
607
613
 
608
614
    if options.start_checker:
609
615
        commands.append(StartCheckerCmd())
618
624
        commands.append(RemoveCmd())
619
625
 
620
626
    if options.checker is not None:
621
 
        commands.append(SetCheckerCmd(options.checker))
 
627
        commands.append(SetCheckerCmd())
622
628
 
623
629
    if options.timeout is not None:
624
630
        commands.append(SetTimeoutCmd(options.timeout))
628
634
            SetExtendedTimeoutCmd(options.extended_timeout))
629
635
 
630
636
    if options.interval is not None:
631
 
        commands.append(SetIntervalCmd(options.interval))
 
637
        command.append(SetIntervalCmd(options.interval))
632
638
 
633
639
    if options.approved_by_default is not None:
634
640
        if options.approved_by_default:
635
 
            commands.append(ApproveByDefaultCmd())
 
641
            command.append(ApproveByDefaultCmd())
636
642
        else:
637
 
            commands.append(DenyByDefaultCmd())
 
643
            command.append(DenyByDefaultCmd())
638
644
 
639
645
    if options.approval_delay is not None:
640
 
        commands.append(SetApprovalDelayCmd(options.approval_delay))
 
646
        command.append(SetApprovalDelayCmd(options.approval_delay))
641
647
 
642
648
    if options.approval_duration is not None:
643
 
        commands.append(
 
649
        command.append(
644
650
            SetApprovalDurationCmd(options.approval_duration))
645
651
 
646
652
    if options.host is not None:
647
 
        commands.append(SetHostCmd(options.host))
 
653
        command.append(SetHostCmd(options.host))
648
654
 
649
655
    if options.secret is not None:
650
 
        commands.append(SetSecretCmd(options.secret))
 
656
        command.append(SetSecretCmd(options.secret))
651
657
 
652
658
    if options.approve:
653
659
        commands.append(ApproveCmd())
660
666
    if not commands:
661
667
        commands.append(PrintTableCmd(verbose=options.verbose))
662
668
 
663
 
    return commands
 
669
    return commands, options.client
664
670
 
665
671
 
666
672
def main():
667
 
    parser = argparse.ArgumentParser()
668
 
 
669
 
    add_command_line_options(parser)
670
 
 
671
 
    options = parser.parse_args()
672
 
 
673
 
    if has_actions(options) and not (options.client or options.all):
674
 
        parser.error("Options require clients names or --all.")
675
 
    if options.verbose and has_actions(options):
676
 
        parser.error("--verbose can only be used alone.")
677
 
    if options.dump_json and (options.verbose
678
 
                              or has_actions(options)):
679
 
        parser.error("--dump-json can only be used alone.")
680
 
    if options.all and not has_actions(options):
681
 
        parser.error("--all requires an action.")
682
 
    if options.is_enabled and len(options.client) > 1:
683
 
        parser.error("--is-enabled requires exactly one client")
684
 
 
685
 
    clientnames = options.client
 
673
    commands, clientnames = commands_and_clients_from_options()
686
674
 
687
675
    try:
688
676
        bus = dbus.SystemBus()
702
690
        def filter(self, record):
703
691
            return False
704
692
    dbus_filter = NullFilter()
 
693
    dbus_logger.addFilter(dbus_filter)
705
694
    try:
706
 
        dbus_logger.addFilter(dbus_filter)
707
 
        mandos_clients = {path: ifs_and_props[client_interface]
708
 
                          for path, ifs_and_props in
709
 
                          mandos_serv_object_manager
710
 
                          .GetManagedObjects().items()
711
 
                          if client_interface in ifs_and_props}
 
695
        try:
 
696
            mandos_clients = {path: ifs_and_props[client_interface]
 
697
                              for path, ifs_and_props in
 
698
                              mandos_serv_object_manager
 
699
                              .GetManagedObjects().items()
 
700
                              if client_interface in ifs_and_props}
 
701
        finally:
 
702
            # restore dbus logger
 
703
            dbus_logger.removeFilter(dbus_filter)
712
704
    except dbus.exceptions.DBusException as e:
713
705
        log.critical("Failed to access Mandos server through D-Bus:"
714
706
                     "\n%s", e)
715
707
        sys.exit(1)
716
 
    finally:
717
 
        # restore dbus logger
718
 
        dbus_logger.removeFilter(dbus_filter)
719
708
 
720
709
    # Compile dict of (clients: properties) to process
721
710
    clients = {}
735
724
                sys.exit(1)
736
725
 
737
726
    # Run all commands on clients
738
 
    commands = commands_from_options(options)
739
727
    for command in commands:
740
728
        command.run(mandos_serv, clients)
741
729
 
755
743
 
756
744
class Test_string_to_delta(unittest.TestCase):
757
745
    def test_handles_basic_rfc3339(self):
758
 
        self.assertEqual(string_to_delta("PT0S"),
759
 
                         datetime.timedelta())
760
 
        self.assertEqual(string_to_delta("P0D"),
761
 
                         datetime.timedelta())
762
 
        self.assertEqual(string_to_delta("PT1S"),
763
 
                         datetime.timedelta(0, 1))
764
746
        self.assertEqual(string_to_delta("PT2H"),
765
747
                         datetime.timedelta(0, 7200))
766
748
    def test_falls_back_to_pre_1_6_1_with_warning(self):
801
783
                testcase.assertEqual(dbus_interface,
802
784
                                     dbus.PROPERTIES_IFACE)
803
785
                self.attributes[property] = value
 
786
                self.calls.append(("Set", (interface, property, value,
 
787
                                           dbus_interface)))
804
788
            def Get(self, interface, property, dbus_interface):
805
789
                testcase.assertEqual(interface, client_interface)
806
790
                testcase.assertEqual(dbus_interface,
807
791
                                     dbus.PROPERTIES_IFACE)
 
792
                self.calls.append(("Get", (interface, property,
 
793
                                           dbus_interface)))
808
794
                return self.attributes[property]
809
 
            def Approve(self, approve, dbus_interface):
810
 
                testcase.assertEqual(dbus_interface, client_interface)
811
 
                self.calls.append(("Approve", (approve,
812
 
                                               dbus_interface)))
813
 
        self.client = MockClient(
814
 
            "foo",
815
 
            KeyID=("92ed150794387c03ce684574b1139a65"
816
 
                   "94a34f895daaaf09fd8ea90a27cddb12"),
817
 
            Secret=b"secret",
818
 
            Host="foo.example.org",
819
 
            Enabled=dbus.Boolean(True),
820
 
            Timeout=300000,
821
 
            LastCheckedOK="2019-02-03T00:00:00",
822
 
            Created="2019-01-02T00:00:00",
823
 
            Interval=120000,
824
 
            Fingerprint=("778827225BA7DE539C5A"
825
 
                         "7CFA59CFF7CDBD9A5920"),
826
 
            CheckerRunning=dbus.Boolean(False),
827
 
            LastEnabled="2019-01-03T00:00:00",
828
 
            ApprovalPending=dbus.Boolean(False),
829
 
            ApprovedByDefault=dbus.Boolean(True),
830
 
            LastApprovalRequest="",
831
 
            ApprovalDelay=0,
832
 
            ApprovalDuration=1000,
833
 
            Checker="fping -q -- %(host)s",
834
 
            ExtendedTimeout=900000,
835
 
            Expires="2019-02-04T00:00:00",
836
 
            LastCheckerStatus=0)
837
 
        self.other_client = MockClient(
838
 
            "barbar",
839
 
            KeyID=("0558568eedd67d622f5c83b35a115f79"
840
 
                   "6ab612cff5ad227247e46c2b020f441c"),
841
 
            Secret=b"secretbar",
842
 
            Host="192.0.2.3",
843
 
            Enabled=dbus.Boolean(True),
844
 
            Timeout=300000,
845
 
            LastCheckedOK="2019-02-04T00:00:00",
846
 
            Created="2019-01-03T00:00:00",
847
 
            Interval=120000,
848
 
            Fingerprint=("3E393AEAEFB84C7E89E2"
849
 
                         "F547B3A107558FCA3A27"),
850
 
            CheckerRunning=dbus.Boolean(True),
851
 
            LastEnabled="2019-01-04T00:00:00",
852
 
            ApprovalPending=dbus.Boolean(False),
853
 
            ApprovedByDefault=dbus.Boolean(False),
854
 
            LastApprovalRequest="2019-01-03T00:00:00",
855
 
            ApprovalDelay=30000,
856
 
            ApprovalDuration=1000,
857
 
            Checker=":",
858
 
            ExtendedTimeout=900000,
859
 
            Expires="2019-02-05T00:00:00",
860
 
            LastCheckerStatus=-2)
861
 
        self.clients =  collections.OrderedDict(
862
 
            [
863
 
                (self.client, self.client.attributes),
864
 
                (self.other_client, self.other_client.attributes),
 
795
            def __getitem__(self, key):
 
796
                return self.attributes[key]
 
797
            def __setitem__(self, key, value):
 
798
                self.attributes[key] = value
 
799
        self.clients = collections.OrderedDict([
 
800
            ("foo",
 
801
             MockClient(
 
802
                 "foo",
 
803
                 KeyID=("92ed150794387c03ce684574b1139a65"
 
804
                        "94a34f895daaaf09fd8ea90a27cddb12"),
 
805
                 Secret=b"secret",
 
806
                 Host="foo.example.org",
 
807
                 Enabled=dbus.Boolean(True),
 
808
                 Timeout=300000,
 
809
                 LastCheckedOK="2019-02-03T00:00:00",
 
810
                 Created="2019-01-02T00:00:00",
 
811
                 Interval=120000,
 
812
                 Fingerprint=("778827225BA7DE539C5A"
 
813
                              "7CFA59CFF7CDBD9A5920"),
 
814
                 CheckerRunning=dbus.Boolean(False),
 
815
                 LastEnabled="2019-01-03T00:00:00",
 
816
                 ApprovalPending=dbus.Boolean(False),
 
817
                 ApprovedByDefault=dbus.Boolean(True),
 
818
                 LastApprovalRequest="",
 
819
                 ApprovalDelay=0,
 
820
                 ApprovalDuration=1000,
 
821
                 Checker="fping -q -- %(host)s",
 
822
                 ExtendedTimeout=900000,
 
823
                 Expires="2019-02-04T00:00:00",
 
824
                 LastCheckerStatus=0)),
 
825
            ("barbar",
 
826
             MockClient(
 
827
                 "barbar",
 
828
                 KeyID=("0558568eedd67d622f5c83b35a115f79"
 
829
                        "6ab612cff5ad227247e46c2b020f441c"),
 
830
                 Secret=b"secretbar",
 
831
                 Host="192.0.2.3",
 
832
                 Enabled=dbus.Boolean(True),
 
833
                 Timeout=300000,
 
834
                 LastCheckedOK="2019-02-04T00:00:00",
 
835
                 Created="2019-01-03T00:00:00",
 
836
                 Interval=120000,
 
837
                 Fingerprint=("3E393AEAEFB84C7E89E2"
 
838
                              "F547B3A107558FCA3A27"),
 
839
                 CheckerRunning=dbus.Boolean(True),
 
840
                 LastEnabled="2019-01-04T00:00:00",
 
841
                 ApprovalPending=dbus.Boolean(False),
 
842
                 ApprovedByDefault=dbus.Boolean(False),
 
843
                 LastApprovalRequest="2019-01-03T00:00:00",
 
844
                 ApprovalDelay=30000,
 
845
                 ApprovalDuration=1000,
 
846
                 Checker=":",
 
847
                 ExtendedTimeout=900000,
 
848
                 Expires="2019-02-05T00:00:00",
 
849
                 LastCheckerStatus=-2)),
865
850
            ])
866
 
        self.one_client = {self.client: self.client.attributes}
867
851
 
868
852
class TestPrintTableCmd(TestCmd):
869
853
    def test_normal(self):
883
867
"""[1:-1]
884
868
        self.assertEqual(output, expected_output)
885
869
    def test_one_client(self):
886
 
        output = PrintTableCmd().output(self.one_client)
 
870
        output = PrintTableCmd().output({"foo": self.clients["foo"]})
887
871
        expected_output = """
888
872
Name Enabled Timeout  Last Successful Check
889
873
foo  Yes     00:05:00 2019-02-03T00:00:00  
947
931
        json_data = json.loads(DumpJSONCmd().output(self.clients))
948
932
        self.assertDictEqual(json_data, self.expected_json)
949
933
    def test_one_client(self):
950
 
        clients = self.one_client
 
934
        clients = {"foo": self.clients["foo"]}
951
935
        json_data = json.loads(DumpJSONCmd().output(clients))
952
936
        expected_json = {"foo": self.expected_json["foo"]}
953
937
        self.assertDictEqual(json_data, expected_json)
954
938
 
955
939
class TestIsEnabledCmd(TestCmd):
956
940
    def test_is_enabled(self):
957
 
        self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
958
 
                            for client, properties in self.clients.items()))
 
941
        self.assertTrue(all(IsEnabledCmd().is_enabled(client)
 
942
                            for client in self.clients.values()))
 
943
    def test_is_enabled_does_get_attribute(self):
 
944
        client = self.clients["foo"]
 
945
        self.assertTrue(IsEnabledCmd().is_enabled(client))
 
946
        self.assertListEqual(client.calls,
 
947
                             [("Get",
 
948
                               ("se.recompile.Mandos.Client",
 
949
                                "Enabled",
 
950
                                "org.freedesktop.DBus.Properties"))])
959
951
    def test_is_enabled_run_exits_successfully(self):
 
952
        client = self.clients["foo"]
960
953
        with self.assertRaises(SystemExit) as e:
961
 
            IsEnabledCmd().run(None, self.one_client)
 
954
            IsEnabledCmd().run_on_one_client(client)
962
955
        if e.exception.code is not None:
963
956
            self.assertEqual(e.exception.code, 0)
964
957
        else:
965
958
            self.assertIsNone(e.exception.code)
966
959
    def test_is_enabled_run_exits_with_failure(self):
967
 
        self.client.attributes["Enabled"] = dbus.Boolean(False)
 
960
        client = self.clients["foo"]
 
961
        client["Enabled"] = dbus.Boolean(False)
968
962
        with self.assertRaises(SystemExit) as e:
969
 
            IsEnabledCmd().run(None, self.one_client)
 
963
            IsEnabledCmd().run_on_one_client(client)
970
964
        if isinstance(e.exception.code, int):
971
965
            self.assertNotEqual(e.exception.code, 0)
972
966
        else:
973
967
            self.assertIsNotNone(e.exception.code)
974
968
 
975
 
class TestRemoveCmd(TestCmd):
976
 
    def test_remove(self):
977
 
        class MockMandos(object):
978
 
            def __init__(self):
979
 
                self.calls = []
980
 
            def RemoveClient(self, dbus_path):
981
 
                self.calls.append(("RemoveClient", (dbus_path,)))
982
 
        mandos = MockMandos()
983
 
        super(TestRemoveCmd, self).setUp()
984
 
        RemoveCmd().run(mandos, self.clients)
985
 
        self.assertEqual(len(mandos.calls), 2)
986
 
        for client in self.clients:
987
 
            self.assertIn(("RemoveClient",
988
 
                           (client.__dbus_object_path__,)),
989
 
                          mandos.calls)
990
 
 
991
 
class TestApproveCmd(TestCmd):
992
 
    def test_approve(self):
993
 
        ApproveCmd().run(None, self.clients)
994
 
        for client in self.clients:
995
 
            self.assertIn(("Approve", (True, client_interface)),
996
 
                          client.calls)
997
 
 
998
 
class TestDenyCmd(TestCmd):
999
 
    def test_deny(self):
1000
 
        DenyCmd().run(None, self.clients)
1001
 
        for client in self.clients:
1002
 
            self.assertIn(("Approve", (False, client_interface)),
1003
 
                          client.calls)
1004
 
 
1005
 
class TestEnableCmd(TestCmd):
1006
 
    def test_enable(self):
1007
 
        for client in self.clients:
1008
 
            client.attributes["Enabled"] = False
1009
 
 
1010
 
        EnableCmd().run(None, self.clients)
1011
 
 
1012
 
        for client in self.clients:
1013
 
            self.assertTrue(client.attributes["Enabled"])
1014
 
 
1015
 
class TestDisableCmd(TestCmd):
1016
 
    def test_disable(self):
1017
 
        DisableCmd().run(None, self.clients)
1018
 
 
1019
 
        for client in self.clients:
1020
 
            self.assertFalse(client.attributes["Enabled"])
1021
 
 
1022
 
class Unique(object):
1023
 
    """Class for objects which exist only to be unique objects, since
1024
 
unittest.mock.sentinel only exists in Python 3.3"""
1025
 
 
1026
 
class TestPropertyCmd(TestCmd):
1027
 
    """Abstract class for tests of PropertyCmd classes"""
1028
 
    def runTest(self):
1029
 
        if not hasattr(self, "command"):
1030
 
            return
1031
 
        values_to_get = getattr(self, "values_to_get",
1032
 
                                self.values_to_set)
1033
 
        for value_to_set, value_to_get in zip(self.values_to_set,
1034
 
                                              values_to_get):
1035
 
            for client in self.clients:
1036
 
                old_value = client.attributes[self.property]
1037
 
                self.assertNotIsInstance(old_value, Unique)
1038
 
                client.attributes[self.property] = Unique()
1039
 
            self.run_command(value_to_set, self.clients)
1040
 
            for client in self.clients:
1041
 
                value = client.attributes[self.property]
1042
 
                self.assertNotIsInstance(value, Unique)
1043
 
                self.assertEqual(value, value_to_get)
1044
 
    def run_command(self, value, clients):
1045
 
        self.command().run(None, clients)
1046
 
 
1047
 
class TestBumpTimeoutCmd(TestPropertyCmd):
1048
 
    command = BumpTimeoutCmd
1049
 
    property = "LastCheckedOK"
1050
 
    values_to_set = [""]
1051
 
 
1052
 
class TestStartCheckerCmd(TestPropertyCmd):
1053
 
    command = StartCheckerCmd
1054
 
    property = "CheckerRunning"
1055
 
    values_to_set = [dbus.Boolean(True)]
1056
 
 
1057
 
class TestStopCheckerCmd(TestPropertyCmd):
1058
 
    command = StopCheckerCmd
1059
 
    property = "CheckerRunning"
1060
 
    values_to_set = [dbus.Boolean(False)]
1061
 
 
1062
 
class TestApproveByDefaultCmd(TestPropertyCmd):
1063
 
    command = ApproveByDefaultCmd
1064
 
    property = "ApprovedByDefault"
1065
 
    values_to_set = [dbus.Boolean(True)]
1066
 
 
1067
 
class TestDenyByDefaultCmd(TestPropertyCmd):
1068
 
    command = DenyByDefaultCmd
1069
 
    property = "ApprovedByDefault"
1070
 
    values_to_set = [dbus.Boolean(False)]
1071
 
 
1072
 
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1073
 
    """Abstract class for tests of PropertyCmd classes using the
1074
 
ValueArgumentMixIn"""
1075
 
    def runTest(self):
1076
 
        if type(self) is TestValueArgumentPropertyCmd:
1077
 
            return
1078
 
        return super(TestValueArgumentPropertyCmd, self).runTest()
1079
 
    def run_command(self, value, clients):
1080
 
        self.command(value).run(None, clients)
1081
 
 
1082
 
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1083
 
    command = SetCheckerCmd
1084
 
    property = "Checker"
1085
 
    values_to_set = ["", ":", "fping -q -- %s"]
1086
 
 
1087
 
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1088
 
    command = SetHostCmd
1089
 
    property = "Host"
1090
 
    values_to_set = ["192.0.2.3", "foo.example.org"]
1091
 
 
1092
 
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1093
 
    command = SetSecretCmd
1094
 
    property = "Secret"
1095
 
    values_to_set = [open("/dev/null", "rb"),
1096
 
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1097
 
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
1098
 
 
1099
 
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1100
 
    command = SetTimeoutCmd
1101
 
    property = "Timeout"
1102
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1103
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1104
 
 
1105
 
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1106
 
    command = SetExtendedTimeoutCmd
1107
 
    property = "ExtendedTimeout"
1108
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1109
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1110
 
 
1111
 
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1112
 
    command = SetIntervalCmd
1113
 
    property = "Interval"
1114
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1115
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1116
 
 
1117
 
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1118
 
    command = SetApprovalDelayCmd
1119
 
    property = "ApprovalDelay"
1120
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1121
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1122
 
 
1123
 
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1124
 
    command = SetApprovalDurationCmd
1125
 
    property = "ApprovalDuration"
1126
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1127
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1128
 
 
1129
 
class TestOptions(unittest.TestCase):
1130
 
    def setUp(self):
1131
 
        self.parser = argparse.ArgumentParser()
1132
 
        add_command_line_options(self.parser)
1133
 
    def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1134
 
        """Assert that parsing ARGS should result in an instance of
1135
 
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1136
 
        options = self.parser.parse_args(args)
1137
 
        commands = commands_from_options(options)
1138
 
        self.assertEqual(len(commands), 1)
1139
 
        command = commands[0]
1140
 
        self.assertIsInstance(command, command_cls)
1141
 
        for key, value in cmd_attrs.items():
1142
 
            self.assertEqual(getattr(command, key), value)
1143
 
    def test_default_is_show_table(self):
1144
 
        self.assert_command_from_args([], PrintTableCmd,
1145
 
                                      verbose=False)
1146
 
    def test_show_table_verbose(self):
1147
 
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1148
 
                                      verbose=True)
1149
 
    def test_enable(self):
1150
 
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1151
 
    def test_disable(self):
1152
 
        self.assert_command_from_args(["--disable", "foo"],
1153
 
                                      DisableCmd)
1154
 
 
1155
969
 
1156
970
 
1157
971
def should_only_run_tests():