/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-08-18 00:05:36 UTC
  • Revision ID: teddy@recompile.se-20190818000536-cqmaqpo3h6574u1r
Debian package: Only reload dbus daemon if necessary

* debian/mandos.postinst (configure): Only reload dbus daemon if the
                                      _mandos user was renamed or
                                      created.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
 
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
3
3
#
4
4
# Mandos Monitor - Control and monitor the Mandos server
5
5
#
45
45
import io
46
46
import tempfile
47
47
import contextlib
48
 
import abc
49
 
 
50
 
import dbus as dbus_python
 
48
 
 
49
if sys.version_info.major == 2:
 
50
    __metaclass__ = type
 
51
 
 
52
try:
 
53
    import pydbus
 
54
    import gi
 
55
    dbus_python = None
 
56
except ImportError:
 
57
    import dbus as dbus_python
 
58
    pydbus = None
 
59
    class gi:
 
60
        """Dummy gi module, for the tests"""
 
61
        class repository:
 
62
            class GLib:
 
63
                class Error(Exception):
 
64
                    pass
51
65
 
52
66
# Show warnings by default
53
67
if not sys.warnoptions:
67
81
 
68
82
locale.setlocale(locale.LC_ALL, "")
69
83
 
70
 
version = "1.8.3"
 
84
version = "1.8.7"
71
85
 
72
86
 
73
87
def main():
82
96
    if options.debug:
83
97
        log.setLevel(logging.DEBUG)
84
98
 
85
 
    bus = dbus_python_adapter.CachingBus(dbus_python)
 
99
    if pydbus is not None:
 
100
        bus = pydbus_adapter.CachingBus(pydbus)
 
101
    else:
 
102
        bus = dbus_python_adapter.CachingBus(dbus_python)
86
103
 
87
104
    try:
88
105
        all_clients = bus.get_clients_and_properties()
236
253
def rfc3339_duration_to_delta(duration):
237
254
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
238
255
 
239
 
    >>> rfc3339_duration_to_delta("P7D")
240
 
    datetime.timedelta(7)
241
 
    >>> rfc3339_duration_to_delta("PT60S")
242
 
    datetime.timedelta(0, 60)
243
 
    >>> rfc3339_duration_to_delta("PT60M")
244
 
    datetime.timedelta(0, 3600)
245
 
    >>> rfc3339_duration_to_delta("P60M")
246
 
    datetime.timedelta(1680)
247
 
    >>> rfc3339_duration_to_delta("PT24H")
248
 
    datetime.timedelta(1)
249
 
    >>> rfc3339_duration_to_delta("P1W")
250
 
    datetime.timedelta(7)
251
 
    >>> rfc3339_duration_to_delta("PT5M30S")
252
 
    datetime.timedelta(0, 330)
253
 
    >>> rfc3339_duration_to_delta("P1DT3M20S")
254
 
    datetime.timedelta(1, 200)
 
256
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
 
257
    True
 
258
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
 
259
    True
 
260
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
 
261
    True
 
262
    >>> # 60 months
 
263
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
 
264
    True
 
265
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
 
266
    True
 
267
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
 
268
    True
 
269
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
 
270
    True
 
271
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
 
272
    True
255
273
    >>> # Can not be empty:
256
274
    >>> rfc3339_duration_to_delta("")
257
275
    Traceback (most recent call last):
367
385
    """Parse an interval string as documented by Mandos before 1.6.1,
368
386
    and return a datetime.timedelta
369
387
 
370
 
    >>> parse_pre_1_6_1_interval('7d')
371
 
    datetime.timedelta(7)
372
 
    >>> parse_pre_1_6_1_interval('60s')
373
 
    datetime.timedelta(0, 60)
374
 
    >>> parse_pre_1_6_1_interval('60m')
375
 
    datetime.timedelta(0, 3600)
376
 
    >>> parse_pre_1_6_1_interval('24h')
377
 
    datetime.timedelta(1)
378
 
    >>> parse_pre_1_6_1_interval('1w')
379
 
    datetime.timedelta(7)
380
 
    >>> parse_pre_1_6_1_interval('5m 30s')
381
 
    datetime.timedelta(0, 330)
382
 
    >>> parse_pre_1_6_1_interval('')
383
 
    datetime.timedelta(0)
 
388
    >>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
 
389
    True
 
390
    >>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
 
391
    True
 
392
    >>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
 
393
    True
 
394
    >>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
 
395
    True
 
396
    >>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
 
397
    True
 
398
    >>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
 
399
    True
 
400
    >>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
 
401
    True
384
402
    >>> # Ignore unknown characters, allow any order and repetitions
385
 
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
386
 
    datetime.timedelta(2, 480, 18000)
 
403
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
 
404
    True
387
405
 
388
406
    """
