/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-31 04:39:25 UTC
  • Revision ID: teddy@recompile.se-20190331043925-0j9pdspo3hux5nka
mandos-ctl: Refactor: Move command list generation into argparse

* mandos-ctl (add_command_line_options): Don't simply set flags for
                                         each option; instead, add
                                         command objects to a
                                         "commands" list.
  (check_option_syntax): Rework to deal with an already-generated
                         command list.
  (commands_from_options): Remove all code to generate command list,
                           but add more code to move command.Deny
                           ahead of command.Remove in command list.
  (command.PropertySetterValue.argparse): New; used in
                                          add_command_line_options.
  (Test_check_option_syntax): Refactor to pass actual string arguments
                              to parse_args() instead of modifying an
                              argparse.Namespace.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python3 -bbI
2
 
# -*- coding: utf-8; lexical-binding: t -*-
3
 
#
4
 
# Mandos Control - Control or query the Mandos server
5
 
#
6
 
# Copyright © 2008-2022 Teddy Hogeborn
7
 
# Copyright © 2008-2022 Björn Påhlsson
 
1
#!/usr/bin/python
 
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
 
3
#
 
4
# Mandos Monitor - Control and monitor the Mandos server
 
5
#
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
23
23
#
24
24
# Contact the authors at <mandos@recompile.se>.
25
25
#
 
26
 
26
27
from __future__ import (division, absolute_import, print_function,
27
28
                        unicode_literals)
28
29
 
32
33
    pass
33
34
 
34
35
import sys
35
 
import unittest
36
36
import argparse
37
 
import logging
38
 
import os
39
37
import locale
40
38
import datetime
41
39
import re
 
40
import os
42
41
import collections
43
42
import json
 
43
import unittest
 
44
import logging
44
45
import io
45
46
import tempfile
46
47
import contextlib
47
 
 
48
 
if sys.version_info.major == 2:
49
 
    __metaclass__ = type
50
 
    str = unicode
51
 
    input = raw_input
52
 
 
53
 
 
54
 
class gi:
55
 
    """Dummy gi module, for the tests"""
56
 
    class repository:
57
 
        class GLib:
58
 
            class Error(Exception):
59
 
                pass
60
 
 
61
 
 
62
 
dbussy = None
63
 
ravel = None
64
 
dbus_python = None
65
 
pydbus = None
66
 
 
67
 
try:
68
 
    import dbussy
69
 
    import ravel
70
 
except ImportError:
71
 
    try:
72
 
        import pydbus
73
 
        import gi
74
 
    except ImportError:
75
 
        import dbus as dbus_python
76
 
 
 
48
import abc
 
49
 
 
50
import dbus as dbus_python
77
51
 
78
52
# Show warnings by default
79
53
if not sys.warnoptions:
80
54
    import warnings
81
55
    warnings.simplefilter("default")
82
56
 
83
 
log = logging.getLogger(os.path.basename(sys.argv[0]))
84
 
logging.basicConfig(level="INFO",         # Show info level messages
 
57
log = logging.getLogger(sys.argv[0])
 
58
logging.basicConfig(level="INFO", # Show info level messages
85
59
                    format="%(message)s") # Show basic log messages
86
60
 
87
61
logging.captureWarnings(True)   # Show warnings via the logging system
88
62
 
89
63
if sys.version_info.major == 2:
 
64
    str = unicode
90
65
    import StringIO
91
66
    io.StringIO = StringIO.StringIO
92
67
 
93
68
locale.setlocale(locale.LC_ALL, "")
94
69
 
95
 
version = "1.8.16"
 
70
version = "1.8.3"
96
71
 
97
72
 
98
73
def main():
105
80
    clientnames = options.client
106
81
 
107
82
    if options.debug:
108
 
        logging.getLogger("").setLevel(logging.DEBUG)
 
83
        log.setLevel(logging.DEBUG)
109
84
 
110
 
    if dbussy is not None and ravel is not None:
111
 
        bus = dbussy_adapter.CachingBus(dbussy, ravel)
112
 
    elif pydbus is not None:
113
 
        bus = pydbus_adapter.CachingBus(pydbus)
114
 
    else:
115
 
        bus = dbus_python_adapter.CachingBus(dbus_python)
 
85
    bus = dbus_python_adapter.CachingBus(dbus_python)
116
86
 
117
87
    try:
118
88
        all_clients = bus.get_clients_and_properties()
259
229
        return rfc3339_duration_to_delta(interval)
260
230
    except ValueError as e:
261
231
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
262
 
                    " ".join(e.args))
 
232
                    ' '.join(e.args))
263
233
    return parse_pre_1_6_1_interval(interval)
264
234
 
265
235
 
266
236
def rfc3339_duration_to_delta(duration):
267
237
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
268
238
 
269
 
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
270
 
    True
271
 
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
272
 
    True
273
 
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
274
 
    True
275
 
    >>> # 60 months
276
 
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
277
 
    True
278
 
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
279
 
    True
280
 
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
281
 
    True
282
 
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
283
 
    True
284
 
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
285
 
    True
 
239
    >>> rfc3339_duration_to_delta("P7D")
 
240
    datetime.timedelta(7)
 
241
    >>> rfc3339_duration_to_delta("PT60S")
 
242
    datetime.timedelta(0, 60)
 
243
    >>> rfc3339_duration_to_delta("PT60M")
 
244
    datetime.timedelta(0, 3600)
 
245
    >>> rfc3339_duration_to_delta("P60M")
 
246
    datetime.timedelta(1680)
 
247
    >>> rfc3339_duration_to_delta("PT24H")
 
248
    datetime.timedelta(1)
 
249
    >>> rfc3339_duration_to_delta("P1W")
 
250
    datetime.timedelta(7)
 
251
    >>> rfc3339_duration_to_delta("PT5M30S")
 
252
    datetime.timedelta(0, 330)
 
253
    >>> rfc3339_duration_to_delta("P1DT3M20S")
 
254
    datetime.timedelta(1, 200)
286
255
    >>> # Can not be empty:
287
256
    >>> rfc3339_duration_to_delta("")
288
257
    Traceback (most recent call last):
395
364
 
396
365
 
397
366
def parse_pre_1_6_1_interval(interval):
398
 
    r"""Parse an interval string as documented by Mandos before 1.6.1,
 
367
    """Parse an interval string as documented by Mandos before 1.6.1,
399
368
    and return a datetime.timedelta
