/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-08-30 21:46:16 UTC
  • mto: This revision was merged to the branch mainline in revision 392.
  • Revision ID: teddy@recompile.se-20190830214616-t1yhgfjz3bdggjd9
Use Python 3 by default

* INSTALL: Document Python 3 dependency.
* debian/control: Change both of "python (>= 2.7), python (<< 3)" to
                  "python (>= 3)" and change all python-* dependencies
                  to be python3-* instead.
* mandos: Change first line to use "python3" instead of "python".
* mandos-ctl: Likewise.
* mandos-monitor: Likewise.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
 
1
#!/usr/bin/python3 -bb
 
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
3
3
#
4
4
# Mandos Monitor - Control and monitor the Mandos server
5
5
#
46
46
import tempfile
47
47
import contextlib
48
48
 
49
 
import dbus as dbus_python
 
49
if sys.version_info.major == 2:
 
50
    __metaclass__ = type
 
51
 
 
52
try:
 
53
    import pydbus
 
54
    import gi
 
55
    dbus_python = None
 
56
except ImportError:
 
57
    import dbus as dbus_python
 
58
    pydbus = None
 
59
    class gi:
 
60
        """Dummy gi module, for the tests"""
 
61
        class repository:
 
62
            class GLib:
 
63
                class Error(Exception):
 
64
                    pass
50
65
 
51
66
# Show warnings by default
52
67
if not sys.warnoptions:
66
81
 
67
82
locale.setlocale(locale.LC_ALL, "")
68
83
 
69
 
version = "1.8.3"
 
84
version = "1.8.8"
70
85
 
71
86
 
72
87
def main():
81
96
    if options.debug:
82
97
        log.setLevel(logging.DEBUG)
83
98
 
84
 
    bus = dbus_python_adapter.CachingBus(dbus_python)
 
99
    if pydbus is not None:
 
100
        bus = pydbus_adapter.CachingBus(pydbus)
 
101
    else:
 
102
        bus = dbus_python_adapter.CachingBus(dbus_python)
85
103
 
86
104
    try:
87
105
        all_clients = bus.get_clients_and_properties()
235
253
def rfc3339_duration_to_delta(duration):
236
254
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
237
255
 
238
 
    >>> rfc3339_duration_to_delta("P7D")
239
 
    datetime.timedelta(7)
240
 
    >>> rfc3339_duration_to_delta("PT60S")
241
 
    datetime.timedelta(0, 60)
242
 
    >>> rfc3339_duration_to_delta("PT60M")
243
 
    datetime.timedelta(0, 3600)
244
 
    >>> rfc3339_duration_to_delta("P60M")
245
 
    datetime.timedelta(1680)
246
 
    >>> rfc3339_duration_to_delta("PT24H")
247
 
    datetime.timedelta(1)
248
 
    >>> rfc3339_duration_to_delta("P1W")
249
 
    datetime.timedelta(7)
250
 
    >>> rfc3339_duration_to_delta("PT5M30S")
251
 
    datetime.timedelta(0, 330)
252
 
    >>> rfc3339_duration_to_delta("P1DT3M20S")
253
 
    datetime.timedelta(1, 200)
 
256
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
 
257
    True
 
258
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
 
259
    True
 
260
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
 
261
    True
 
262
    >>> # 60 months
 
263
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
 
264
    True
 
265
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
 
266
    True
 
267
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
 
268
    True
 
269
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
 
270
    True
 
271
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
 
272
    True
254
273
    >>> # Can not be empty:
255
274
    >>> rfc3339_duration_to_delta("")
256
275
    Traceback (most recent call last):
366
385
    """Parse an interval string as documented by Mandos before 1.6.1,
367
386
    and return a datetime.timedelta
368
387
 
369
 
    >>> parse_pre_1_6_1_interval('7d')
370
 
    datetime.timedelta(7)
371
 
    >>> parse_pre_1_6_1_interval('60s')
372
 
    datetime.timedelta(0, 60)
373
 
    >>> parse_pre_1_6_1_interval('60m')
374
 
    datetime.timedelta(0, 3600)
375
 
    >>> parse_pre_1_6_1_interval('24h')
376
 
    datetime.timedelta(1)
377
 
    >>> parse_pre_1_6_1_interval('1w')
378
 
    datetime.timedelta(7)
379
 
    >>> parse_pre_1_6_1_interval('5m 30s')
380
 
    datetime.timedelta(0, 330)
381
 
    >>> parse_pre_1_6_1_interval('')
382
 
    datetime.timedelta(0)
 
388
    >>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
 
389
    True
 
390
    >>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
 
391
    True
 
392
    >>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
 
393
    True
 
394
    >>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
 
395
    True
 
396
    >>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
 
397
    True
 
398
    >>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
 
399
    True
 
400
    >>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
 
401
    True
383
402
    >>> # Ignore unknown characters, allow any order and repetitions
384
 
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
385
 
    datetime.timedelta(2, 480, 18000)
 
403
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
 
404
    True
386
405
 
387
406
    """
