/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-04 18:43:22 UTC
  • Revision ID: teddy@recompile.se-20190304184322-6tumlykqh57t5nvs
mandos-ctl: Add test for RemoveCmd

* mandos-ctl (TestRemoveCmd): New.

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
import json
43
43
import unittest
44
44
import logging
45
 
import io
46
45
 
47
46
import dbus
48
47
 
279
278
        commands which want to operate on all clients at the same time
280
279
        can override this run() method instead."""
281
280
        self.mandos = mandos
282
 
        for client, properties in clients.items():
283
 
            self.run_on_one_client(client, properties)
 
281
        for client in clients:
 
282
            self.run_on_one_client(client)
284
283
 
285
284
class PrintCmd(Command):
286
285
    """Abstract class for commands printing client details"""
296
295
 
297
296
class PropertyCmd(Command):
298
297
    """Abstract class for Actions for setting one client property"""
299
 
    def run_on_one_client(self, client, properties):
 
298
    def run_on_one_client(self, client):
300
299
        """Set the Client's D-Bus property"""
301
300
        client.Set(client_interface, self.property, self.value_to_set,
302
301
                   dbus_interface=dbus.PROPERTIES_IFACE)
324
323
        self.verbose = verbose
325
324
 
326
325
    def output(self, clients):
327
 
        default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
328
 
        keywords = default_keywords
329
326
        if self.verbose:
330
327
            keywords = self.all_keywords
 
328
        else:
 
329
            keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
331
330
        return str(self.TableOfClients(clients.values(), keywords))
332
331
 
333
332
    class TableOfClients(object):
420
419
        return value
421
420
 
422
421
class IsEnabledCmd(Command):
423
 
    def run_on_one_client(self, client, properties):
424
 
        if self.is_enabled(client, properties):
 
422
    def run_on_one_client(self, client):
 
423
        if self.is_enabled(client):
425
424
            sys.exit(0)
426
425
        sys.exit(1)
427
 
    def is_enabled(self, client, properties):
428
 
        return bool(properties["Enabled"])
 
426
    def is_enabled(self, client):
 
427
        return client.Get(client_interface, "Enabled",
 
428
                          dbus_interface=dbus.PROPERTIES_IFACE)
429
429
 
430
430
class RemoveCmd(Command):
431
 
    def run_on_one_client(self, client, properties):
 
431
    def run_on_one_client(self, client):
432
432
        self.mandos.RemoveClient(client.__dbus_object_path__)
433
433
 
434
434
class ApproveCmd(Command):
435
 
    def run_on_one_client(self, client, properties):
 
435
    def run_on_one_client(self, client):
436
436
        client.Approve(dbus.Boolean(True),
437
437
                       dbus_interface=client_interface)
438
438
 
439
439
class DenyCmd(Command):
440
 
    def run_on_one_client(self, client, properties):
 
440
    def run_on_one_client(self, client):
441
441
        client.Approve(dbus.Boolean(False),
442
442
                       dbus_interface=client_interface)
443
443
 
476
476
    property = "Host"
477
477
 
478
478
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
479
 
    @property
480
 
    def value_to_set(self):
481
 
        return self._vts
482
 
    @value_to_set.setter
483
 
    def value_to_set(self, value):
484
 
        """When setting, read data from supplied file object"""
485
 
        self._vts = value.read()
486
 
        value.close()
487
479
    property = "Secret"
488
480
 
489
481
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
589
581
    parser.add_argument("client", nargs="*", help="Client name")
590
582
 
591
583
 
592
 
def commands_from_options(options):
 
584
def commands_and_clients_from_options(options):
593
585
 
594
586
    commands = []
595
587
 
603
595
        commands.append(DisableCmd())
604
596
 
605
597
    if options.bump_timeout:
606
 
        commands.append(BumpTimeoutCmd())
 
598
        commands.append(BumpTimeoutCmd(options.bump_timeout))
607
599
 
608
600
    if options.start_checker:
609
601
        commands.append(StartCheckerCmd())
618
610
        commands.append(RemoveCmd())
619
611
 
620
612
    if options.checker is not None:
621
 
        commands.append(SetCheckerCmd(options.checker))
 
613
        commands.append(SetCheckerCmd())
622
614
 
623
615
    if options.timeout is not None:
624
616
        commands.append(SetTimeoutCmd(options.timeout))
628
620
            SetExtendedTimeoutCmd(options.extended_timeout))
629
621
 
630
622
    if options.interval is not None:
631
 
        commands.append(SetIntervalCmd(options.interval))
 
623
        command.append(SetIntervalCmd(options.interval))
632
624
 
633
625
    if options.approved_by_default is not None:
634
626
        if options.approved_by_default:
635
 
            commands.append(ApproveByDefaultCmd())
 
627
            command.append(ApproveByDefaultCmd())
636
628
        else:
637
 
            commands.append(DenyByDefaultCmd())
 
629
            command.append(DenyByDefaultCmd())
638
630
 
639
631
    if options.approval_delay is not None:
640
 
        commands.append(SetApprovalDelayCmd(options.approval_delay))
 
632
        command.append(SetApprovalDelayCmd(options.approval_delay))
641
633
 
642
634
    if options.approval_duration is not None:
643
 
        commands.append(
 
635
        command.append(
644
636
            SetApprovalDurationCmd(options.approval_duration))
645
637
 
646
638
    if options.host is not None:
647
 
        commands.append(SetHostCmd(options.host))
 
639
        command.append(SetHostCmd(options.host))
648
640
 
649
641
    if options.secret is not None:
650
 
        commands.append(SetSecretCmd(options.secret))
 
642
        command.append(SetSecretCmd(options.secret))
651
643
 
652
644
    if options.approve:
653
645
        commands.append(ApproveCmd())
660
652
    if not commands:
661
653
        commands.append(PrintTableCmd(verbose=options.verbose))
662
654
 
663
 
    return commands
 
655
    return commands, options.client
664
656
 
665
657
 
666
658
def main():
682
674
    if options.is_enabled and len(options.client) > 1:
683
675
        parser.error("--is-enabled requires exactly one client")
684
676
 
685
 
    clientnames = options.client
 
677
    commands, clientnames = commands_and_clients_from_options(options)
686
678
 
687
679
    try:
688
680
        bus = dbus.SystemBus()
735
727
                sys.exit(1)
736
728
 
737
729
    # Run all commands on clients
738
 
    commands = commands_from_options(options)
739
730
    for command in commands:
740
731
        command.run(mandos_serv, clients)
741
732
 
755
746
 
756
747
class Test_string_to_delta(unittest.TestCase):
757
748
    def test_handles_basic_rfc3339(self):
758
 
        self.assertEqual(string_to_delta("PT0S"),
759
 
                         datetime.timedelta())
760
 
        self.assertEqual(string_to_delta("P0D"),
761
 
                         datetime.timedelta())
762
 
        self.assertEqual(string_to_delta("PT1S"),
763
 
                         datetime.timedelta(0, 1))
764
749
        self.assertEqual(string_to_delta("PT2H"),
765
750
                         datetime.timedelta(0, 7200))
766
751
    def test_falls_back_to_pre_1_6_1_with_warning(self):
801
786
                testcase.assertEqual(dbus_interface,
802
787
                                     dbus.PROPERTIES_IFACE)
803
788
                self.attributes[property] = value
 
789
                self.calls.append(("Set", (interface, property, value,
 
790
                                           dbus_interface)))
804
791
            def Get(self, interface, property, dbus_interface):
805
792
                testcase.assertEqual(interface, client_interface)
806
793
                testcase.assertEqual(dbus_interface,
807
794
                                     dbus.PROPERTIES_IFACE)
 
795
                self.calls.append(("Get", (interface, property,
 
796
                                           dbus_interface)))
808
797
                return self.attributes[property]
809
 
            def Approve(self, approve, dbus_interface):
810
 
                testcase.assertEqual(dbus_interface, client_interface)
811
 
                self.calls.append(("Approve", (approve,
812
 
                                               dbus_interface)))
813
 
        self.client = MockClient(
814
 
            "foo",
815
 
            KeyID=("92ed150794387c03ce684574b1139a65"
816
 
                   "94a34f895daaaf09fd8ea90a27cddb12"),
817
 
            Secret=b"secret",
818
 
            Host="foo.example.org",
819
 
            Enabled=dbus.Boolean(True),
820
 
            Timeout=300000,
821
 
            LastCheckedOK="2019-02-03T00:00:00",
822
 
            Created="2019-01-02T00:00:00",
823
 
            Interval=120000,
824
 
            Fingerprint=("778827225BA7DE539C5A"
825
 
                         "7CFA59CFF7CDBD9A5920"),
826
 
            CheckerRunning=dbus.Boolean(False),
827
 
            LastEnabled="2019-01-03T00:00:00",
828
 
            ApprovalPending=dbus.Boolean(False),
829
 
            ApprovedByDefault=dbus.Boolean(True),
830
 
            LastApprovalRequest="",
831
 
            ApprovalDelay=0,
832
 
            ApprovalDuration=1000,
833
 
            Checker="fping -q -- %(host)s",
834
 
            ExtendedTimeout=900000,
835
 
            Expires="2019-02-04T00:00:00",
836
 
            LastCheckerStatus=0)
837
 
        self.other_client = MockClient(
838
 
            "barbar",
839
 
            KeyID=("0558568eedd67d622f5c83b35a115f79"
840
 
                   "6ab612cff5ad227247e46c2b020f441c"),
841
 
            Secret=b"secretbar",
842
 
            Host="192.0.2.3",
843
 
            Enabled=dbus.Boolean(True),
844
 
            Timeout=300000,
845
 
            LastCheckedOK="2019-02-04T00:00:00",
846
 
            Created="2019-01-03T00:00:00",
847
 
            Interval=120000,
848
 
            Fingerprint=("3E393AEAEFB84C7E89E2"
849
 
                         "F547B3A107558FCA3A27"),
850
 
            CheckerRunning=dbus.Boolean(True),
851
 
            LastEnabled="2019-01-04T00:00:00",
852
 
            ApprovalPending=dbus.Boolean(False),
853
 
            ApprovedByDefault=dbus.Boolean(False),
854
 
            LastApprovalRequest="2019-01-03T00:00:00",
855
 
            ApprovalDelay=30000,
856
 
            ApprovalDuration=1000,
857
 
            Checker=":",
858
 
            ExtendedTimeout=900000,
859
 
            Expires="2019-02-05T00:00:00",
860
 
            LastCheckerStatus=-2)
861
 
        self.clients =  collections.OrderedDict(
862
 
            [
863
 
                (self.client, self.client.attributes),
864
 
                (self.other_client, self.other_client.attributes),
 
798
            def __getitem__(self, key):
 
799
                return self.attributes[key]
 
800
            def __setitem__(self, key, value):
 
801
                self.attributes[key] = value
 
802
        self.clients = collections.OrderedDict([
 
803
            ("foo",
 
804
             MockClient(
 
805
                 "foo",
 
806
                 KeyID=("92ed150794387c03ce684574b1139a65"
 
807
                        "94a34f895daaaf09fd8ea90a27cddb12"),
 
808
                 Secret=b"secret",
 
809
                 Host="foo.example.org",
 
810
                 Enabled=dbus.Boolean(True),
 
811
                 Timeout=300000,
 
812
                 LastCheckedOK="2019-02-03T00:00:00",
 
813
                 Created="2019-01-02T00:00:00",
 
814
                 Interval=120000,
 
815
                 Fingerprint=("778827225BA7DE539C5A"
 
816
                              "7CFA59CFF7CDBD9A5920"),
 
817
                 CheckerRunning=dbus.Boolean(False),
 
818
                 LastEnabled="2019-01-03T00:00:00",
 
819
                 ApprovalPending=dbus.Boolean(False),
 
820
                 ApprovedByDefault=dbus.Boolean(True),
 
821
                 LastApprovalRequest="",
 
822
                 ApprovalDelay=0,
 
823
                 ApprovalDuration=1000,
 
824
                 Checker="fping -q -- %(host)s",
 
825
                 ExtendedTimeout=900000,
 
826
                 Expires="2019-02-04T00:00:00",
 
827
                 LastCheckerStatus=0)),
 
828
            ("barbar",
 
829
             MockClient(
 
830
                 "barbar",
 
831
                 KeyID=("0558568eedd67d622f5c83b35a115f79"
 
832
                        "6ab612cff5ad227247e46c2b020f441c"),
 
833
                 Secret=b"secretbar",
 
834
                 Host="192.0.2.3",
 
835
                 Enabled=dbus.Boolean(True),
 
836
                 Timeout=300000,
 
837
                 LastCheckedOK="2019-02-04T00:00:00",
 
838
                 Created="2019-01-03T00:00:00",
 
839
                 Interval=120000,
 
840
                 Fingerprint=("3E393AEAEFB84C7E89E2"
 
841
                              "F547B3A107558FCA3A27"),
 
842
                 CheckerRunning=dbus.Boolean(True),
 
843
                 LastEnabled="2019-01-04T00:00:00",
 
844
                 ApprovalPending=dbus.Boolean(False),
 
845
                 ApprovedByDefault=dbus.Boolean(False),
 
846
                 LastApprovalRequest="2019-01-03T00:00:00",
 
847
                 ApprovalDelay=30000,
 
848
                 ApprovalDuration=1000,
 
849
                 Checker=":",
 
850
                 ExtendedTimeout=900000,
 
851
                 Expires="2019-02-05T00:00:00",
 
852
                 LastCheckerStatus=-2)),
865
853
            ])
866
 
        self.one_client = {self.client: self.client.attributes}
867
854
 
868
855
class TestPrintTableCmd(TestCmd):
869
856
    def test_normal(self):
883
870
"""[1:-1]
884
871
        self.assertEqual(output, expected_output)
885
872
    def test_one_client(self):
886
 
        output = PrintTableCmd().output(self.one_client)
 
873
        output = PrintTableCmd().output({"foo": self.clients["foo"]})
887
874
        expected_output = """
888
875
Name Enabled Timeout  Last Successful Check
889
876
foo  Yes     00:05:00 2019-02-03T00:00:00  
947
934
        json_data = json.loads(DumpJSONCmd().output(self.clients))
948
935
        self.assertDictEqual(json_data, self.expected_json)
949
936
    def test_one_client(self):
950
 
        clients = self.one_client
 
937
        clients = {"foo": self.clients["foo"]}
951
938
        json_data = json.loads(DumpJSONCmd().output(clients))
952
939
        expected_json = {"foo": self.expected_json["foo"]}
953
940
        self.assertDictEqual(json_data, expected_json)
954
941
 
955
942
class TestIsEnabledCmd(TestCmd):
956
943
    def test_is_enabled(self):
957
 
        self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
958
 
                            for client, properties in self.clients.items()))
 
944
        self.assertTrue(all(IsEnabledCmd().is_enabled(client)
 
945
                            for client in self.clients.values()))
 
946
    def test_is_enabled_does_get_attribute(self):
 
947
        client = self.clients["foo"]
 
948
        self.assertTrue(IsEnabledCmd().is_enabled(client))
 
949
        self.assertListEqual(client.calls,
 
950
                             [("Get",
 
951
                               ("se.recompile.Mandos.Client",
 
952
                                "Enabled",
 
953
                                "org.freedesktop.DBus.Properties"))])
959
954
    def test_is_enabled_run_exits_successfully(self):
 
955
        client = self.clients["foo"]
960
956
        with self.assertRaises(SystemExit) as e:
961
 
            IsEnabledCmd().run(None, self.one_client)
 
957
            IsEnabledCmd().run(None, [client])
962
958
        if e.exception.code is not None:
963
959
            self.assertEqual(e.exception.code, 0)
964
960
        else:
965
961
            self.assertIsNone(e.exception.code)
966
962
    def test_is_enabled_run_exits_with_failure(self):
967
 
        self.client.attributes["Enabled"] = dbus.Boolean(False)
 
963
        client = self.clients["foo"]
 
964
        client["Enabled"] = dbus.Boolean(False)
968
965
        with self.assertRaises(SystemExit) as e:
969
 
            IsEnabledCmd().run(None, self.one_client)
 
966
            IsEnabledCmd().run(None, [client])
970
967
        if isinstance(e.exception.code, int):
971
968
            self.assertNotEqual(e.exception.code, 0)
972
969
        else:
973
970
            self.assertIsNotNone(e.exception.code)
974
971
 
 
972
 
975
973
class TestRemoveCmd(TestCmd):
976
974
    def test_remove(self):
 
975
        client = self.clients["foo"]
977
976
        class MockMandos(object):
978
977
            def __init__(self):
979
978
                self.calls = []
980
979
            def RemoveClient(self, dbus_path):
981
980
                self.calls.append(("RemoveClient", (dbus_path,)))
982
981
        mandos = MockMandos()
983
 
        super(TestRemoveCmd, self).setUp()
984
 
        RemoveCmd().run(mandos, self.clients)
985
 
        self.assertEqual(len(mandos.calls), 2)
986
 
        for client in self.clients:
987
 
            self.assertIn(("RemoveClient",
988
 
                           (client.__dbus_object_path__,)),
989
 
                          mandos.calls)
990
 
 
991
 
class TestApproveCmd(TestCmd):
992
 
    def test_approve(self):
993
 
        ApproveCmd().run(None, self.clients)
994
 
        for client in self.clients:
995
 
            self.assertIn(("Approve", (True, client_interface)),
996
 
                          client.calls)
997
 
 
998
 
class TestDenyCmd(TestCmd):
999
 
    def test_deny(self):
1000
 
        DenyCmd().run(None, self.clients)
1001
 
        for client in self.clients:
1002
 
            self.assertIn(("Approve", (False, client_interface)),
1003
 
                          client.calls)
1004
 
 
1005
 
class TestEnableCmd(TestCmd):
1006
 
    def test_enable(self):
1007
 
        for client in self.clients:
1008
 
            client.attributes["Enabled"] = False
1009
 
 
1010
 
        EnableCmd().run(None, self.clients)
1011
 
 
1012
 
        for client in self.clients:
1013
 
            self.assertTrue(client.attributes["Enabled"])
1014
 
 
1015
 
class TestDisableCmd(TestCmd):
1016
 
    def test_disable(self):
1017
 
        DisableCmd().run(None, self.clients)
1018
 
 
1019
 
        for client in self.clients:
1020
 
            self.assertFalse(client.attributes["Enabled"])
1021
 
 
1022
 
class Unique(object):
1023
 
    """Class for objects which exist only to be unique objects, since
1024
 
unittest.mock.sentinel only exists in Python 3.3"""
1025
 
 
1026
 
class TestPropertyCmd(TestCmd):
1027
 
    """Abstract class for tests of PropertyCmd classes"""
1028
 
    def runTest(self):
1029
 
        if not hasattr(self, "command"):
1030
 
            return
1031
 
        values_to_get = getattr(self, "values_to_get",
1032
 
                                self.values_to_set)
1033
 
        for value_to_set, value_to_get in zip(self.values_to_set,
1034
 
                                              values_to_get):
1035
 
            for client in self.clients:
1036
 
                old_value = client.attributes[self.property]
1037
 
                self.assertNotIsInstance(old_value, Unique)
1038
 
                client.attributes[self.property] = Unique()
1039
 
            self.run_command(value_to_set, self.clients)
1040
 
            for client in self.clients:
1041
 
                value = client.attributes[self.property]
1042
 
                self.assertNotIsInstance(value, Unique)
1043
 
                self.assertEqual(value, value_to_get)
1044
 
    def run_command(self, value, clients):
1045
 
        self.command().run(None, clients)
1046
 
 
1047
 
class TestBumpTimeoutCmd(TestPropertyCmd):
1048
 
    command = BumpTimeoutCmd
1049
 
    property = "LastCheckedOK"
1050
 
    values_to_set = [""]
1051
 
 
1052
 
class TestStartCheckerCmd(TestPropertyCmd):
1053
 
    command = StartCheckerCmd
1054
 
    property = "CheckerRunning"
1055
 
    values_to_set = [dbus.Boolean(True)]
1056
 
 
1057
 
class TestStopCheckerCmd(TestPropertyCmd):
1058
 
    command = StopCheckerCmd
1059
 
    property = "CheckerRunning"
1060
 
    values_to_set = [dbus.Boolean(False)]
1061
 
 
1062
 
class TestApproveByDefaultCmd(TestPropertyCmd):
1063
 
    command = ApproveByDefaultCmd
1064
 
    property = "ApprovedByDefault"
1065
 
    values_to_set = [dbus.Boolean(True)]
1066
 
 
1067
 
class TestDenyByDefaultCmd(TestPropertyCmd):
1068
 
    command = DenyByDefaultCmd
1069
 
    property = "ApprovedByDefault"
1070
 
    values_to_set = [dbus.Boolean(False)]
1071
 
 
1072
 
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1073
 
    """Abstract class for tests of PropertyCmd classes using the
1074
 
ValueArgumentMixIn"""
1075
 
    def runTest(self):
1076
 
        if type(self) is TestValueArgumentPropertyCmd:
1077
 
            return
1078
 
        return super(TestValueArgumentPropertyCmd, self).runTest()
1079
 
    def run_command(self, value, clients):
1080
 
        self.command(value).run(None, clients)
1081
 
 
1082
 
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1083
 
    command = SetCheckerCmd
1084
 
    property = "Checker"
1085
 
    values_to_set = ["", ":", "fping -q -- %s"]
1086
 
 
1087
 
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1088
 
    command = SetHostCmd
1089
 
    property = "Host"
1090
 
    values_to_set = ["192.0.2.3", "foo.example.org"]
1091
 
 
1092
 
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1093
 
    command = SetSecretCmd
1094
 
    property = "Secret"
1095
 
    values_to_set = [open("/dev/null", "rb"),
1096
 
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1097
 
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
1098
 
 
1099
 
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1100
 
    command = SetTimeoutCmd
1101
 
    property = "Timeout"
1102
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1103
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1104
 
 
1105
 
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1106
 
    command = SetExtendedTimeoutCmd
1107
 
    property = "ExtendedTimeout"
1108
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1109
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1110
 
 
1111
 
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1112
 
    command = SetIntervalCmd
1113
 
    property = "Interval"
1114
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1115
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1116
 
 
1117
 
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1118
 
    command = SetApprovalDelayCmd
1119
 
    property = "ApprovalDelay"
1120
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1121
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1122
 
 
1123
 
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1124
 
    command = SetApprovalDurationCmd
1125
 
    property = "ApprovalDuration"
1126
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1127
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1128
 
 
1129
 
class TestOptions(unittest.TestCase):
1130
 
    def setUp(self):
1131
 
        self.parser = argparse.ArgumentParser()
1132
 
        add_command_line_options(self.parser)
1133
 
    def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1134
 
        """Assert that parsing ARGS should result in an instance of
1135
 
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1136
 
        options = self.parser.parse_args(args)
1137
 
        commands = commands_from_options(options)
1138
 
        self.assertEqual(len(commands), 1)
1139
 
        command = commands[0]
1140
 
        self.assertIsInstance(command, command_cls)
1141
 
        for key, value in cmd_attrs.items():
1142
 
            self.assertEqual(getattr(command, key), value)
1143
 
    def test_default_is_show_table(self):
1144
 
        self.assert_command_from_args([], PrintTableCmd,
1145
 
                                      verbose=False)
1146
 
    def test_show_table_verbose(self):
1147
 
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1148
 
                                      verbose=True)
1149
 
    def test_enable(self):
1150
 
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1151
 
    def test_disable(self):
1152
 
        self.assert_command_from_args(["--disable", "foo"],
1153
 
                                      DisableCmd)
 
982
        RemoveCmd().run(mandos, [client])
 
983
        self.assertEqual(len(mandos.calls), 1)
 
984
        self.assertListEqual(mandos.calls,
 
985
                             [("RemoveClient",
 
986
                               (client.__dbus_object_path__,))])
1154
987
 
1155
988
 
1156
989