400
369
 
401
 
    >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
402
 
    True
403
 
    >>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
404
 
    True
405
 
    >>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
406
 
    True
407
 
    >>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
408
 
    True
409
 
    >>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
410
 
    True
411
 
    >>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
412
 
    True
413
 
    >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
414
 
    True
 
370
    >>> parse_pre_1_6_1_interval('7d')
 
371
    datetime.timedelta(7)
 
372
    >>> parse_pre_1_6_1_interval('60s')
 
373
    datetime.timedelta(0, 60)
 
374
    >>> parse_pre_1_6_1_interval('60m')
 
375
    datetime.timedelta(0, 3600)
 
376
    >>> parse_pre_1_6_1_interval('24h')
 
377
    datetime.timedelta(1)
 
378
    >>> parse_pre_1_6_1_interval('1w')
 
379
    datetime.timedelta(7)
 
380
    >>> parse_pre_1_6_1_interval('5m 30s')
 
381
    datetime.timedelta(0, 330)
 
382
    >>> parse_pre_1_6_1_interval('')
 
383
    datetime.timedelta(0)
415
384
    >>> # Ignore unknown characters, allow any order and repetitions
416
 
    >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
 
    ... == datetime.timedelta(2, 480, 18000)
418
 
    True
 
385
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
 
386
    datetime.timedelta(2, 480, 18000)
419
387
 
420
388
    """
421
389
 
484
452
        parser.error("--remove can only be combined with --deny")
485
453
 
486
454
 
487
 
class dbus:
 
455
class dbus(object):
488
456
 
489
 
    class SystemBus:
 
457
    class SystemBus(object):
490
458
 
491
459
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
492
 
 
493
460
        def get_managed_objects(self, busname, objectpath):
494
461
            return self.call_method("GetManagedObjects", busname,
495
462
                                    objectpath,
496
463
                                    self.object_manager_iface)
497
464
 
498
465
        properties_iface = "org.freedesktop.DBus.Properties"
499
 
 
500
466
        def set_property(self, busname, objectpath, interface, key,
501
467
                         value):
502
468
            self.call_method("Set", busname, objectpath,
503
469
                             self.properties_iface, interface, key,
504
470
                             value)
505
471
 
506
 
        def call_method(self, methodname, busname, objectpath,
507
 
                        interface, *args):
508
 
            raise NotImplementedError()
509
472
 
510
473
    class MandosBus(SystemBus):
511
474
        busname_domain = "se.recompile"
543
506
        pass
544
507
 
545
508
 
546
 
class dbus_python_adapter:
 
509
class dbus_python_adapter(object):
547
510
 
548
511
    class SystemBus(dbus.MandosBus):
549
512
        """Use dbus-python"""
594
557
                        for key, subval in value.items()}
595
558
            return value
596
559
 
597
 
        def set_client_property(self, objectpath, key, value):
598
 
            if key == "Secret":
599
 
                if not isinstance(value, bytes):
600
 
                    value = value.encode("utf-8")
601
 
                value = self.dbus_python.ByteArray(value)
602
 
            return self.set_property(self.busname, objectpath,
603
 
                                     self.client_interface, key,
604
 
                                     value)
605
560
 
606
 
    class SilenceLogger:
 
561
    class SilenceLogger(object):
607
562
        "Simple context manager to silence a particular logger"
608
 
 
609
563
        def __init__(self, loggername):
610
564
            self.logger = logging.getLogger(loggername)
611
565
 
621
575
        def __exit__(self, exc_type, exc_val, exc_tb):
622
576
            self.logger.removeFilter(self.nullfilter)
623
577
 
 
578
 
624
579
    class CachingBus(SystemBus):
625
580
        """A caching layer for dbus_python_adapter.SystemBus"""
626
 
 
627
581
        def __init__(self, *args, **kwargs):
628
582
            self.object_cache = {}
629
583
            super(dbus_python_adapter.CachingBus,
630
584
                  self).__init__(*args, **kwargs)
631
 
 
632
585
        def get_object(self, busname, objectpath):
633
586
            try:
634
587
                return self.object_cache[(busname, objectpath)]
636
589
                new_object = super(
637
590
                    dbus_python_adapter.CachingBus,
638
591
                    self).get_object(busname, objectpath)
639
 
                self.object_cache[(busname, objectpath)] = new_object
640
 
                return new_object
641
 
 
642
 
 
643
 
class pydbus_adapter:
644
 
    class SystemBus(dbus.MandosBus):
645
 
        def __init__(self, module=pydbus):
646
 
            self.pydbus = module
647
 
            self.bus = self.pydbus.SystemBus()
648
 
 
649
 
        @contextlib.contextmanager
650
 
        def convert_exception(self, exception_class=dbus.Error):
651
 
            try:
652
 
                yield
653
 
            except gi.repository.GLib.Error as e:
654
 
                # This does what "raise from" would do
655
 
                exc = exception_class(*e.args)
656
 
                exc.__cause__ = e
657
 
                raise exc
658
 
 
659
 
        def call_method(self, methodname, busname, objectpath,
660
 
                        interface, *args):
661
 
            proxy_object = self.get(busname, objectpath)
662
 
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
663
 
                      interface, methodname,
664
 
                      ", ".join(repr(a) for a in args))
665
 
            method = getattr(proxy_object[interface], methodname)
666
 
            with self.convert_exception():
667
 
                return method(*args)
668
 
 
669
 
        def get(self, busname, objectpath):
670
 
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
671
 
                      busname, objectpath)
672
 
            with self.convert_exception(dbus.ConnectFailed):
673
 
                if sys.version_info.major <= 2:
674
 
                    with warnings.catch_warnings():
675
 
                        warnings.filterwarnings(
676
 
                            "ignore", "", DeprecationWarning,
677
 
                            r"^xml\.etree\.ElementTree$")
678
 
                        return self.bus.get(busname, objectpath)
679
 
                else:
680
 
                    return self.bus.get(busname, objectpath)
681
 
 
682
 
        def set_property(self, busname, objectpath, interface, key,
683
 
                         value):
684
 
            proxy_object = self.get(busname, objectpath)
685
 
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
686
 
                      objectpath, self.properties_iface, interface,
687
 
                      key, value)
688
 
            setattr(proxy_object[interface], key, value)
689
 
 
690
 
    class CachingBus(SystemBus):
691
 
        """A caching layer for pydbus_adapter.SystemBus"""
692
 
 
693
 
        def __init__(self, *args, **kwargs):
694
 
            self.object_cache = {}
695
 
            super(pydbus_adapter.CachingBus,
696
 
                  self).__init__(*args, **kwargs)
697
 
 
698
 
        def get(self, busname, objectpath):
699
 
            try:
700
 
                return self.object_cache[(busname, objectpath)]
701
 
            except KeyError:
702
 
                new_object = (super(pydbus_adapter.CachingBus, self)
703
 
                              .get(busname, objectpath))
704
 
                self.object_cache[(busname, objectpath)] = new_object
705
 
                return new_object
706
 
 
707
 
 
708
 
class dbussy_adapter:
709
 
    class SystemBus(dbus.SystemBus):
710
 
        """Use DBussy"""
711
 
 
712
 
        def __init__(self, dbussy, ravel):
713
 
            self.dbussy = dbussy
714
 
            self.ravel = ravel
715
 
            self.bus = ravel.system_bus()
716
 
 
717
 
        @contextlib.contextmanager
718
 
        def convert_exception(self, exception_class=dbus.Error):
719
 
            try:
720
 
                yield
721
 
            except self.dbussy.DBusError as e:
722
 
                # This does what "raise from" would do
723
 
                exc = exception_class(*e.args)
724
 
                exc.__cause__ = e
725
 
                raise exc
726
 
 
727
 
        def call_method(self, methodname, busname, objectpath,
728
 
                        interface, *args):
729
 
            proxy_object = self.get_object(busname, objectpath)
730
 
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
731
 
                      interface, methodname,
732
 
                      ", ".join(repr(a) for a in args))
733
 
            iface = proxy_object.get_interface(interface)
734
 
            method = getattr(iface, methodname)
735
 
            with self.convert_exception(dbus.Error):
736
 
                value = method(*args)
737
 
            # DBussy returns values either as an empty list or as a
738
 
            # list of one element with the return value
739
 
            if value:
740
 
                return self.type_filter(value[0])
741
 
 
742
 
        def get_object(self, busname, objectpath):
743
 
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
744
 
                      busname, objectpath)
745
 
            with self.convert_exception(dbus.ConnectFailed):
746
 
                return self.bus[busname][objectpath]
747
 
 
748
 
        def type_filter(self, value):
749
 
            """Convert the most bothersome types to Python types"""
750
 
            # A D-Bus Variant value is represented as the Python type
751
 
            # Tuple[dbussy.DBUS.Signature, Any]
752
 
            if isinstance(value, tuple):
753
 
                if (len(value) == 2
754
 
                    and isinstance(value[0],
755
 
                                   self.dbussy.DBUS.Signature)):
756
 
                    return self.type_filter(value[1])
757
 
            elif isinstance(value, self.dbussy.DBUS.ObjectPath):
758
 
                return str(value)
759
 
            # Also recurse into dictionaries
760
 
            elif isinstance(value, dict):
761
 
                return {self.type_filter(key):
762
 
                        self.type_filter(subval)
763
 
                        for key, subval in value.items()}
764
 
            return value
765
 
 
766
 
        def set_property(self, busname, objectpath, interface, key,
767
 
                         value):
768
 
            proxy_object = self.get_object(busname, objectpath)
769
 
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
770
 
                      objectpath, self.properties_iface, interface,
771
 
                      key, value)
772
 
            if key == "Secret":
773
 
                # DBussy wants a Byte Array to be a sequence of
774
 
                # values, not a byte string
775
 
                value = tuple(value)
776
 
            setattr(proxy_object.get_interface(interface), key, value)
777
 
 
778
 
    class MandosBus(SystemBus, dbus.MandosBus):
779
 
        pass
780
 
 
781
 
    class CachingBus(MandosBus):
782
 
        """A caching layer for dbussy_adapter.MandosBus"""
783
 
 
784
 
        def __init__(self, *args, **kwargs):
785
 
            self.object_cache = {}
786
 
            super(dbussy_adapter.CachingBus, self).__init__(*args,
787
 
                                                            **kwargs)
788
 
 
789
 
        def get_object(self, busname, objectpath):
790
 
            try:
791
 
                return self.object_cache[(busname, objectpath)]
792
 
            except KeyError:
793
 
                new_object = super(
794
 
                    dbussy_adapter.CachingBus,
795
 
                    self).get_object(busname, objectpath)
796
 
                self.object_cache[(busname, objectpath)] = new_object
 
592
                self.object_cache[(busname, objectpath)]  = new_object
797
593
                return new_object
798
594
 
799
595
 
830
626
    return commands
831
627
 
832
628
 
833
 
class command:
 
629
class command(object):
834
630
    """A namespace for command classes"""
835
631
 
836
 
    class Base:
 
632
    class Base(object):
837
633
        """Abstract base class for commands"""
838
 
 
839
634
        def run(self, clients, bus=None):
840
635
            """Normal commands should implement run_on_one_client(),