388
407
 
451
470
        parser.error("--remove can only be combined with --deny")
452
471
 
453
472
 
454
 
class dbus(object):
 
473
class dbus:
455
474
 
456
 
    class SystemBus(object):
 
475
    class SystemBus:
457
476
 
458
477
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
459
478
        def get_managed_objects(self, busname, objectpath):
505
524
        pass
506
525
 
507
526
 
508
 
class dbus_python_adapter(object):
 
527
class dbus_python_adapter:
509
528
 
510
529
    class SystemBus(dbus.MandosBus):
511
530
        """Use dbus-python"""
556
575
                        for key, subval in value.items()}
557
576
            return value
558
577
 
 
578
        def set_client_property(self, objectpath, key, value):
 
579
            if key == "Secret":
 
580
                if not isinstance(value, bytes):
 
581
                    value = value.encode("utf-8")
 
582
                value = self.dbus_python.ByteArray(value)
 
583
            return self.set_property(self.busname, objectpath,
 
584
                                     self.client_interface, key,
 
585
                                     value)
559
586
 
560
 
    class SilenceLogger(object):
 
587
    class SilenceLogger:
561
588
        "Simple context manager to silence a particular logger"
562
589
        def __init__(self, loggername):
563
590
            self.logger = logging.getLogger(loggername)
592
619
                return new_object
593
620
 
594
621
 
 
622
class pydbus_adapter:
    """A namespace for the bus classes implemented using pydbus"""

    class SystemBus(dbus.MandosBus):
        """Use pydbus"""

        def __init__(self, module=pydbus):
            # The module is a parameter so that a stand-in for pydbus
            # can be supplied instead of the real one
            self.pydbus = module
            self.bus = self.pydbus.SystemBus()

        @contextlib.contextmanager
        def convert_exception(self, exception_class=dbus.Error):
            """Context manager which turns a gi.repository.GLib.Error
            raised in its body into EXCEPTION_CLASS, constructed from
            the same arguments"""
            try:
                yield
            except gi.repository.GLib.Error as glib_error:
                # This does what "raise from" would do
                converted = exception_class(*glib_error.args)
                converted.__cause__ = glib_error
                raise converted

        def call_method(self, methodname, busname, objectpath,
                        interface, *args):
            """Call METHODNAME(*ARGS) on INTERFACE of the object at
            (BUSNAME, OBJECTPATH) and return its result"""
            proxy_object = self.get(busname, objectpath)
            shown_args = ", ".join(map(repr, args))
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
                      interface, methodname, shown_args)
            method = getattr(proxy_object[interface], methodname)
            with self.convert_exception():
                return method(*args)

        def get(self, busname, objectpath):
            """Return the proxy object for (BUSNAME, OBJECTPATH); a
            GLib error while getting it becomes dbus.ConnectFailed"""
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
                      busname, objectpath)
            with self.convert_exception(dbus.ConnectFailed):
                if sys.version_info.major > 2:
                    return self.bus.get(busname, objectpath)
                # Python 2 only: ignore DeprecationWarning coming from
                # the xml.etree.ElementTree module during the lookup
                with warnings.catch_warnings():
                    warnings.filterwarnings(
                        "ignore", message="",
                        category=DeprecationWarning,
                        module=r"^xml\.etree\.ElementTree$")
                    return self.bus.get(busname, objectpath)

        def set_property(self, busname, objectpath, interface, key,
                         value):
            """Set property KEY to VALUE by assigning the attribute on
            INTERFACE of the proxy object"""
            proxy_object = self.get(busname, objectpath)
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
                      objectpath, self.properties_iface, interface,
                      key, value)
            setattr(proxy_object[interface], key, value)

    class CachingBus(SystemBus):
        """A caching layer for pydbus_adapter.SystemBus"""

        def __init__(self, *args, **kwargs):
            self.object_cache = {}
            super(pydbus_adapter.CachingBus,
                  self).__init__(*args, **kwargs)

        def get(self, busname, objectpath):
            """Return the proxy object for (BUSNAME, OBJECTPATH),
            fetching it only the first time it is asked for"""
            cache_key = (busname, objectpath)
            try:
                return self.object_cache[cache_key]
            except KeyError:
                fetched = (super(pydbus_adapter.CachingBus, self)
                           .get(busname, objectpath))
                self.object_cache[cache_key] = fetched
                return fetched
 