389
407
 
452
470
        parser.error("--remove can only be combined with --deny")
453
471
 
454
472
 
455
 
class dbus(object):
 
473
class dbus:
456
474
 
457
 
    class SystemBus(object):
 
475
    class SystemBus:
458
476
 
459
477
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
460
478
        def get_managed_objects(self, busname, objectpath):
506
524
        pass
507
525
 
508
526
 
509
 
class dbus_python_adapter(object):
 
527
class dbus_python_adapter:
510
528
 
511
529
    class SystemBus(dbus.MandosBus):
512
530
        """Use dbus-python"""
557
575
                        for key, subval in value.items()}
558
576
            return value
559
577
 
 
578
        def set_client_property(self, objectpath, key, value):
 
579
            if key == "Secret":
 
580
                if not isinstance(value, bytes):
 
581
                    value = value.encode("utf-8")
 
582
                value = self.dbus_python.ByteArray(value)
 
583
            return self.set_property(self.busname, objectpath,
 
584
                                     self.client_interface, key,
 
585
                                     value)
560
586
 
561
 
    class SilenceLogger(object):
 
587
    class SilenceLogger:
562
588
        "Simple context manager to silence a particular logger"
563
589
        def __init__(self, loggername):
564
590
            self.logger = logging.getLogger(loggername)
593
619
                return new_object
594
620
 
595
621
 
 
622
class pydbus_adapter:
    """A namespace for the bus classes which use pydbus"""

    class SystemBus(dbus.MandosBus):
        """Use pydbus"""

        def __init__(self, module=pydbus):
            # MODULE may be a stand-in for the real pydbus module; the
            # only thing used from it is SystemBus().
            self.pydbus = module
            self.bus = self.pydbus.SystemBus()

        @contextlib.contextmanager
        def convert_exception(self, exception_class=dbus.Error):
            """Context manager; any GLib.Error raised in its body is
            re-raised as EXCEPTION_CLASS, with the same arguments"""
            try:
                yield
            except gi.repository.GLib.Error as glib_error:
                # This does what "raise from" would do
                converted = exception_class(*glib_error.args)
                converted.__cause__ = glib_error
                raise converted

        def call_method(self, methodname, busname, objectpath,
                        interface, *args):
            """Call METHODNAME(*ARGS) on INTERFACE of the object at
            OBJECTPATH on BUSNAME, and return its return value.  A
            GLib.Error from the call is raised as dbus.Error."""
            proxy = self.get(busname, objectpath)
            shown_args = ", ".join(map(repr, args))
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
                      interface, methodname, shown_args)
            bound_method = getattr(proxy[interface], methodname)
            with self.convert_exception():
                return bound_method(*args)

        def get(self, busname, objectpath):
            """Return the proxy object for OBJECTPATH on BUSNAME.  A
            GLib.Error is raised as dbus.ConnectFailed."""
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
                      busname, objectpath)
            with self.convert_exception(dbus.ConnectFailed):
                if sys.version_info.major >= 3:
                    return self.bus.get(busname, objectpath)
                # Python 2 only: ignore any DeprecationWarning
                # attributed to the xml.etree.ElementTree module
                # during the lookup
                with warnings.catch_warnings():
                    warnings.filterwarnings(
                        "ignore", "", DeprecationWarning,
                        r"^xml\.etree\.ElementTree$")
                    return self.bus.get(busname, objectpath)

        def set_property(self, busname, objectpath, interface, key,
                         value):
            """Set property KEY to VALUE by assigning the attribute
            KEY on INTERFACE of the proxy object"""
            proxy = self.get(busname, objectpath)
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
                      objectpath, self.properties_iface, interface,
                      key, value)
            target = proxy[interface]
            setattr(target, key, value)

    class CachingBus(SystemBus):
        """A caching layer for pydbus_adapter.SystemBus"""

        def __init__(self, *args, **kwargs):
            # Maps (busname, objectpath) to an already fetched object
            self.object_cache = {}
            super(pydbus_adapter.CachingBus,
                  self).__init__(*args, **kwargs)

        def get(self, busname, objectpath):
            """Like SystemBus.get(), but every (BUSNAME, OBJECTPATH)
            pair is only looked up once; later calls return the same
            object"""
            cache_key = (busname, objectpath)
            if cache_key not in self.object_cache:
                parent = super(pydbus_adapter.CachingBus, self)
                self.object_cache[cache_key] = parent.get(busname,
                                                          objectpath)
            return self.object_cache[cache_key]
 