841
636
but commands which want to operate on all clients at the same time can
845
640
            for client, properties in clients.items():
846
641
                self.run_on_one_client(client, properties)
847
642
 
 
643
 
848
644
    class IsEnabled(Base):
849
645
        def run(self, clients, bus=None):
850
646
            properties = next(iter(clients.values()))
852
648
                sys.exit(0)
853
649
            sys.exit(1)
854
650
 
 
651
 
855
652
    class Approve(Base):
856
653
        def run_on_one_client(self, client, properties):
857
654
            self.bus.call_client_method(client, "Approve", True)
858
655
 
 
656
 
859
657
    class Deny(Base):
860
658
        def run_on_one_client(self, client, properties):
861
659
            self.bus.call_client_method(client, "Approve", False)
862
660
 
 
661
 
863
662
    class Remove(Base):
864
663
        def run(self, clients, bus):
865
664
            for clientpath in frozenset(clients.keys()):
866
665
                bus.call_server_method("RemoveClient", clientpath)
867
666
 
 
667
 
868
668
    class Output(Base):
869
669
        """Abstract class for commands outputting client details"""
870
670
        all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
876
676
                        "Checker", "ExtendedTimeout", "Expires",
877
677
                        "LastCheckerStatus")
878
678
 
 
679
 
879
680
    class DumpJSON(Output):
880
681
        def run(self, clients, bus=None):
881
682
            data = {properties["Name"]:
882
683
                    {key: properties[key]
883
684
                     for key in self.all_keywords}
884
685
                    for properties in clients.values()}