683
 
 
684
 
595
685
def commands_from_options(options):
596
686
 
597
687
    commands = list(options.commands)
625
715
    return commands
626
716
 
627
717
 
628
 
class command(object):
 
718
class command:
629
719
    """A namespace for command classes"""
630
720
 
631
 
    class Base(object):
 
721
    class Base:
632
722
        """Abstract base class for commands"""
633
723
        def run(self, clients, bus=None):
634
724
            """Normal commands should implement run_on_one_client(),
697
787
                keywords = self.all_keywords
698
788
            print(self.TableOfClients(clients.values(), keywords))
699
789
 
700
 
        class TableOfClients(object):
 
790
        class TableOfClients:
701
791
            tableheaders = {
702
792
                "Name": "Name",
703
793
                "Enabled": "Enabled",
934
1024
                                                     "output"))
935
1025
 
936
1026
 
937
 
class Unique(object):
 
1027
class Unique:
938
1028
    """Class for objects which exist only to be unique objects, since
939
1029
unittest.mock.sentinel only exists in Python 3.3"""
940
1030
 
1224
1314
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1225
1315
 
1226
1316
    def MockDBusPython_func(self, func):
1227
 
        class mock_dbus_python(object):
 
1317
        class mock_dbus_python:
1228
1318
            """mock dbus-python module"""
1229
 
            class exceptions(object):
 
1319
            class exceptions:
1230
1320
                """Pseudo-namespace"""
1231
1321
                class DBusException(Exception):
1232
1322
                    pass
1233
 
            class SystemBus(object):
 
1323
            class SystemBus:
1234
1324
                @staticmethod
1235
1325
                def get_object(busname, objectpath):
1236
1326
                    DBusObject = collections.namedtuple(
1237
 
                        "DBusObject", ("methodname",))
 
1327
                        "DBusObject", ("methodname", "Set"))
1238
1328
                    def method(*args, **kwargs):
1239
1329
                        self.assertEqual({"dbus_interface":
1240
1330
                                          "interface"},
1241
1331
                                         kwargs)
1242
1332
                        return func(*args)
1243
 
                    return DBusObject(methodname=method)
1244
 
            class Boolean(object):
 
1333
                    def set_property(interface, key, value,
 
1334
                                     dbus_interface=None):
 
1335
                        self.assertEqual(
 
1336
                            "org.freedesktop.DBus.Properties",
 
1337
                            dbus_interface)
 
1338
                        self.assertEqual("Secret", key)
 
1339
                        return func(interface, key, value,
 
1340
                                    dbus_interface=dbus_interface)
 
1341
                    return DBusObject(methodname=method,
 
1342
                                      Set=set_property)
 
1343
            class Boolean:
1245
1344
                def __init__(self, value):
1246
1345
                    self.value = bool(value)
1247
1346
                def __bool__(self):
1252
1351
                pass
1253
1352
            class Dictionary(dict):
1254
1353
                pass
 
1354
            class ByteArray(bytes):
 
1355
                pass
1255
1356
        return mock_dbus_python
1256
1357
 
1257
1358
    def call_method(self, bus, methodname, busname, objectpath,
1434
1535
        # Make sure the dbus logger was suppressed
1435
1536
        self.assertEqual(0, counting_handler.count)
1436
1537
 
 
1538
    def test_Set_Secret_sends_bytearray(self):
 
1539
        ret = [None]
 
1540
        def func(*args, **kwargs):
 
1541
            ret[0] = (args, kwargs)
 
1542
        mock_dbus_python = self.MockDBusPython_func(func)
 
1543
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
 
1544
        bus.set_client_property("objectpath", "Secret", "value")
 
1545
        expected_call = (("se.recompile.Mandos.Client", "Secret",
 
1546
                          mock_dbus_python.ByteArray(b"value")),
 
1547
                         {"dbus_interface":
 
1548
                          "org.freedesktop.DBus.Properties"})
 
1549
        self.assertEqual(expected_call, ret[0])
 
1550
        if sys.version_info.major == 2:
 
1551
            self.assertIsInstance(ret[0][0][-1],
 
1552
                                  mock_dbus_python.ByteArray)
 
1553
 
1437
1554
    def test_get_object_converts_to_correct_exception(self):
1438
1555
        bus = dbus_python_adapter.SystemBus(
1439
1556
            self.fake_dbus_python_raises_exception_on_connect)
1441
1558
            self.call_method(bus, "methodname", "busname",
1442
1559
                             "objectpath", "interface")
1443
1560
 
1444
 
    class fake_dbus_python_raises_exception_on_connect(object):
 
1561
    class fake_dbus_python_raises_exception_on_connect:
1445
1562
        """fake dbus-python module"""
1446
 
        class exceptions(object):
 
1563
        class exceptions:
1447
1564
            """Pseudo-namespace"""
1448
1565
            class DBusException(Exception):
1449
1566
                pass
1457
1574
 
1458
1575
 
1459
1576
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1460
 
    class mock_dbus_python(object):
 
1577
    class mock_dbus_python:
1461
1578
        """mock dbus-python modules"""
1462
 
        class SystemBus(object):
 
1579
        class SystemBus:
1463
1580
            @staticmethod
1464
1581
            def get_object(busname, objectpath):
1465
1582
                return Unique()
1508
1625
        self.assertIs(obj1, obj1b)
1509
1626
 
1510
1627
 
 
1628
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
    """Tests of pydbus_adapter.SystemBus, run against stub pydbus
    modules instead of the real one"""

    def Stub_pydbus_func(self, func):
        """Return a stub pydbus module whose SystemBus.get() returns
        a mapping from "interface" to an object having FUNC as its
        "methodname" attribute"""
        class stub_pydbus:
            """stub pydbus module"""
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    DBusObject = collections.namedtuple(
                        "DBusObject", ("methodname",))
                    return {"interface":
                            DBusObject(methodname=func)}
        return stub_pydbus

    def call_method(self, bus, methodname, busname, objectpath,
                    interface, *args):
        """Call BUS.call_method(), also asserting that something is
        logged to "log" at DEBUG level or above while doing so"""
        with self.assertLogs(log, logging.DEBUG):
            return bus.call_method(methodname, busname, objectpath,
                                   interface, *args)

    def test_call_method_returns(self):
        expected_method_return = Unique()
        method_args = (Unique(), Unique())

        def func(*args):
            # The very same argument objects must arrive, in order
            self.assertEqual(len(method_args), len(args))
            for marg, arg in zip(method_args, args):
                self.assertIs(marg, arg)
            return expected_method_return

        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface",
                               *method_args)
        self.assertIs(ret, expected_method_return)

    def test_call_method_handles_exception(self):
        def func():
            raise gi.repository.GLib.Error()

        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)

        with self.assertRaises(dbus.Error) as e:
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

        # "e" is the assertRaises context manager; the exception
        # which was actually raised is in its "exception" attribute
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)

    def test_get_converts_to_correct_exception(self):
        bus = pydbus_adapter.SystemBus(
            self.fake_pydbus_raises_exception_on_connect)
        with self.assertRaises(dbus.ConnectFailed):
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

    class fake_pydbus_raises_exception_on_connect:
        """fake pydbus module"""
        @classmethod
        def SystemBus(cls):
            def get(busname, objectpath):
                raise gi.repository.GLib.Error()
            Bus = collections.namedtuple("Bus", ["get"])
            return Bus(get=get)

    def test_set_property_uses_setattr(self):
        class Object:
            pass
        obj = Object()

        class pydbus_spy:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    return {"interface": obj}

        bus = pydbus_adapter.SystemBus(pydbus_spy)
        value = Unique()
        bus.set_property("busname", "objectpath", "interface", "key",
                         value)
        self.assertIs(value, obj.key)

    def test_get_suppresses_xml_deprecation_warning(self):
        # Nothing to check on Python 3; this test only examines the
        # Python 2 code path
        if sys.version_info.major >= 3:
            return

        class stub_pydbus_get:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    warnings.warn_explicit(
                        "deprecated", DeprecationWarning,
                        "xml.etree.ElementTree", 0)

        bus = pydbus_adapter.SystemBus(stub_pydbus_get)
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            bus.get("busname", "objectpath")
            self.assertEqual(0, len(w))
 
1724
 
 
1725
 
 
1726
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
    """Tests of the caching done by pydbus_adapter.CachingBus.get()"""

    class stub_pydbus:
        """stub pydbus module"""
        class SystemBus:
            @staticmethod
            def get(busname, objectpath):
                # A new object for every call, so that the tests can
                # tell a cached answer from a fresh lookup
                return Unique()

    def setUp(self):
        self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)

    def test_returns_distinct_objectpaths(self):
        obj1 = self.bus.get("busname", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_busnames(self):
        obj1 = self.bus.get("busname1", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_both(self):
        # Both the bus name and the object path differ here
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_same(self):
        obj1 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIs(obj1, obj2)

    def test_returns_same_old(self):
        # An entry must still be cached after another one was added
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        obj1b = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1b, Unique)
        self.assertIsNot(obj1, obj2)
        self.assertIsNot(obj2, obj1b)
        self.assertIs(obj1, obj1b)
 
1775
 
 
1776
 
1511
1777
class Test_commands_from_options(unittest.TestCase):
1512
1778
 
1513
1779
    def setUp(self):
1518
1784
        self.assert_command_from_args(["--is-enabled", "client"],
1519
1785
                                      command.IsEnabled)
1520
1786
 
1521
 
    def assert_command_from_args(self, args, command_cls,
1522
 
                                 **cmd_attrs):
 
1787
    def assert_command_from_args(self, args, command_cls, length=1,
 
1788
                                 clients=None, **cmd_attrs):
1523
1789
        """Assert that parsing ARGS should result in an instance of