683
 
 
684
 
596
685
def commands_from_options(options):
597
686
 
598
687
    commands = list(options.commands)
626
715
    return commands
627
716
 
628
717
 
629
 
class command(object):
 
718
class command:
630
719
    """A namespace for command classes"""
631
720
 
632
 
    class Base(object):
 
721
    class Base:
633
722
        """Abstract base class for commands"""
634
723
        def run(self, clients, bus=None):
635
724
            """Normal commands should implement run_on_one_client(),
698
787
                keywords = self.all_keywords
699
788
            print(self.TableOfClients(clients.values(), keywords))
700
789
 
701
 
        class TableOfClients(object):
 
790
        class TableOfClients:
702
791
            tableheaders = {
703
792
                "Name": "Name",
704
793
                "Enabled": "Enabled",
935
1024
                                                     "output"))
936
1025
 
937
1026
 
938
 
class Unique(object):
 
1027
class Unique:
939
1028
    """Class for objects which exist only to be unique objects, since
940
1029
unittest.mock.sentinel only exists in Python 3.3"""
941
1030
 
1225
1314
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1226
1315
 
1227
1316
    def MockDBusPython_func(self, func):
1228
 
        class mock_dbus_python(object):
 
1317
        class mock_dbus_python:
1229
1318
            """mock dbus-python module"""
1230
 
            class exceptions(object):
 
1319
            class exceptions:
1231
1320
                """Pseudo-namespace"""
1232
1321
                class DBusException(Exception):
1233
1322
                    pass
1234
 
            class SystemBus(object):
 
1323
            class SystemBus:
1235
1324
                @staticmethod
1236
1325
                def get_object(busname, objectpath):
1237
1326
                    DBusObject = collections.namedtuple(
1238
 
                        "DBusObject", ("methodname",))
 
1327
                        "DBusObject", ("methodname", "Set"))
1239
1328
                    def method(*args, **kwargs):
1240
1329
                        self.assertEqual({"dbus_interface":
1241
1330
                                          "interface"},
1242
1331
                                         kwargs)
1243
1332
                        return func(*args)
1244
 
                    return DBusObject(methodname=method)
1245
 
            class Boolean(object):
 
1333
                    def set_property(interface, key, value,
 
1334
                                     dbus_interface=None):
 
1335
                        self.assertEqual(
 
1336
                            "org.freedesktop.DBus.Properties",
 
1337
                            dbus_interface)
 
1338
                        self.assertEqual("Secret", key)
 
1339
                        return func(interface, key, value,
 
1340
                                    dbus_interface=dbus_interface)
 
1341
                    return DBusObject(methodname=method,
 
1342
                                      Set=set_property)
 
1343
            class Boolean:
1246
1344
                def __init__(self, value):
1247
1345
                    self.value = bool(value)
1248
1346
                def __bool__(self):
1253
1351
                pass
1254
1352
            class Dictionary(dict):
1255
1353
                pass
 
1354
            class ByteArray(bytes):
 
1355
                pass
1256
1356
        return mock_dbus_python
1257
1357
 
1258
1358
    def call_method(self, bus, methodname, busname, objectpath,
1435
1535
        # Make sure the dbus logger was suppressed
1436
1536
        self.assertEqual(0, counting_handler.count)
1437
1537
 
 
1538
    def test_Set_Secret_sends_bytearray(self):
 
1539
        ret = [None]
 
1540
        def func(*args, **kwargs):
 
1541
            ret[0] = (args, kwargs)
 
1542
        mock_dbus_python = self.MockDBusPython_func(func)
 
1543
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
 
1544
        bus.set_client_property("objectpath", "Secret", "value")
 
1545
        expected_call = (("se.recompile.Mandos.Client", "Secret",
 
1546
                          mock_dbus_python.ByteArray(b"value")),
 
1547
                         {"dbus_interface":
 
1548
                          "org.freedesktop.DBus.Properties"})
 
1549
        self.assertEqual(expected_call, ret[0])
 
1550
        if sys.version_info.major == 2:
 
1551
            self.assertIsInstance(ret[0][0][-1],
 
1552
                                  mock_dbus_python.ByteArray)
 
1553
 
1438
1554
    def test_get_object_converts_to_correct_exception(self):
1439
1555
        bus = dbus_python_adapter.SystemBus(
1440
1556
            self.fake_dbus_python_raises_exception_on_connect)
1442
1558
            self.call_method(bus, "methodname", "busname",
1443
1559
                             "objectpath", "interface")
1444
1560
 
1445
 
    class fake_dbus_python_raises_exception_on_connect(object):
 
1561
    class fake_dbus_python_raises_exception_on_connect:
1446
1562
        """fake dbus-python module"""
1447
 
        class exceptions(object):
 
1563
        class exceptions:
1448
1564
            """Pseudo-namespace"""
1449
1565
            class DBusException(Exception):
1450
1566
                pass
1458
1574
 
1459
1575
 
1460
1576
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1461
 
    class mock_dbus_python(object):
 
1577
    class mock_dbus_python:
1462
1578
        """mock dbus-python modules"""
1463
 
        class SystemBus(object):
 
1579
        class SystemBus:
1464
1580
            @staticmethod
1465
1581
            def get_object(busname, objectpath):
1466
1582
                return Unique()
1509
1625
        self.assertIs(obj1, obj1b)
1510
1626
 
1511
1627
 
 
1628
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
    """Tests of pydbus_adapter.SystemBus, run against stand-ins for
    the pydbus module"""

    def Stub_pydbus_func(self, func):
        """Return a stub pydbus module; its SystemBus.get() returns a
        mapping with the single key "interface", the value of which
        is an object with FUNC as its only attribute, "methodname"."""
        class stub_pydbus:
            """stub pydbus module"""
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    DBusObject = collections.namedtuple(
                        "DBusObject", ("methodname",))
                    return {"interface":
                            DBusObject(methodname=func)}
        return stub_pydbus

    def call_method(self, bus, methodname, busname, objectpath,
                    interface, *args):
        """Return the result of BUS.call_method() with the remaining
        arguments, while asserting that the call logs something to
        "log" at level DEBUG or above"""
        with self.assertLogs(log, logging.DEBUG):
            return bus.call_method(methodname, busname, objectpath,
                                   interface, *args)

    def test_call_method_returns(self):
        expected_method_return = Unique()
        method_args = (Unique(), Unique())

        def func(*args):
            # The very same argument objects must arrive, in order
            self.assertEqual(len(method_args), len(args))
            for marg, arg in zip(method_args, args):
                self.assertIs(marg, arg)
            return expected_method_return

        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface",
                               *method_args)
        self.assertIs(ret, expected_method_return)

    def test_call_method_handles_exception(self):
        def func():
            raise gi.repository.GLib.Error()

        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)

        with self.assertRaises(dbus.Error) as e:
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

        # "e" is the assertRaises context manager; the exception
        # which was raised is its "exception" attribute.  An error
        # from the method call must not be reported as a failure to
        # connect.
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)

    def test_get_converts_to_correct_exception(self):
        bus = pydbus_adapter.SystemBus(
            self.fake_pydbus_raises_exception_on_connect)
        with self.assertRaises(dbus.ConnectFailed):
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

    class fake_pydbus_raises_exception_on_connect:
        """fake pydbus module; the get() of its bus always raises
        GLib.Error"""
        @classmethod
        def SystemBus(cls):
            def get(busname, objectpath):
                raise gi.repository.GLib.Error()
            Bus = collections.namedtuple("Bus", ["get"])
            return Bus(get=get)

    def test_set_property_uses_setattr(self):
        class Object:
            pass
        obj = Object()

        class pydbus_spy:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    return {"interface": obj}

        bus = pydbus_adapter.SystemBus(pydbus_spy)
        value = Unique()
        bus.set_property("busname", "objectpath", "interface", "key",
                         value)
        # The value must have been set as an attribute named "key"
        self.assertIs(value, obj.key)

    def test_get_suppresses_xml_deprecation_warning(self):
        if sys.version_info.major >= 3:
            # The warning is only suppressed when running in Python 2
            return

        class stub_pydbus_get:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    # Issue a warning which appears to come from the
                    # xml.etree.ElementTree module
                    warnings.warn_explicit(
                        "deprecated", DeprecationWarning,
                        "xml.etree.ElementTree", 0)

        bus = pydbus_adapter.SystemBus(stub_pydbus_get)
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            bus.get("busname", "objectpath")
            self.assertEqual(0, len(w))
 
1724
 
 
1725
 
 
1726
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
    """Tests of the caching done by pydbus_adapter.CachingBus.get()"""

    class stub_pydbus:
        """stub pydbus module"""
        class SystemBus:
            @staticmethod
            def get(busname, objectpath):
                # A new object for every call; two equal results from
                # the CachingBus must therefore come from its cache
                return Unique()

    def setUp(self):
        self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)

    def test_returns_distinct_objectpaths(self):
        obj1 = self.bus.get("busname", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_busnames(self):
        obj1 = self.bus.get("busname1", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_both(self):
        # Both the bus name and the object path differ here; with
        # the same object path in both calls this would only repeat
        # test_returns_distinct_busnames.
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_same(self):
        obj1 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIs(obj1, obj2)

    def test_returns_same_old(self):
        # An entry must stay cached after another one has been added
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        obj1b = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1b, Unique)
        self.assertIsNot(obj1, obj2)
        self.assertIsNot(obj2, obj1b)
        self.assertIs(obj1, obj1b)
 
1775
 
 
1776
 
1512
1777
class Test_commands_from_options(unittest.TestCase):
1513
1778
 
1514
1779
    def setUp(self):
1519
1784
        self.assert_command_from_args(["--is-enabled", "client"],
1520
1785
                                      command.IsEnabled)
1521
1786
 
1522
 
    def assert_command_from_args(self, args, command_cls,
1523
 
                                 **cmd_attrs):
 
1787
    def assert_command_from_args(self, args, command_cls, length=1,
 
1788
                                 clients=None, **cmd_attrs):
1524
1789
        """Assert that parsing ARGS should result in an instance of