885
 
            print(json.dumps(data, indent=4, separators=(",", ": ")))
 
686
            print(json.dumps(data, indent=4, separators=(',', ': ')))
 
687
 
886
688
 
887
689
    class PrintTable(Output):
888
690
        def __init__(self, verbose=False):
896
698
                keywords = self.all_keywords
897
699
            print(self.TableOfClients(clients.values(), keywords))
898
700
 
899
 
        class TableOfClients:
 
701
        class TableOfClients(object):
900
702
            tableheaders = {
901
703
                "Name": "Name",
902
704
                "Enabled": "Enabled",
929
731
 
930
732
            if sys.version_info.major == 2:
931
733
                __unicode__ = __str__
932
 
 
933
734
                def __str__(self):
934
735
                    return str(self).encode(
935
736
                        locale.getpreferredencoding())
981
782
                                minutes=(td.seconds % 3600) // 60,
982
783
                                seconds=td.seconds % 60))
983
784
 
 
785
 
984
786
    class PropertySetter(Base):
985
787
        "Abstract class for Actions for setting one client property"
986
788
 
993
795
        def propname(self):
994
796
            raise NotImplementedError()
995
797
 
 
798
 
996
799
    class Enable(PropertySetter):
997
800
        propname = "Enabled"
998
801
        value_to_set = True
999
802
 
 
803
 
1000
804
    class Disable(PropertySetter):
1001
805
        propname = "Enabled"
1002
806
        value_to_set = False
1003
807
 
 
808
 
1004
809
    class BumpTimeout(PropertySetter):
1005
810
        propname = "LastCheckedOK"
1006
811
        value_to_set = ""
1007
812
 
 
813
 
1008
814
    class StartChecker(PropertySetter):
1009
815
        propname = "CheckerRunning"
1010
816
        value_to_set = True
1011
817
 
 
818
 
1012
819
    class StopChecker(PropertySetter):
1013
820
        propname = "CheckerRunning"
1014
821
        value_to_set = False
1015
822
 
 
823
 
1016
824
    class ApproveByDefault(PropertySetter):
1017
825
        propname = "ApprovedByDefault"
1018
826
        value_to_set = True
1019
827
 
 
828
 
1020
829
    class DenyByDefault(PropertySetter):
1021
830
        propname = "ApprovedByDefault"
1022
831
        value_to_set = False
1023
832
 
 
833
 
1024
834
    class PropertySetterValue(PropertySetter):
1025
835
        """Abstract class for PropertySetter receiving a value as
1026
836
constructor argument instead of a class attribute."""
1027
 
 
1028
837
        def __init__(self, value):
1029
838
            self.value_to_set = value
1030
839
 
1037
846
    class SetChecker(PropertySetterValue):
1038
847
        propname = "Checker"
1039
848
 
 
849
 
1040
850
    class SetHost(PropertySetterValue):
1041
851
        propname = "Host"
1042
852
 
 
853
 
1043
854
    class SetSecret(PropertySetterValue):
1044
855
        propname = "Secret"
1045
856
 
1053
864
            self._vts = value.read()
1054
865
            value.close()
1055
866
 
 
867
 
1056
868
    class PropertySetterValueMilliseconds(PropertySetterValue):
1057
869
        """Abstract class for PropertySetterValue taking a value
1058
870
argument as a datetime.timedelta() but should store it as
1067
879
            "When setting, convert value from a datetime.timedelta"
1068
880
            self._vts = int(round(value.total_seconds() * 1000))
1069
881
 
 
882
 
1070
883
    class SetTimeout(PropertySetterValueMilliseconds):
1071
884
        propname = "Timeout"
1072
885
 
 
886
 
1073
887
    class SetExtendedTimeout(PropertySetterValueMilliseconds):
1074
888
        propname = "ExtendedTimeout"
1075
889
 
 
890
 
1076
891
    class SetInterval(PropertySetterValueMilliseconds):
1077
892
        propname = "Interval"
1078
893
 
 
894
 
1079
895
    class SetApprovalDelay(PropertySetterValueMilliseconds):
1080
896
        propname = "ApprovalDelay"
1081
897
 
 
898
 
1082
899
    class SetApprovalDuration(PropertySetterValueMilliseconds):
1083
900
        propname = "ApprovalDuration"
1084
901
 
1118
935
                                                     "output"))
1119
936
 
1120
937
 
1121
 
class Unique:
 
938
class Unique(object):
1122
939
    """Class for objects which exist only to be unique objects, since