1524
1790
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1525
1791
        options = self.parser.parse_args(args)
1526
1792
        check_option_syntax(self.parser, options)
1527
1793
        commands = commands_from_options(options)
1528
 
        self.assertEqual(1, len(commands))
1529
 
        command = commands[0]
1530
 
        self.assertIsInstance(command, command_cls)
 
1794
        self.assertEqual(length, len(commands))
 
1795
        for command in commands:
 
1796
            if isinstance(command, command_cls):
 
1797
                break
 
1798
        else:
 
1799
            self.assertIsInstance(command, command_cls)
 
1800
        if clients is not None:
 
1801
            self.assertEqual(clients, options.client)
1531
1802
        for key, value in cmd_attrs.items():
1532
1803
            self.assertEqual(value, getattr(command, key))
1533
1804
 
 
1805
    def assert_commands_from_args(self, args, commands, clients=None):
 
1806
        for cmd in commands:
 
1807
            self.assert_command_from_args(args, cmd,
 
1808
                                          length=len(commands),
 
1809
                                          clients=clients)
 
1810
 
1534
1811
    def test_is_enabled_short(self):
1535
1812
        self.assert_command_from_args(["-V", "client"],
1536
1813
                                      command.IsEnabled)
1727
2004
                                      verbose=True)
1728
2005
 