1525
1790
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1526
1791
        options = self.parser.parse_args(args)
1527
1792
        check_option_syntax(self.parser, options)
1528
1793
        commands = commands_from_options(options)
1529
 
        self.assertEqual(1, len(commands))
1530
 
        command = commands[0]
1531
 
        self.assertIsInstance(command, command_cls)
 
1794
        self.assertEqual(length, len(commands))
 
1795
        for command in commands:
 
1796
            if isinstance(command, command_cls):
 
1797
                break
 
1798
        else:
 
1799
            self.assertIsInstance(command, command_cls)
 
1800
        if clients is not None:
 
1801
            self.assertEqual(clients, options.client)
1532
1802
        for key, value in cmd_attrs.items():
1533
1803
            self.assertEqual(value, getattr(command, key))
1534
1804
 
 
1805
    def assert_commands_from_args(self, args, commands, clients=None):
 
1806
        for cmd in commands:
 
1807
            self.assert_command_from_args(args, cmd,
 
1808
                                          length=len(commands),
 
1809
                                          clients=clients)
 
1810
 
1535
1811
    def test_is_enabled_short(self):
1536
1812
        self.assert_command_from_args(["-V", "client"],
1537
1813
                                      command.IsEnabled)
1728
2004
                                      verbose=True)
1729
2005
 