1123
940
unittest.mock.sentinel only exists in Python 3.3 and later"""
1124
941
 
1408
1225
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1409
1226
 
1410
1227
    def MockDBusPython_func(self, func):
1411
 
        class mock_dbus_python:
 
1228
        class mock_dbus_python(object):
1412
1229
            """mock dbus-python module"""
1413
 
            class exceptions:
 
1230
            class exceptions(object):
1414
1231
                """Pseudo-namespace"""
1415
1232
                class DBusException(Exception):
1416
1233
                    pass
1417
 
            class SystemBus:
 
1234
            class SystemBus(object):
1418
1235
                @staticmethod
1419
1236
                def get_object(busname, objectpath):
1420
1237
                    DBusObject = collections.namedtuple(
1421
 
                        "DBusObject", ("methodname", "Set"))
 
1238
                        "DBusObject", ("methodname",))
1422
1239
                    def method(*args, **kwargs):
1423
1240
                        self.assertEqual({"dbus_interface":
1424
1241
                                          "interface"},
1425
1242
                                         kwargs)
1426
1243
                        return func(*args)
1427
 
                    def set_property(interface, key, value,
1428
 
                                     dbus_interface=None):
1429
 
                        self.assertEqual(
1430
 
                            "org.freedesktop.DBus.Properties",
1431
 
                            dbus_interface)
1432
 
                        self.assertEqual("Secret", key)
1433
 
                        return func(interface, key, value,
1434
 
                                    dbus_interface=dbus_interface)
1435
 
                    return DBusObject(methodname=method,
1436
 
                                      Set=set_property)
1437
 
            class Boolean:
 
1244
                    return DBusObject(methodname=method)
 
1245
            class Boolean(object):
1438
1246
                def __init__(self, value):
1439
1247
                    self.value = bool(value)
1440
1248
                def __bool__(self):
1445
1253
                pass
1446
1254
            class Dictionary(dict):
1447
1255
                pass
1448
 
            class ByteArray(bytes):
1449
 
                pass
1450
1256
        return mock_dbus_python
1451
1257
 
1452
1258
    def call_method(self, bus, methodname, busname, objectpath,
1624
1430
        finally:
1625
1431
            dbus_logger.removeFilter(counting_handler)
1626
1432
 
1627
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
1433
        self.assertNotIsInstance(e, dbus.ConnectFailed)
1628
1434
 
1629
1435
        # Make sure the dbus logger was suppressed
1630
1436
        self.assertEqual(0, counting_handler.count)
1631
1437
 
1632
 
    def test_Set_Secret_sends_bytearray(self):
1633
 
        ret = [None]
1634
 
        def func(*args, **kwargs):
1635
 
            ret[0] = (args, kwargs)
1636
 
        mock_dbus_python = self.MockDBusPython_func(func)
1637
 
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
1638
 
        bus.set_client_property("objectpath", "Secret", "value")
1639
 
        expected_call = (("se.recompile.Mandos.Client", "Secret",
1640
 
                          mock_dbus_python.ByteArray(b"value")),
1641
 
                         {"dbus_interface":
1642
 
                          "org.freedesktop.DBus.Properties"})
1643
 
        self.assertEqual(expected_call, ret[0])
1644
 
        if sys.version_info.major == 2:
1645
 
            self.assertIsInstance(ret[0][0][-1],
1646
 
                                  mock_dbus_python.ByteArray)
1647
 
 
1648
1438
    def test_get_object_converts_to_correct_exception(self):
1649
1439
        bus = dbus_python_adapter.SystemBus(
1650
1440
            self.fake_dbus_python_raises_exception_on_connect)
1652
1442
            self.call_method(bus, "methodname", "busname",
1653
1443
                             "objectpath", "interface")
1654
1444
 
1655
 
    class fake_dbus_python_raises_exception_on_connect:
 
1445
    class fake_dbus_python_raises_exception_on_connect(object):
1656
1446
        """fake dbus-python module"""
1657
 
        class exceptions:
 
1447
        class exceptions(object):
1658
1448
            """Pseudo-namespace"""
1659
1449
            class DBusException(Exception):
1660
1450
                pass
1668
1458
 
1669
1459
 
1670
1460
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1671
 
    class mock_dbus_python:
 
1461
    class mock_dbus_python(object):
1672
1462
        """mock dbus-python modules"""
1673
 
        class SystemBus:
 
1463
        class SystemBus(object):
1674
1464
            @staticmethod
1675
1465
            def get_object(busname, objectpath):
1676
1466
                return Unique()
1719
1509
        self.assertIs(obj1, obj1b)
1720
1510
 
1721
1511
 
1722
 
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
    # Tests of pydbus_adapter.SystemBus.  Every test hands the
    # adapter a small stand-in "pydbus" module instead of the real
    # one.  Test methods are documented with comments instead of
    # docstrings, since unittest shows method docstrings in its
    # verbose output.

    def Stub_pydbus_func(self, func):
        """Return a stub pydbus module exposing FUNC as "methodname"
        on the interface "interface" of any object it returns."""
        class stub_pydbus:
            """stub pydbus module"""
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    DBusObject = collections.namedtuple(
                        "DBusObject", ("methodname",))
                    return {"interface":
                            DBusObject(methodname=func)}
        return stub_pydbus

    def call_method(self, bus, methodname, busname, objectpath,
                    interface, *args):
        """Return the result of BUS.call_method() with the given
        arguments, called inside an assertLogs() context for the
        "log" logger at level DEBUG."""
        with self.assertLogs(log, logging.DEBUG):
            return bus.call_method(methodname, busname, objectpath,
                                   interface, *args)

    def test_call_method_returns(self):
        # The arguments must reach the method unchanged (same
        # objects), and its return value must come back unchanged.
        expected_method_return = Unique()
        method_args = (Unique(), Unique())
        def func(*args):
            self.assertEqual(len(method_args), len(args))
            for marg, arg in zip(method_args, args):
                self.assertIs(marg, arg)
            return expected_method_return
        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface",
                               *method_args)
        self.assertIs(ret, expected_method_return)

    def test_call_method_handles_exception(self):
        # A GLib error raised by the called method must surface as
        # dbus.Error, but not as the dbus.ConnectFailed subtype.
        def func():
            raise gi.repository.GLib.Error()

        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)

        with self.assertRaises(dbus.Error) as e:
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)

    def test_get_converts_to_correct_exception(self):
        # A GLib error raised while getting the object must surface
        # as dbus.ConnectFailed.
        bus = pydbus_adapter.SystemBus(
            self.fake_pydbus_raises_exception_on_connect)
        with self.assertRaises(dbus.ConnectFailed):
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

    class fake_pydbus_raises_exception_on_connect:
        """fake pydbus module"""
        @classmethod
        def SystemBus(cls):
            # The returned "bus" has a get() which always fails.
            def get(busname, objectpath):
                raise gi.repository.GLib.Error()
            Bus = collections.namedtuple("Bus", ["get"])
            return Bus(get=get)

    def test_set_property_uses_setattr(self):
        # After set_property(), the value must be readable as the
        # attribute "key" of the interface object.
        class Object:
            pass
        obj = Object()
        class pydbus_spy:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    return {"interface": obj}
        bus = pydbus_adapter.SystemBus(pydbus_spy)
        value = Unique()
        bus.set_property("busname", "objectpath", "interface", "key",
                         value)
        self.assertIs(value, obj.key)

    def test_get_suppresses_xml_deprecation_warning(self):
        # Only checked on Python 2; on Python 3 and later this test
        # returns at once and is reported as passed.
        if sys.version_info.major >= 3:
            return
        class stub_pydbus_get:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    # Emit a DeprecationWarning attributed to the
                    # module "xml.etree.ElementTree"
                    warnings.warn_explicit(
                        "deprecated", DeprecationWarning,
                        "xml.etree.ElementTree", 0)
        bus = pydbus_adapter.SystemBus(stub_pydbus_get)
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            bus.get("busname", "objectpath")
            # No warning may have been recorded
            self.assertEqual(0, len(w))
1816
 
 
1817
 
 
1818
 
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
    # Tests of pydbus_adapter.CachingBus: get() is expected to
    # return the same object again for a repeated (busname,
    # objectpath) pair, and distinct objects for differing pairs.
    # Test methods are documented with comments instead of
    # docstrings, since unittest shows method docstrings in its
    # verbose output.

    class stub_pydbus:
        """stub pydbus module"""
        class SystemBus:
            @staticmethod
            def get(busname, objectpath):
                # A new object on every call; getting the same
                # object twice can therefore only be due to caching.
                return Unique()

    def setUp(self):
        self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)

    def test_returns_distinct_objectpaths(self):
        # Same bus name, different object paths
        obj1 = self.bus.get("busname", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_busnames(self):
        # Different bus names, same object path
        obj1 = self.bus.get("busname1", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_both(self):
        # Different bus names AND different object paths.  (Both
        # calls used to pass the same object path, which made this
        # test a duplicate of test_returns_distinct_busnames.)
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_same(self):
        # An identical request must return the identical object
        obj1 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIs(obj1, obj2)

    def test_returns_same_old(self):
        # The first object must still be returned after another
        # object has been requested in between.
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        obj1b = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1b, Unique)
        self.assertIsNot(obj1, obj2)
        self.assertIsNot(obj2, obj1b)
        self.assertIs(obj1, obj1b)
1867
 
 
1868
 
 
1869
 
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
    # Tests of dbussy_adapter.SystemBus.  Every test hands the
    # adapter stand-in "dbussy" and "ravel" modules instead of the
    # real ones.  Test methods are documented with comments instead
    # of docstrings, since unittest shows method docstrings in its
    # verbose output.

    class dummy_dbussy:
        """Stand-in for the dbussy module; provides only
        DBUS.ObjectPath and DBusError"""
        class DBUS:
            class ObjectPath(str):
                pass
        class DBusError(Exception):
            pass

    def fake_ravel_func(self, func):
        """Return a fake ravel module whose system bus has one
        object, at ["busname"]["objectpath"], where the interface
        "interface" has a method "methodname" which calls FUNC."""
        class fake_ravel:
            @staticmethod
            def system_bus():
                class DBusInterfaceProxy:
                    @staticmethod
                    def methodname(*args):
                        # The result of FUNC is returned wrapped in
                        # a one-element list
                        return [func(*args)]
                class DBusObject:
                    @staticmethod
                    def get_interface(interface):
                        # Any other interface name gives None
                        if interface == "interface":
                            return DBusInterfaceProxy()
                return {"busname": {"objectpath": DBusObject()}}
        return fake_ravel

    def call_method(self, bus, methodname, busname, objectpath,
                    interface, *args):
        """Return the result of BUS.call_method() with the given
        arguments, called inside an assertLogs() context for the
        "log" logger at level DEBUG."""
        with self.assertLogs(log, logging.DEBUG):
            return bus.call_method(methodname, busname, objectpath,
                                   interface, *args)

    def test_call_method_returns(self):
        # The arguments must reach the method unchanged (same
        # objects), and its return value must come back unchanged.
        expected_method_return = Unique()
        method_args = (Unique(), Unique())
        def func(*args):
            self.assertEqual(len(method_args), len(args))
            for marg, arg in zip(method_args, args):
                self.assertIs(marg, arg)
            return expected_method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface",
                               *method_args)
        self.assertIs(ret, expected_method_return)

    def test_call_method_filters_objectpath(self):
        # A returned ObjectPath must come back as an equal value
        # which is not an ObjectPath instance.
        def func():
            # method_return is assigned below, before func is called
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = (self.dummy_dbussy.DBUS
                         .ObjectPath("objectpath"))
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        self.assertEqual("objectpath", ret)
        self.assertNotIsInstance(ret,
                                 self.dummy_dbussy.DBUS.ObjectPath)

    def test_call_method_filters_objectpaths_in_dict(self):
        # ObjectPath keys and values in a returned dict must both
        # be converted.
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
        def func():
            # method_return is assigned below, before func is called
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("objectpath_key_1"):
            ObjectPath("objectpath_value_1"),
            ObjectPath("objectpath_key_2"):
            ObjectPath("objectpath_value_2"),
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {str(key): str(value)
                                  for key, value in
                                  method_return.items()}
        for key, value in ret.items():
            self.assertNotIsInstance(key, ObjectPath)
            self.assertNotIsInstance(value, ObjectPath)
        self.assertEqual(expected_method_return, ret)
        self.assertIsInstance(ret, dict)

    def test_call_method_filters_objectpaths_in_dict_in_dict(self):
        # ObjectPath keys and values must be converted also in
        # dicts nested one level down.
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
        def func():
            # method_return is assigned below, before func is called
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("key1"): {
                ObjectPath("key11"): ObjectPath("value11"),
                ObjectPath("key12"): ObjectPath("value12"),
            },
            ObjectPath("key2"): {
                ObjectPath("key21"): ObjectPath("value21"),
                ObjectPath("key22"): ObjectPath("value22"),
            },
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {
            "key1": {"key11": "value11",
                     "key12": "value12"},
            "key2": {"key21": "value21",
                     "key22": "value22"},
        }
        self.assertEqual(expected_method_return, ret)
        for key, value in ret.items():
            self.assertIsInstance(value, dict)
            self.assertEqual(expected_method_return[key], value)
            self.assertNotIsInstance(key, ObjectPath)
            for inner_key, inner_value in value.items():
                self.assertEqual(
                    expected_method_return[key][inner_key],
                    inner_value)
                # Check the nested entries themselves.  (These
                # assertions used to re-check the outer "key" and
                # "value", leaving the nested ones unchecked.)
                self.assertNotIsInstance(inner_key, ObjectPath)
                self.assertNotIsInstance(inner_value, ObjectPath)

    def test_call_method_filters_objectpaths_in_dict_three_deep(self):
        # As above, but with dicts nested three levels deep
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
        def func():
            # method_return is assigned below, before func is called
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("key1"): {
                ObjectPath("key2"): {
                    ObjectPath("key3"): ObjectPath("value"),
                },
            },
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {"key1": {"key2": {"key3": "value"}}}
        self.assertEqual(expected_method_return, ret)
        self.assertIsInstance(ret, dict)
        self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
        self.assertIsInstance(ret["key1"], dict)
        self.assertNotIsInstance(next(iter(ret["key1"].keys())),
                                 ObjectPath)
        self.assertIsInstance(ret["key1"]["key2"], dict)
        self.assertNotIsInstance(
            next(iter(ret["key1"]["key2"].keys())),
            ObjectPath)
        self.assertEqual("value", ret["key1"]["key2"]["key3"])
        self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
                                 self.dummy_dbussy.DBUS.ObjectPath)

    def test_call_method_handles_exception(self):
        # A DBusError raised by the called method must surface as
        # dbus.Error, but not as the dbus.ConnectFailed subtype.
        def func():
            raise self.dummy_dbussy.DBusError()

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)

        with self.assertRaises(dbus.Error) as e:
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)

    def test_get_object_converts_to_correct_exception(self):
        # A DBusError raised while looking up the object must
        # surface as dbus.ConnectFailed.
        class fake_ravel_raises_exception_on_connect:
            @staticmethod
            def system_bus():
                class Bus:
                    @staticmethod
                    def __getitem__(key):
                        if key == "objectpath":
                            raise self.dummy_dbussy.DBusError()
                        # Any other key is unexpected in this test
                        raise Exception(key)
                return {"busname": Bus()}
        bus = dbussy_adapter.SystemBus(
            self.dummy_dbussy,
            fake_ravel_raises_exception_on_connect)
        with self.assertRaises(dbus.ConnectFailed):
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")
2049
 
 
2050
 
 
2051
1512
class Test_commands_from_options(unittest.TestCase):
2052
1513
 
2053
1514
    def setUp(self):
2058
1519
        self.assert_command_from_args(["--is-enabled", "client"],
2059
1520
                                      command.IsEnabled)
2060
1521
 
2061
 
    def assert_command_from_args(self, args, command_cls, length=1,
2062
 
                                 clients=None, **cmd_attrs):
 
1522
    def assert_command_from_args(self, args, command_cls,
 
1523
                                 **cmd_attrs):
2063
1524
        """Assert that parsing ARGS should result in an instance of