1729
2006
 
 
2007
    def test_manual_page_example_1(self):
 
2008
        self.assert_command_from_args("",
 
2009
                                      command.PrintTable,
 
2010
                                      clients=[],
 
2011
                                      verbose=False)
 
2012
 
 
2013
    def test_manual_page_example_2(self):
 
2014
        self.assert_command_from_args(
 
2015
            "--verbose foo1.example.org foo2.example.org".split(),
 
2016
            command.PrintTable, clients=["foo1.example.org",
 
2017
                                         "foo2.example.org"],
 
2018
            verbose=True)
 
2019
 
 
2020
    def test_manual_page_example_3(self):
 
2021
        self.assert_command_from_args("--enable --all".split(),
 
2022
                                      command.Enable,
 
2023
                                      clients=[])
 
2024
 
 
2025
    def test_manual_page_example_4(self):
 
2026
        self.assert_commands_from_args(
 
2027
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
 
2028
             " foo2.example.org").split(),
 
2029
            [command.SetTimeout, command.SetInterval],
 
2030
            clients=["foo1.example.org", "foo2.example.org"])
 
2031
 
 
2032
    def test_manual_page_example_5(self):
 
2033
        self.assert_command_from_args("--approve --all".split(),
 
2034
                                      command.Approve,
 
2035
                                      clients=[])
 
2036
 
 
2037
 
1730
2038
class TestCommand(unittest.TestCase):
1731
2039
    """Abstract class for tests of command classes"""
1732
2040