1730
2006
 
 
2007
    def test_manual_page_example_1(self):
 
2008
        self.assert_command_from_args("",
 
2009
                                      command.PrintTable,
 
2010
                                      clients=[],
 
2011
                                      verbose=False)
 
2012
 
 
2013
    def test_manual_page_example_2(self):
 
2014
        self.assert_command_from_args(
 
2015
            "--verbose foo1.example.org foo2.example.org".split(),
 
2016
            command.PrintTable, clients=["foo1.example.org",
 
2017
                                         "foo2.example.org"],
 
2018
            verbose=True)
 
2019
 
 
2020
    def test_manual_page_example_3(self):
 
2021
        self.assert_command_from_args("--enable --all".split(),
 
2022
                                      command.Enable,
 
2023
                                      clients=[])
 
2024
 
 
2025
    def test_manual_page_example_4(self):
 
2026
        self.assert_commands_from_args(
 
2027
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
 
2028
             " foo2.example.org").split(),
 
2029
            [command.SetTimeout, command.SetInterval],
 
2030
            clients=["foo1.example.org", "foo2.example.org"])
 
2031
 
 
2032
    def test_manual_page_example_5(self):
 
2033
        self.assert_command_from_args("--approve --all".split(),
 
2034
                                      command.Approve,
 
2035
                                      clients=[])
 
2036
 
 
2037
 
1731
2038
class TestCommand(unittest.TestCase):
1732
2039
    """Abstract class for tests of command classes"""
1733
2040