2064
1525
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
2065
1526
        options = self.parser.parse_args(args)
2066
1527
        check_option_syntax(self.parser, options)
2067
1528
        commands = commands_from_options(options)
2068
 
        self.assertEqual(length, len(commands))
2069
 
        for command in commands:
2070
 
            if isinstance(command, command_cls):
2071
 
                break
2072
 
        else:
2073
 
            self.assertIsInstance(command, command_cls)
2074
 
        if clients is not None:
2075
 
            self.assertEqual(clients, options.client)
 
1529
        self.assertEqual(1, len(commands))
 
1530
        command = commands[0]
 
1531
        self.assertIsInstance(command, command_cls)
2076
1532
        for key, value in cmd_attrs.items():
2077
1533
            self.assertEqual(value, getattr(command, key))
2078
1534
 
2079
 
    def assert_commands_from_args(self, args, commands, clients=None):
2080
 
        for cmd in commands:
2081
 
            self.assert_command_from_args(args, cmd,
2082
 
                                          length=len(commands),
2083
 
                                          clients=clients)
2084
 
 
2085
1535
    def test_is_enabled_short(self):
2086
1536
        self.assert_command_from_args(["-V", "client"],
2087
1537
                                      command.IsEnabled)
2278
1728
                                      verbose=True)
2279
1729
 
2280
1730
 
2281
 
    def test_manual_page_example_1(self):
2282
 
        self.assert_command_from_args("",
2283
 
                                      command.PrintTable,
2284
 
                                      clients=[],
2285
 
                                      verbose=False)
2286
 
 
2287
 
    def test_manual_page_example_2(self):
2288
 
        self.assert_command_from_args(
2289
 
            "--verbose foo1.example.org foo2.example.org".split(),
2290
 
            command.PrintTable, clients=["foo1.example.org",
2291
 
                                         "foo2.example.org"],
2292
 
            verbose=True)
2293
 
 
2294
 
    def test_manual_page_example_3(self):
2295
 
        self.assert_command_from_args("--enable --all".split(),
2296
 
                                      command.Enable,
2297
 
                                      clients=[])
2298
 
 
2299
 
    def test_manual_page_example_4(self):
2300
 
        self.assert_commands_from_args(
2301
 
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
2302
 
             " foo2.example.org").split(),
2303
 
            [command.SetTimeout, command.SetInterval],
2304
 
            clients=["foo1.example.org", "foo2.example.org"])
2305
 
 
2306
 
    def test_manual_page_example_5(self):
2307
 
        self.assert_command_from_args("--approve --all".split(),
2308
 
                                      command.Approve,
2309
 
                                      clients=[])
2310
 
 
2311
 
 
2312
1731
class TestCommand(unittest.TestCase):
2313
1732
    """Abstract class for tests of command classes"""
2314
1733
 
2427
1846
        busname = "se.recompile.Mandos"
2428
1847
        client_interface = "se.recompile.Mandos.Client"
2429
1848
        command.Approve().run(self.bus.clients, self.bus)
2430
 
        self.assertTrue(self.bus.clients)
2431
1849
        for clientpath in self.bus.clients:
2432
1850
            self.assertIn(("Approve", busname, clientpath,
2433
1851
                           client_interface, (True,)), self.bus.calls)
2436
1854
        busname = "se.recompile.Mandos"
2437
1855
        client_interface = "se.recompile.Mandos.Client"
2438
1856
        command.Deny().run(self.bus.clients, self.bus)
2439
 
        self.assertTrue(self.bus.clients)
2440
1857
        for clientpath in self.bus.clients:
2441
1858
            self.assertIn(("Approve", busname, clientpath,
2442
1859
                           client_interface, (False,)),
2443
1860
                          self.bus.calls)
2444
1861
 
2445
1862
    def test_Remove(self):
2446
 
        busname = "se.recompile.Mandos"
2447
 
        server_path = "/"
2448
 
        server_interface = "se.recompile.Mandos"
2449
 
        orig_clients = self.bus.clients.copy()
2450
1863
        command.Remove().run(self.bus.clients, self.bus)
2451
 
        self.assertFalse(self.bus.clients)
2452
 
        for clientpath in orig_clients:
2453
 
            self.assertIn(("RemoveClient", busname,
2454
 
                           server_path, server_interface,
 
1864
        for clientpath in self.bus.clients:
 
1865
            self.assertIn(("RemoveClient", dbus_busname,
 
1866
                           dbus_server_path, dbus_server_interface,
2455
1867
                           (clientpath,)), self.bus.calls)
2456
1868
 
2457
1869
    expected_json = {
2659
2071
        else:
2660
2072
            cmd_args = [() for x in range(len(self.values_to_get))]
2661
2073
            values_to_get = self.values_to_get
2662
 
        self.assertTrue(values_to_get)
2663
2074
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2664
2075
            for clientpath in self.bus.clients:
2665
2076
                self.bus.clients[clientpath][self.propname] = (
2666
2077
                    Unique())
2667
2078
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
 
            self.assertTrue(self.bus.clients)
2669
2079
            for clientpath in self.bus.clients:
2670
2080
                value = (self.bus.clients[clientpath]
2671
2081
                         [self.propname])
2730
2140
class TestSetSecretCmd(TestPropertySetterCmd):
2731
2141
    command = command.SetSecret
2732
2142
    propname = "Secret"
2733
 
    def __init__(self, *args, **kwargs):
2734
 
        self.values_to_set = [io.BytesIO(b""),
2735
 
                              io.BytesIO(b"secret\0xyzzy\nbar")]
2736
 
        self.values_to_get = [f.getvalue() for f in
2737
 
                              self.values_to_set]
2738
 
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
 
2143
    values_to_set = [io.BytesIO(b""),
 
2144
                     io.BytesIO(b"secret\0xyzzy\nbar")]
 
2145
    values_to_get = [f.getvalue() for f in values_to_set]
2739
2146
 
2740
2147
 
2741
2148
class TestSetTimeoutCmd(TestPropertySetterCmd):
2794
2201
 
2795
2202
 
2796
2203
 
2797
 
def parse_test_args():
2798
 
    # type: () -> argparse.Namespace
 
2204
def should_only_run_tests():
2799
2205
    parser = argparse.ArgumentParser(add_help=False)
2800
 
    parser.add_argument("--check", action="store_true")
2801
 
    parser.add_argument("--prefix", )
 
2206
    parser.add_argument("--check", action='store_true')
2802
2207
    args, unknown_args = parser.parse_known_args()
2803
 
    if args.check:
2804
 
        # Remove test options from sys.argv
 
2208
    run_tests = args.check
 
2209
    if run_tests:
 
2210
        # Remove --check argument from sys.argv
2805
2211
        sys.argv[1:] = unknown_args
2806
 
    return args
 
2212
    return run_tests
2807
2213
 
2808
2214
# Add all tests from doctest strings
2809
2215
def load_tests(loader, tests, none):
2812
2218
    return tests
2813
2219
 
2814
2220
if __name__ == "__main__":
2815
 
    options = parse_test_args()
2816
2221
    try:
2817
 
        if options.check:
2818
 
            extra_test_prefix = options.prefix
2819
 
            if extra_test_prefix is not None:
2820
 
                if not (unittest.main(argv=[""], exit=False)
2821
 
                        .result.wasSuccessful()):
2822
 
                    sys.exit(1)
2823
 
                class ExtraTestLoader(unittest.TestLoader):
2824
 
                    testMethodPrefix = extra_test_prefix
2825
 
                # Call using ./scriptname --check [--verbose]
2826
 
                unittest.main(argv=[""], testLoader=ExtraTestLoader())
2827
 
            else:
2828
 
                unittest.main(argv=[""])
 
2222
        if should_only_run_tests():
 
2223
            # Call using ./tdd-python-script --check [--verbose]
 
2224
            unittest.main()
2829
2225
        else:
2830
2226
            main()
2831
2227
    finally:
2832
2228
        logging.shutdown()
2833
 
 
2834
 
# Local Variables:
2835
 
# run-tests:
2836
 
# (lambda (&optional extra)
2837
 
#   (if (not (funcall run-tests-in-test-buffer default-directory
2838
 
#             extra))
2839
 
#       (funcall show-test-buffer-in-test-window)
2840
 
#     (funcall remove-test-window)
2841
 
#     (if extra (message "Extra tests run successfully!"))))
2842
 
# run-tests-in-test-buffer:
2843
 
# (lambda (dir &optional extra)
2844
 
#   (with-current-buffer (get-buffer-create "*Test*")
2845
 
#     (setq buffer-read-only nil
2846
 
#           default-directory dir)
2847
 
#     (erase-buffer)
2848
 
#     (compilation-mode))
2849
 
#   (let ((process-result
2850
 
#          (let ((inhibit-read-only t))
2851
 
#            (process-file-shell-command
2852
 
#             (funcall get-command-line extra) nil "*Test*"))))
2853
 
#     (and (numberp process-result)
2854
 
#          (= process-result 0))))
2855
 
# get-command-line:
2856
 
# (lambda (&optional extra)
2857
 
#   (let ((quoted-script
2858
 
#          (shell-quote-argument (funcall get-script-name))))
2859
 
#     (format
2860
 
#      (concat "%s --check" (if extra " --prefix=atest" ""))
2861
 
#      quoted-script)))
2862
 
# get-script-name:
2863
 
# (lambda ()
2864
 
#   (if (fboundp 'file-local-name)
2865
 
#       (file-local-name (buffer-file-name))
2866
 
#     (or (file-remote-p (buffer-file-name) 'localname)
2867
 
#         (buffer-file-name))))
2868
 
# remove-test-window:
2869
 
# (lambda ()
2870
 
#   (let ((test-window (get-buffer-window "*Test*")))
2871
 
#     (if test-window (delete-window test-window))))
2872
 
# show-test-buffer-in-test-window:
2873
 
# (lambda ()
2874
 
#   (when (not (get-buffer-window-list "*Test*"))
2875
 
#     (setq next-error-last-buffer (get-buffer "*Test*"))
2876
 
#     (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
 
#            (display-buffer-overriding-action
2878
 
#             `((display-buffer-in-side-window) (side . ,side)
2879
 
#               (window-height . fit-window-to-buffer)
2880
 
#               (window-width . fit-window-to-buffer))))
2881
 
#       (display-buffer "*Test*"))))
2882
 
# eval:
2883
 
# (progn
2884
 
#   (let* ((run-extra-tests (lambda () (interactive)
2885
 
#                             (funcall run-tests t)))
2886
 
#          (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
 
#          (outer-keymap `(keymap (3 . ,inner-keymap))))     ; C-c
2888
 
#     (setq minor-mode-overriding-map-alist
2889
 
#           (cons `(run-tests . ,outer-keymap)
2890
 
#                 minor-mode-overriding-map-alist)))
2891
 
#   (add-hook 'after-save-hook run-tests 90 t))
2892
 
# End: