/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-04-08 21:53:22 UTC
  • mto: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190408215322-y3hmfxzdgs9t84l1
plugin-runner: Fix minor memory leak

* plugin-runner.c (free_plugin): Even if argv[0] is NULL, as for the
                                 pseudo-plugin for global options,
                                 free all other arguments by simply
                                 starting at argv[1] and freeing
                                 plugin_node->name (which is always
                                 argv[0]) separately.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python3 -bbI
2
 
# -*- coding: utf-8; lexical-binding: t -*-
3
 
#
4
 
# Mandos Control - Control or query the Mandos server
5
 
#
6
 
# Copyright © 2008-2022 Teddy Hogeborn
7
 
# Copyright © 2008-2022 Björn Påhlsson
 
1
#!/usr/bin/python
 
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
 
3
#
 
4
# Mandos Monitor - Control and monitor the Mandos server
 
5
#
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
23
23
#
24
24
# Contact the authors at <mandos@recompile.se>.
25
25
#
 
26
 
26
27
from __future__ import (division, absolute_import, print_function,
27
28
                        unicode_literals)
28
29
 
32
33
    pass
33
34
 
34
35
import sys
35
 
import unittest
36
36
import argparse
37
 
import logging
38
 
import os
39
37
import locale
40
38
import datetime
41
39
import re
 
40
import os
42
41
import collections
43
42
import json
 
43
import unittest
 
44
import logging
44
45
import io
45
46
import tempfile
46
47
import contextlib
47
48
 
48
 
if sys.version_info.major == 2:
49
 
    __metaclass__ = type
50
 
    str = unicode
51
 
    input = raw_input
52
 
 
53
 
 
54
 
class gi:
55
 
    """Dummy gi module, for the tests"""
56
 
    class repository:
57
 
        class GLib:
58
 
            class Error(Exception):
59
 
                pass
60
 
 
61
 
 
62
 
dbussy = None
63
 
ravel = None
64
 
dbus_python = None
65
 
pydbus = None
66
 
 
67
49
try:
68
 
    import dbussy
69
 
    import ravel
 
50
    import pydbus
 
51
    import gi
 
52
    dbus_python = None
70
53
except ImportError:
71
 
    try:
72
 
        import pydbus
73
 
        import gi
74
 
    except ImportError:
75
 
        import dbus as dbus_python
76
 
 
 
54
    import dbus as dbus_python
 
55
    pydbus = None
 
56
    class gi(object):
 
57
        """Dummy gi module, for the tests"""
 
58
        class repository(object):
 
59
            class GLib(object):
 
60
                class Error(Exception):
 
61
                    pass
77
62
 
78
63
# Show warnings by default
79
64
if not sys.warnoptions:
80
65
    import warnings
81
66
    warnings.simplefilter("default")
82
67
 
83
 
log = logging.getLogger(os.path.basename(sys.argv[0]))
84
 
logging.basicConfig(level="INFO",         # Show info level messages
 
68
log = logging.getLogger(sys.argv[0])
 
69
logging.basicConfig(level="INFO", # Show info level messages
85
70
                    format="%(message)s") # Show basic log messages
86
71
 
87
72
logging.captureWarnings(True)   # Show warnings via the logging system
88
73
 
89
74
if sys.version_info.major == 2:
 
75
    str = unicode
90
76
    import StringIO
91
77
    io.StringIO = StringIO.StringIO
92
78
 
93
79
locale.setlocale(locale.LC_ALL, "")
94
80
 
95
 
version = "1.8.17"
 
81
version = "1.8.3"
96
82
 
97
83
 
98
84
def main():
105
91
    clientnames = options.client
106
92
 
107
93
    if options.debug:
108
 
        logging.getLogger("").setLevel(logging.DEBUG)
 
94
        log.setLevel(logging.DEBUG)
109
95
 
110
 
    if dbussy is not None and ravel is not None:
111
 
        bus = dbussy_adapter.CachingBus(dbussy, ravel)
112
 
    elif pydbus is not None:
 
96
    if pydbus is not None:
113
97
        bus = pydbus_adapter.CachingBus(pydbus)
114
98
    else:
115
99
        bus = dbus_python_adapter.CachingBus(dbus_python)
259
243
        return rfc3339_duration_to_delta(interval)
260
244
    except ValueError as e:
261
245
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
262
 
                    " ".join(e.args))
 
246
                    ' '.join(e.args))
263
247
    return parse_pre_1_6_1_interval(interval)
264
248
 
265
249
 
266
250
def rfc3339_duration_to_delta(duration):
267
251
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
268
252
 
269
 
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
270
 
    True
271
 
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
272
 
    True
273
 
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
274
 
    True
275
 
    >>> # 60 months
276
 
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
277
 
    True
278
 
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
279
 
    True
280
 
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
281
 
    True
282
 
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
283
 
    True
284
 
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
285
 
    True
 
253
    >>> rfc3339_duration_to_delta("P7D")
 
254
    datetime.timedelta(7)
 
255
    >>> rfc3339_duration_to_delta("PT60S")
 
256
    datetime.timedelta(0, 60)
 
257
    >>> rfc3339_duration_to_delta("PT60M")
 
258
    datetime.timedelta(0, 3600)
 
259
    >>> rfc3339_duration_to_delta("P60M")
 
260
    datetime.timedelta(1680)
 
261
    >>> rfc3339_duration_to_delta("PT24H")
 
262
    datetime.timedelta(1)
 
263
    >>> rfc3339_duration_to_delta("P1W")
 
264
    datetime.timedelta(7)
 
265
    >>> rfc3339_duration_to_delta("PT5M30S")
 
266
    datetime.timedelta(0, 330)
 
267
    >>> rfc3339_duration_to_delta("P1DT3M20S")
 
268
    datetime.timedelta(1, 200)
286
269
    >>> # Can not be empty:
287
270
    >>> rfc3339_duration_to_delta("")
288
271
    Traceback (most recent call last):
395
378
 
396
379
 
397
380
def parse_pre_1_6_1_interval(interval):
398
 
    r"""Parse an interval string as documented by Mandos before 1.6.1,
 
381
    """Parse an interval string as documented by Mandos before 1.6.1,
399
382
    and return a datetime.timedelta
400
383
 
401
 
    >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
402
 
    True
403
 
    >>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
404
 
    True
405
 
    >>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
406
 
    True
407
 
    >>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
408
 
    True
409
 
    >>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
410
 
    True
411
 
    >>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
412
 
    True
413
 
    >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
414
 
    True
 
384
    >>> parse_pre_1_6_1_interval('7d')
 
385
    datetime.timedelta(7)
 
386
    >>> parse_pre_1_6_1_interval('60s')
 
387
    datetime.timedelta(0, 60)
 
388
    >>> parse_pre_1_6_1_interval('60m')
 
389
    datetime.timedelta(0, 3600)
 
390
    >>> parse_pre_1_6_1_interval('24h')
 
391
    datetime.timedelta(1)
 
392
    >>> parse_pre_1_6_1_interval('1w')
 
393
    datetime.timedelta(7)
 
394
    >>> parse_pre_1_6_1_interval('5m 30s')
 
395
    datetime.timedelta(0, 330)
 
396
    >>> parse_pre_1_6_1_interval('')
 
397
    datetime.timedelta(0)
415
398
    >>> # Ignore unknown characters, allow any order and repetitions
416
 
    >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
 
    ... == datetime.timedelta(2, 480, 18000)
418
 
    True
 
399
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
 
400
    datetime.timedelta(2, 480, 18000)
419
401
 
420
402
    """
421
403
 
484
466
        parser.error("--remove can only be combined with --deny")
485
467
 
486
468
 
487
 
class dbus:
 
469
class dbus(object):
488
470
 
489
 
    class SystemBus:
 
471
    class SystemBus(object):
490
472
 
491
473
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
492
 
 
493
474
        def get_managed_objects(self, busname, objectpath):
494
475
            return self.call_method("GetManagedObjects", busname,
495
476
                                    objectpath,
496
477
                                    self.object_manager_iface)
497
478
 
498
479
        properties_iface = "org.freedesktop.DBus.Properties"
499
 
 
500
480
        def set_property(self, busname, objectpath, interface, key,
501
481
                         value):
502
482
            self.call_method("Set", busname, objectpath,
503
483
                             self.properties_iface, interface, key,
504
484
                             value)
505
485
 
506
 
        def call_method(self, methodname, busname, objectpath,
507
 
                        interface, *args):
508
 
            raise NotImplementedError()
509
486
 
510
487
    class MandosBus(SystemBus):
511
488
        busname_domain = "se.recompile"
543
520
        pass
544
521
 
545
522
 
546
 
class dbus_python_adapter:
 
523
class dbus_python_adapter(object):
547
524
 
548
525
    class SystemBus(dbus.MandosBus):
549
526
        """Use dbus-python"""
594
571
                        for key, subval in value.items()}
595
572
            return value
596
573
 
597
 
        def set_client_property(self, objectpath, key, value):
598
 
            if key == "Secret":
599
 
                if not isinstance(value, bytes):
600
 
                    value = value.encode("utf-8")
601
 
                value = self.dbus_python.ByteArray(value)
602
 
            return self.set_property(self.busname, objectpath,
603
 
                                     self.client_interface, key,
604
 
                                     value)
605
574
 
606
 
    class SilenceLogger:
 
575
    class SilenceLogger(object):
607
576
        "Simple context manager to silence a particular logger"
608
 
 
609
577
        def __init__(self, loggername):
610
578
            self.logger = logging.getLogger(loggername)
611
579
 
621
589
        def __exit__(self, exc_type, exc_val, exc_tb):
622
590
            self.logger.removeFilter(self.nullfilter)
623
591
 
 
592
 
624
593
    class CachingBus(SystemBus):
625
594
        """A caching layer for dbus_python_adapter.SystemBus"""
626
 
 
627
595
        def __init__(self, *args, **kwargs):
628
596
            self.object_cache = {}
629
597
            super(dbus_python_adapter.CachingBus,
630
598
                  self).__init__(*args, **kwargs)
631
 
 
632
599
        def get_object(self, busname, objectpath):
633
600
            try:
634
601
                return self.object_cache[(busname, objectpath)]
636
603
                new_object = super(
637
604
                    dbus_python_adapter.CachingBus,
638
605
                    self).get_object(busname, objectpath)
639
 
                self.object_cache[(busname, objectpath)] = new_object
 
606
                self.object_cache[(busname, objectpath)]  = new_object
640
607
                return new_object
641
608
 
642
609
 
643
 
class pydbus_adapter:
 
610
class pydbus_adapter(object):
644
611
    class SystemBus(dbus.MandosBus):
645
612
        def __init__(self, module=pydbus):
646
613
            self.pydbus = module
689
656
 
690
657
    class CachingBus(SystemBus):
691
658
        """A caching layer for pydbus_adapter.SystemBus"""
692
 
 
693
659
        def __init__(self, *args, **kwargs):
694
660
            self.object_cache = {}
695
661
            super(pydbus_adapter.CachingBus,
696
662
                  self).__init__(*args, **kwargs)
697
 
 
698
663
        def get(self, busname, objectpath):
699
664
            try:
700
665
                return self.object_cache[(busname, objectpath)]
701
666
            except KeyError:
702
667
                new_object = (super(pydbus_adapter.CachingBus, self)
703
668
                              .get(busname, objectpath))
704
 
                self.object_cache[(busname, objectpath)] = new_object
705
 
                return new_object
706
 
 
707
 
 
708
 
class dbussy_adapter:
709
 
    class SystemBus(dbus.SystemBus):
710
 
        """Use DBussy"""
711
 
 
712
 
        def __init__(self, dbussy, ravel):
713
 
            self.dbussy = dbussy
714
 
            self.ravel = ravel
715
 
            self.bus = ravel.system_bus()
716
 
 
717
 
        @contextlib.contextmanager
718
 
        def convert_exception(self, exception_class=dbus.Error):
719
 
            try:
720
 
                yield
721
 
            except self.dbussy.DBusError as e:
722
 
                # This does what "raise from" would do
723
 
                exc = exception_class(*e.args)
724
 
                exc.__cause__ = e
725
 
                raise exc
726
 
 
727
 
        def call_method(self, methodname, busname, objectpath,
728
 
                        interface, *args):
729
 
            proxy_object = self.get_object(busname, objectpath)
730
 
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
731
 
                      interface, methodname,
732
 
                      ", ".join(repr(a) for a in args))
733
 
            iface = proxy_object.get_interface(interface)
734
 
            method = getattr(iface, methodname)
735
 
            with self.convert_exception(dbus.Error):
736
 
                value = method(*args)
737
 
            # DBussy returns values either as an empty list or as a
738
 
            # list of one element with the return value
739
 
            if value:
740
 
                return self.type_filter(value[0])
741
 
 
742
 
        def get_object(self, busname, objectpath):
743
 
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
744
 
                      busname, objectpath)
745
 
            with self.convert_exception(dbus.ConnectFailed):
746
 
                return self.bus[busname][objectpath]
747
 
 
748
 
        def type_filter(self, value):
749
 
            """Convert the most bothersome types to Python types"""
750
 
            # A D-Bus Variant value is represented as the Python type
751
 
            # Tuple[dbussy.DBUS.Signature, Any]
752
 
            if isinstance(value, tuple):
753
 
                if (len(value) == 2
754
 
                    and isinstance(value[0],
755
 
                                   self.dbussy.DBUS.Signature)):
756
 
                    return self.type_filter(value[1])
757
 
            elif isinstance(value, self.dbussy.DBUS.ObjectPath):
758
 
                return str(value)
759
 
            # Also recurse into dictionaries
760
 
            elif isinstance(value, dict):
761
 
                return {self.type_filter(key):
762
 
                        self.type_filter(subval)
763
 
                        for key, subval in value.items()}
764
 
            return value
765
 
 
766
 
        def set_property(self, busname, objectpath, interface, key,
767
 
                         value):
768
 
            proxy_object = self.get_object(busname, objectpath)
769
 
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
770
 
                      objectpath, self.properties_iface, interface,
771
 
                      key, value)
772
 
            if key == "Secret":
773
 
                # DBussy wants a Byte Array to be a sequence of
774
 
                # values, not a byte string
775
 
                value = tuple(value)
776
 
            setattr(proxy_object.get_interface(interface), key, value)
777
 
 
778
 
    class MandosBus(SystemBus, dbus.MandosBus):
779
 
        pass
780
 
 
781
 
    class CachingBus(MandosBus):
782
 
        """A caching layer for dbussy_adapter.MandosBus"""
783
 
 
784
 
        def __init__(self, *args, **kwargs):
785
 
            self.object_cache = {}
786
 
            super(dbussy_adapter.CachingBus, self).__init__(*args,
787
 
                                                            **kwargs)
788
 
 
789
 
        def get_object(self, busname, objectpath):
790
 
            try:
791
 
                return self.object_cache[(busname, objectpath)]
792
 
            except KeyError:
793
 
                new_object = super(
794
 
                    dbussy_adapter.CachingBus,
795
 
                    self).get_object(busname, objectpath)
796
 
                self.object_cache[(busname, objectpath)] = new_object
 
669
                self.object_cache[(busname, objectpath)]  = new_object
797
670
                return new_object
798
671
 
799
672
 
830
703
    return commands
831
704
 
832
705
 
833
 
class command:
 
706
class command(object):
834
707
    """A namespace for command classes"""
835
708
 
836
 
    class Base:
 
709
    class Base(object):
837
710
        """Abstract base class for commands"""
838
 
 
839
711
        def run(self, clients, bus=None):
840
712
            """Normal commands should implement run_on_one_client(),
841
713
but commands which want to operate on all clients at the same time can
845
717
            for client, properties in clients.items():
846
718
                self.run_on_one_client(client, properties)
847
719
 
 
720
 
848
721
    class IsEnabled(Base):
849
722
        def run(self, clients, bus=None):
850
723
            properties = next(iter(clients.values()))
852
725
                sys.exit(0)
853
726
            sys.exit(1)
854
727
 
 
728
 
855
729
    class Approve(Base):
856
730
        def run_on_one_client(self, client, properties):
857
731
            self.bus.call_client_method(client, "Approve", True)
858
732
 
 
733
 
859
734
    class Deny(Base):
860
735
        def run_on_one_client(self, client, properties):
861
736
            self.bus.call_client_method(client, "Approve", False)
862
737
 
 
738
 
863
739
    class Remove(Base):
864
740
        def run(self, clients, bus):
865
741
            for clientpath in frozenset(clients.keys()):
866
742
                bus.call_server_method("RemoveClient", clientpath)
867
743
 
 
744
 
868
745
    class Output(Base):
869
746
        """Abstract class for commands outputting client details"""
870
747
        all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
876
753
                        "Checker", "ExtendedTimeout", "Expires",
877
754
                        "LastCheckerStatus")
878
755
 
 
756
 
879
757
    class DumpJSON(Output):
880
758
        def run(self, clients, bus=None):
881
759
            data = {properties["Name"]:
882
760
                    {key: properties[key]
883
761
                     for key in self.all_keywords}
884
762
                    for properties in clients.values()}
885
 
            print(json.dumps(data, indent=4, separators=(",", ": ")))
 
763
            print(json.dumps(data, indent=4, separators=(',', ': ')))
 
764
 
886
765
 
887
766
    class PrintTable(Output):
888
767
        def __init__(self, verbose=False):
896
775
                keywords = self.all_keywords
897
776
            print(self.TableOfClients(clients.values(), keywords))
898
777
 
899
 
        class TableOfClients:
 
778
        class TableOfClients(object):
900
779
            tableheaders = {
901
780
                "Name": "Name",
902
781
                "Enabled": "Enabled",
929
808
 
930
809
            if sys.version_info.major == 2:
931
810
                __unicode__ = __str__
932
 
 
933
811
                def __str__(self):
934
812
                    return str(self).encode(
935
813
                        locale.getpreferredencoding())
981
859
                                minutes=(td.seconds % 3600) // 60,
982
860
                                seconds=td.seconds % 60))
983
861
 
 
862
 
984
863
    class PropertySetter(Base):
985
864
        "Abstract class for Actions for setting one client property"
986
865
 
993
872
        def propname(self):
994
873
            raise NotImplementedError()
995
874
 
 
875
 
996
876
    class Enable(PropertySetter):
997
877
        propname = "Enabled"
998
878
        value_to_set = True
999
879
 
 
880
 
1000
881
    class Disable(PropertySetter):
1001
882
        propname = "Enabled"
1002
883
        value_to_set = False
1003
884
 
 
885
 
1004
886
    class BumpTimeout(PropertySetter):
1005
887
        propname = "LastCheckedOK"
1006
888
        value_to_set = ""
1007
889
 
 
890
 
1008
891
    class StartChecker(PropertySetter):
1009
892
        propname = "CheckerRunning"
1010
893
        value_to_set = True
1011
894
 
 
895
 
1012
896
    class StopChecker(PropertySetter):
1013
897
        propname = "CheckerRunning"
1014
898
        value_to_set = False
1015
899
 
 
900
 
1016
901
    class ApproveByDefault(PropertySetter):
1017
902
        propname = "ApprovedByDefault"
1018
903
        value_to_set = True
1019
904
 
 
905
 
1020
906
    class DenyByDefault(PropertySetter):
1021
907
        propname = "ApprovedByDefault"
1022
908
        value_to_set = False
1023
909
 
 
910
 
1024
911
    class PropertySetterValue(PropertySetter):
1025
912
        """Abstract class for PropertySetter recieving a value as
1026
913
constructor argument instead of a class attribute."""
1027
 
 
1028
914
        def __init__(self, value):
1029
915
            self.value_to_set = value
1030
916
 
1037
923
    class SetChecker(PropertySetterValue):
1038
924
        propname = "Checker"
1039
925
 
 
926
 
1040
927
    class SetHost(PropertySetterValue):
1041
928
        propname = "Host"
1042
929
 
 
930
 
1043
931
    class SetSecret(PropertySetterValue):
1044
932
        propname = "Secret"
1045
933
 
1053
941
            self._vts = value.read()
1054
942
            value.close()
1055
943
 
 
944
 
1056
945
    class PropertySetterValueMilliseconds(PropertySetterValue):
1057
946
        """Abstract class for PropertySetterValue taking a value
1058
947
argument as a datetime.timedelta() but should store it as
1067
956
            "When setting, convert value from a datetime.timedelta"
1068
957
            self._vts = int(round(value.total_seconds() * 1000))
1069
958
 
 
959
 
1070
960
    class SetTimeout(PropertySetterValueMilliseconds):
1071
961
        propname = "Timeout"
1072
962
 
 
963
 
1073
964
    class SetExtendedTimeout(PropertySetterValueMilliseconds):
1074
965
        propname = "ExtendedTimeout"
1075
966
 
 
967
 
1076
968
    class SetInterval(PropertySetterValueMilliseconds):
1077
969
        propname = "Interval"
1078
970
 
 
971
 
1079
972
    class SetApprovalDelay(PropertySetterValueMilliseconds):
1080
973
        propname = "ApprovalDelay"
1081
974
 
 
975
 
1082
976
    class SetApprovalDuration(PropertySetterValueMilliseconds):
1083
977
        propname = "ApprovalDuration"
1084
978
 
1118
1012
                                                     "output"))
1119
1013
 
1120
1014
 
1121
 
class Unique:
 
1015
class Unique(object):
1122
1016
    """Class for objects which exist only to be unique objects, since
1123
1017
unittest.mock.sentinel only exists in Python 3.3"""
1124
1018
 
1408
1302
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1409
1303
 
1410
1304
    def MockDBusPython_func(self, func):
1411
 
        class mock_dbus_python:
 
1305
        class mock_dbus_python(object):
1412
1306
            """mock dbus-python module"""
1413
 
            class exceptions:
 
1307
            class exceptions(object):
1414
1308
                """Pseudo-namespace"""
1415
1309
                class DBusException(Exception):
1416
1310
                    pass
1417
 
            class SystemBus:
 
1311
            class SystemBus(object):
1418
1312
                @staticmethod
1419
1313
                def get_object(busname, objectpath):
1420
1314
                    DBusObject = collections.namedtuple(
1421
 
                        "DBusObject", ("methodname", "Set"))
 
1315
                        "DBusObject", ("methodname",))
1422
1316
                    def method(*args, **kwargs):
1423
1317
                        self.assertEqual({"dbus_interface":
1424
1318
                                          "interface"},
1425
1319
                                         kwargs)
1426
1320
                        return func(*args)
1427
 
                    def set_property(interface, key, value,
1428
 
                                     dbus_interface=None):
1429
 
                        self.assertEqual(
1430
 
                            "org.freedesktop.DBus.Properties",
1431
 
                            dbus_interface)
1432
 
                        self.assertEqual("Secret", key)
1433
 
                        return func(interface, key, value,
1434
 
                                    dbus_interface=dbus_interface)
1435
 
                    return DBusObject(methodname=method,
1436
 
                                      Set=set_property)
1437
 
            class Boolean:
 
1321
                    return DBusObject(methodname=method)
 
1322
            class Boolean(object):
1438
1323
                def __init__(self, value):
1439
1324
                    self.value = bool(value)
1440
1325
                def __bool__(self):
1445
1330
                pass
1446
1331
            class Dictionary(dict):
1447
1332
                pass
1448
 
            class ByteArray(bytes):
1449
 
                pass
1450
1333
        return mock_dbus_python
1451
1334
 
1452
1335
    def call_method(self, bus, methodname, busname, objectpath,
1624
1507
        finally:
1625
1508
            dbus_logger.removeFilter(counting_handler)
1626
1509
 
1627
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
1510
        self.assertNotIsInstance(e, dbus.ConnectFailed)
1628
1511
 
1629
1512
        # Make sure the dbus logger was suppressed
1630
1513
        self.assertEqual(0, counting_handler.count)
1631
1514
 
1632
 
    def test_Set_Secret_sends_bytearray(self):
1633
 
        ret = [None]
1634
 
        def func(*args, **kwargs):
1635
 
            ret[0] = (args, kwargs)
1636
 
        mock_dbus_python = self.MockDBusPython_func(func)
1637
 
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
1638
 
        bus.set_client_property("objectpath", "Secret", "value")
1639
 
        expected_call = (("se.recompile.Mandos.Client", "Secret",
1640
 
                          mock_dbus_python.ByteArray(b"value")),
1641
 
                         {"dbus_interface":
1642
 
                          "org.freedesktop.DBus.Properties"})
1643
 
        self.assertEqual(expected_call, ret[0])
1644
 
        if sys.version_info.major == 2:
1645
 
            self.assertIsInstance(ret[0][0][-1],
1646
 
                                  mock_dbus_python.ByteArray)
1647
 
 
1648
1515
    def test_get_object_converts_to_correct_exception(self):
1649
1516
        bus = dbus_python_adapter.SystemBus(
1650
1517
            self.fake_dbus_python_raises_exception_on_connect)
1652
1519
            self.call_method(bus, "methodname", "busname",
1653
1520
                             "objectpath", "interface")
1654
1521
 
1655
 
    class fake_dbus_python_raises_exception_on_connect:
 
1522
    class fake_dbus_python_raises_exception_on_connect(object):
1656
1523
        """fake dbus-python module"""
1657
 
        class exceptions:
 
1524
        class exceptions(object):
1658
1525
            """Pseudo-namespace"""
1659
1526
            class DBusException(Exception):
1660
1527
                pass
1668
1535
 
1669
1536
 
1670
1537
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1671
 
    class mock_dbus_python:
 
1538
    class mock_dbus_python(object):
1672
1539
        """mock dbus-python modules"""
1673
 
        class SystemBus:
 
1540
        class SystemBus(object):
1674
1541
            @staticmethod
1675
1542
            def get_object(busname, objectpath):
1676
1543
                return Unique()
1722
1589
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
1723
1590
 
1724
1591
    def Stub_pydbus_func(self, func):
1725
 
        class stub_pydbus:
 
1592
        class stub_pydbus(object):
1726
1593
            """stub pydbus module"""
1727
 
            class SystemBus:
 
1594
            class SystemBus(object):
1728
1595
                @staticmethod
1729
1596
                def get(busname, objectpath):
1730
1597
                    DBusObject = collections.namedtuple(
1755
1622
        self.assertIs(ret, expected_method_return)
1756
1623
 
1757
1624
    def test_call_method_handles_exception(self):
 
1625
        dbus_logger = logging.getLogger("dbus.proxies")
 
1626
 
1758
1627
        def func():
1759
1628
            raise gi.repository.GLib.Error()
1760
1629
 
1765
1634
            self.call_method(bus, "methodname", "busname",
1766
1635
                             "objectpath", "interface")
1767
1636
 
1768
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
1637
        self.assertNotIsInstance(e, dbus.ConnectFailed)
1769
1638
 
1770
1639
    def test_get_converts_to_correct_exception(self):
1771
1640
        bus = pydbus_adapter.SystemBus(
1774
1643
            self.call_method(bus, "methodname", "busname",
1775
1644
                             "objectpath", "interface")
1776
1645
 
1777
 
    class fake_pydbus_raises_exception_on_connect:
 
1646
    class fake_pydbus_raises_exception_on_connect(object):
1778
1647
        """fake dbus-python module"""
1779
1648
        @classmethod
1780
1649
        def SystemBus(cls):
1784
1653
            return Bus(get=get)
1785
1654
 
1786
1655
    def test_set_property_uses_setattr(self):
1787
 
        class Object:
 
1656
        class Object(object):
1788
1657
            pass
1789
1658
        obj = Object()
1790
 
        class pydbus_spy:
1791
 
            class SystemBus:
 
1659
        class pydbus_spy(object):
 
1660
            class SystemBus(object):
1792
1661
                @staticmethod
1793
1662
                def get(busname, objectpath):
1794
1663
                    return {"interface": obj}
1801
1670
    def test_get_suppresses_xml_deprecation_warning(self):
1802
1671
        if sys.version_info.major >= 3:
1803
1672
            return
1804
 
        class stub_pydbus_get:
1805
 
            class SystemBus:
 
1673
        class stub_pydbus_get(object):
 
1674
            class SystemBus(object):
1806
1675
                @staticmethod
1807
1676
                def get(busname, objectpath):
1808
1677
                    warnings.warn_explicit(
1816
1685
 
1817
1686
 
1818
1687
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
1819
 
    class stub_pydbus:
 
1688
    class stub_pydbus(object):
1820
1689
        """stub pydbus module"""
1821
 
        class SystemBus:
 
1690
        class SystemBus(object):
1822
1691
            @staticmethod
1823
1692
            def get(busname, objectpath):
1824
1693
                return Unique()
1866
1735
        self.assertIs(obj1, obj1b)
1867
1736
 
1868
1737
 
1869
 
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
1870
 
 
1871
 
    class dummy_dbussy:
1872
 
        class DBUS:
1873
 
            class ObjectPath(str):
1874
 
                pass
1875
 
        class DBusError(Exception):
1876
 
            pass
1877
 
 
1878
 
    def fake_ravel_func(self, func):
1879
 
        class fake_ravel:
1880
 
            @staticmethod
1881
 
            def system_bus():
1882
 
                class DBusInterfaceProxy:
1883
 
                    @staticmethod
1884
 
                    def methodname(*args):
1885
 
                        return [func(*args)]
1886
 
                class DBusObject:
1887
 
                    @staticmethod
1888
 
                    def get_interface(interface):
1889
 
                        if interface == "interface":
1890
 
                            return DBusInterfaceProxy()
1891
 
                return {"busname": {"objectpath": DBusObject()}}
1892
 
        return fake_ravel
1893
 
 
1894
 
    def call_method(self, bus, methodname, busname, objectpath,
1895
 
                    interface, *args):
1896
 
        with self.assertLogs(log, logging.DEBUG):
1897
 
            return bus.call_method(methodname, busname, objectpath,
1898
 
                                   interface, *args)
1899
 
 
1900
 
    def test_call_method_returns(self):
1901
 
        expected_method_return = Unique()
1902
 
        method_args = (Unique(), Unique())
1903
 
        def func(*args):
1904
 
            self.assertEqual(len(method_args), len(args))
1905
 
            for marg, arg in zip(method_args, args):
1906
 
                self.assertIs(marg, arg)
1907
 
            return expected_method_return
1908
 
        fake_ravel = self.fake_ravel_func(func)
1909
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1910
 
        ret = self.call_method(bus, "methodname", "busname",
1911
 
                               "objectpath", "interface",
1912
 
                               *method_args)
1913
 
        self.assertIs(ret, expected_method_return)
1914
 
 
1915
 
    def test_call_method_filters_objectpath(self):
1916
 
        def func():
1917
 
            return method_return
1918
 
        fake_ravel = self.fake_ravel_func(func)
1919
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1920
 
        method_return = (self.dummy_dbussy.DBUS
1921
 
                         .ObjectPath("objectpath"))
1922
 
        ret = self.call_method(bus, "methodname", "busname",
1923
 
                               "objectpath", "interface")
1924
 
        self.assertEqual("objectpath", ret)
1925
 
        self.assertNotIsInstance(ret,
1926
 
                                 self.dummy_dbussy.DBUS.ObjectPath)
1927
 
 
1928
 
    def test_call_method_filters_objectpaths_in_dict(self):
1929
 
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1930
 
        def func():
1931
 
            return method_return
1932
 
        fake_ravel = self.fake_ravel_func(func)
1933
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1934
 
        method_return = {
1935
 
            ObjectPath("objectpath_key_1"):
1936
 
            ObjectPath("objectpath_value_1"),
1937
 
            ObjectPath("objectpath_key_2"):
1938
 
            ObjectPath("objectpath_value_2"),
1939
 
        }
1940
 
        ret = self.call_method(bus, "methodname", "busname",
1941
 
                               "objectpath", "interface")
1942
 
        expected_method_return = {str(key): str(value)
1943
 
                                  for key, value in
1944
 
                                  method_return.items()}
1945
 
        for key, value in ret.items():
1946
 
            self.assertNotIsInstance(key, ObjectPath)
1947
 
            self.assertNotIsInstance(value, ObjectPath)
1948
 
        self.assertEqual(expected_method_return, ret)
1949
 
        self.assertIsInstance(ret, dict)
1950
 
 
1951
 
    def test_call_method_filters_objectpaths_in_dict_in_dict(self):
1952
 
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1953
 
        def func():
1954
 
            return method_return
1955
 
        fake_ravel = self.fake_ravel_func(func)
1956
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1957
 
        method_return = {
1958
 
            ObjectPath("key1"): {
1959
 
                ObjectPath("key11"): ObjectPath("value11"),
1960
 
                ObjectPath("key12"): ObjectPath("value12"),
1961
 
            },
1962
 
            ObjectPath("key2"): {
1963
 
                ObjectPath("key21"): ObjectPath("value21"),
1964
 
                ObjectPath("key22"): ObjectPath("value22"),
1965
 
            },
1966
 
        }
1967
 
        ret = self.call_method(bus, "methodname", "busname",
1968
 
                               "objectpath", "interface")
1969
 
        expected_method_return = {
1970
 
            "key1": {"key11": "value11",
1971
 
                     "key12": "value12"},
1972
 
            "key2": {"key21": "value21",
1973
 
                     "key22": "value22"},
1974
 
        }
1975
 
        self.assertEqual(expected_method_return, ret)
1976
 
        for key, value in ret.items():
1977
 
            self.assertIsInstance(value, dict)
1978
 
            self.assertEqual(expected_method_return[key], value)
1979
 
            self.assertNotIsInstance(key, ObjectPath)
1980
 
            for inner_key, inner_value in value.items():
1981
 
                self.assertIsInstance(value, dict)
1982
 
                self.assertEqual(
1983
 
                    expected_method_return[key][inner_key],
1984
 
                    inner_value)
1985
 
                self.assertNotIsInstance(key, ObjectPath)
1986
 
 
1987
 
    def test_call_method_filters_objectpaths_in_dict_three_deep(self):
1988
 
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1989
 
        def func():
1990
 
            return method_return
1991
 
        fake_ravel = self.fake_ravel_func(func)
1992
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1993
 
        method_return = {
1994
 
            ObjectPath("key1"): {
1995
 
                ObjectPath("key2"): {
1996
 
                    ObjectPath("key3"): ObjectPath("value"),
1997
 
                },
1998
 
            },
1999
 
        }
2000
 
        ret = self.call_method(bus, "methodname", "busname",
2001
 
                               "objectpath", "interface")
2002
 
        expected_method_return = {"key1": {"key2": {"key3": "value"}}}
2003
 
        self.assertEqual(expected_method_return, ret)
2004
 
        self.assertIsInstance(ret, dict)
2005
 
        self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
2006
 
        self.assertIsInstance(ret["key1"], dict)
2007
 
        self.assertNotIsInstance(next(iter(ret["key1"].keys())),
2008
 
                                 ObjectPath)
2009
 
        self.assertIsInstance(ret["key1"]["key2"], dict)
2010
 
        self.assertNotIsInstance(
2011
 
            next(iter(ret["key1"]["key2"].keys())),
2012
 
            ObjectPath)
2013
 
        self.assertEqual("value", ret["key1"]["key2"]["key3"])
2014
 
        self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
2015
 
                                 self.dummy_dbussy.DBUS.ObjectPath)
2016
 
 
2017
 
    def test_call_method_handles_exception(self):
2018
 
        def func():
2019
 
            raise self.dummy_dbussy.DBusError()
2020
 
 
2021
 
        fake_ravel = self.fake_ravel_func(func)
2022
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2023
 
 
2024
 
        with self.assertRaises(dbus.Error) as e:
2025
 
            self.call_method(bus, "methodname", "busname",
2026
 
                             "objectpath", "interface")
2027
 
 
2028
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
2029
 
 
2030
 
    def test_get_object_converts_to_correct_exception(self):
2031
 
        class fake_ravel_raises_exception_on_connect:
2032
 
            @staticmethod
2033
 
            def system_bus():
2034
 
                class Bus:
2035
 
                    @staticmethod
2036
 
                    def __getitem__(key):
2037
 
                        if key == "objectpath":
2038
 
                            raise self.dummy_dbussy.DBusError()
2039
 
                        raise Exception(key)
2040
 
                return {"busname": Bus()}
2041
 
        def func():
2042
 
            raise self.dummy_dbussy.DBusError()
2043
 
        bus = dbussy_adapter.SystemBus(
2044
 
            self.dummy_dbussy,
2045
 
            fake_ravel_raises_exception_on_connect)
2046
 
        with self.assertRaises(dbus.ConnectFailed):
2047
 
            self.call_method(bus, "methodname", "busname",
2048
 
                             "objectpath", "interface")
2049
 
 
2050
 
 
2051
1738
class Test_commands_from_options(unittest.TestCase):
2052
1739
 
2053
1740
    def setUp(self):
2058
1745
        self.assert_command_from_args(["--is-enabled", "client"],
2059
1746
                                      command.IsEnabled)
2060
1747
 
2061
 
    def assert_command_from_args(self, args, command_cls, length=1,
2062
 
                                 clients=None, **cmd_attrs):
 
1748
    def assert_command_from_args(self, args, command_cls,
 
1749
                                 **cmd_attrs):
2063
1750
        """Assert that parsing ARGS should result in an instance of
2064
1751
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
2065
1752
        options = self.parser.parse_args(args)
2066
1753
        check_option_syntax(self.parser, options)
2067
1754
        commands = commands_from_options(options)
2068
 
        self.assertEqual(length, len(commands))
2069
 
        for command in commands:
2070
 
            if isinstance(command, command_cls):
2071
 
                break
2072
 
        else:
2073
 
            self.assertIsInstance(command, command_cls)
2074
 
        if clients is not None:
2075
 
            self.assertEqual(clients, options.client)
 
1755
        self.assertEqual(1, len(commands))
 
1756
        command = commands[0]
 
1757
        self.assertIsInstance(command, command_cls)
2076
1758
        for key, value in cmd_attrs.items():
2077
1759
            self.assertEqual(value, getattr(command, key))
2078
1760
 
2079
 
    def assert_commands_from_args(self, args, commands, clients=None):
2080
 
        for cmd in commands:
2081
 
            self.assert_command_from_args(args, cmd,
2082
 
                                          length=len(commands),
2083
 
                                          clients=clients)
2084
 
 
2085
1761
    def test_is_enabled_short(self):
2086
1762
        self.assert_command_from_args(["-V", "client"],
2087
1763
                                      command.IsEnabled)
2278
1954
                                      verbose=True)
2279
1955
 
2280
1956
 
2281
 
    def test_manual_page_example_1(self):
2282
 
        self.assert_command_from_args("",
2283
 
                                      command.PrintTable,
2284
 
                                      clients=[],
2285
 
                                      verbose=False)
2286
 
 
2287
 
    def test_manual_page_example_2(self):
2288
 
        self.assert_command_from_args(
2289
 
            "--verbose foo1.example.org foo2.example.org".split(),
2290
 
            command.PrintTable, clients=["foo1.example.org",
2291
 
                                         "foo2.example.org"],
2292
 
            verbose=True)
2293
 
 
2294
 
    def test_manual_page_example_3(self):
2295
 
        self.assert_command_from_args("--enable --all".split(),
2296
 
                                      command.Enable,
2297
 
                                      clients=[])
2298
 
 
2299
 
    def test_manual_page_example_4(self):
2300
 
        self.assert_commands_from_args(
2301
 
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
2302
 
             " foo2.example.org").split(),
2303
 
            [command.SetTimeout, command.SetInterval],
2304
 
            clients=["foo1.example.org", "foo2.example.org"])
2305
 
 
2306
 
    def test_manual_page_example_5(self):
2307
 
        self.assert_command_from_args("--approve --all".split(),
2308
 
                                      command.Approve,
2309
 
                                      clients=[])
2310
 
 
2311
 
 
2312
1957
class TestCommand(unittest.TestCase):
2313
1958
    """Abstract class for tests of command classes"""
2314
1959
 
2427
2072
        busname = "se.recompile.Mandos"
2428
2073
        client_interface = "se.recompile.Mandos.Client"
2429
2074
        command.Approve().run(self.bus.clients, self.bus)
2430
 
        self.assertTrue(self.bus.clients)
2431
2075
        for clientpath in self.bus.clients:
2432
2076
            self.assertIn(("Approve", busname, clientpath,
2433
2077
                           client_interface, (True,)), self.bus.calls)
2436
2080
        busname = "se.recompile.Mandos"
2437
2081
        client_interface = "se.recompile.Mandos.Client"
2438
2082
        command.Deny().run(self.bus.clients, self.bus)
2439
 
        self.assertTrue(self.bus.clients)
2440
2083
        for clientpath in self.bus.clients:
2441
2084
            self.assertIn(("Approve", busname, clientpath,
2442
2085
                           client_interface, (False,)),
2443
2086
                          self.bus.calls)
2444
2087
 
2445
2088
    def test_Remove(self):
2446
 
        busname = "se.recompile.Mandos"
2447
 
        server_path = "/"
2448
 
        server_interface = "se.recompile.Mandos"
2449
 
        orig_clients = self.bus.clients.copy()
2450
2089
        command.Remove().run(self.bus.clients, self.bus)
2451
 
        self.assertFalse(self.bus.clients)
2452
 
        for clientpath in orig_clients:
2453
 
            self.assertIn(("RemoveClient", busname,
2454
 
                           server_path, server_interface,
 
2090
        for clientpath in self.bus.clients:
 
2091
            self.assertIn(("RemoveClient", dbus_busname,
 
2092
                           dbus_server_path, dbus_server_interface,
2455
2093
                           (clientpath,)), self.bus.calls)
2456
2094
 
2457
2095
    expected_json = {
2659
2297
        else:
2660
2298
            cmd_args = [() for x in range(len(self.values_to_get))]
2661
2299
            values_to_get = self.values_to_get
2662
 
        self.assertTrue(values_to_get)
2663
2300
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2664
2301
            for clientpath in self.bus.clients:
2665
2302
                self.bus.clients[clientpath][self.propname] = (
2666
2303
                    Unique())
2667
2304
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
 
            self.assertTrue(self.bus.clients)
2669
2305
            for clientpath in self.bus.clients:
2670
2306
                value = (self.bus.clients[clientpath]
2671
2307
                         [self.propname])
2730
2366
class TestSetSecretCmd(TestPropertySetterCmd):
2731
2367
    command = command.SetSecret
2732
2368
    propname = "Secret"
2733
 
    def __init__(self, *args, **kwargs):
2734
 
        self.values_to_set = [io.BytesIO(b""),
2735
 
                              io.BytesIO(b"secret\0xyzzy\nbar")]
2736
 
        self.values_to_get = [f.getvalue() for f in
2737
 
                              self.values_to_set]
2738
 
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
 
2369
    values_to_set = [io.BytesIO(b""),
 
2370
                     io.BytesIO(b"secret\0xyzzy\nbar")]
 
2371
    values_to_get = [f.getvalue() for f in values_to_set]
2739
2372
 
2740
2373
 
2741
2374
class TestSetTimeoutCmd(TestPropertySetterCmd):
2794
2427
 
2795
2428
 
2796
2429
 
2797
 
def parse_test_args():
2798
 
    # type: () -> argparse.Namespace
 
2430
def should_only_run_tests():
2799
2431
    parser = argparse.ArgumentParser(add_help=False)
2800
 
    parser.add_argument("--check", action="store_true")
2801
 
    parser.add_argument("--prefix", )
 
2432
    parser.add_argument("--check", action='store_true')
2802
2433
    args, unknown_args = parser.parse_known_args()
2803
 
    if args.check:
2804
 
        # Remove test options from sys.argv
 
2434
    run_tests = args.check
 
2435
    if run_tests:
 
2436
        # Remove --check argument from sys.argv
2805
2437
        sys.argv[1:] = unknown_args
2806
 
    return args
 
2438
    return run_tests
2807
2439
 
2808
2440
# Add all tests from doctest strings
2809
2441
def load_tests(loader, tests, none):
2812
2444
    return tests
2813
2445
 
2814
2446
if __name__ == "__main__":
2815
 
    options = parse_test_args()
2816
2447
    try:
2817
 
        if options.check:
2818
 
            extra_test_prefix = options.prefix
2819
 
            if extra_test_prefix is not None:
2820
 
                if not (unittest.main(argv=[""], exit=False)
2821
 
                        .result.wasSuccessful()):
2822
 
                    sys.exit(1)
2823
 
                class ExtraTestLoader(unittest.TestLoader):
2824
 
                    testMethodPrefix = extra_test_prefix
2825
 
                # Call using ./scriptname --check [--verbose]
2826
 
                unittest.main(argv=[""], testLoader=ExtraTestLoader())
2827
 
            else:
2828
 
                unittest.main(argv=[""])
 
2448
        if should_only_run_tests():
 
2449
            # Call using ./tdd-python-script --check [--verbose]
 
2450
            unittest.main()
2829
2451
        else:
2830
2452
            main()
2831
2453
    finally:
2832
2454
        logging.shutdown()
2833
 
 
2834
 
# Local Variables:
2835
 
# run-tests:
2836
 
# (lambda (&optional extra)
2837
 
#   (if (not (funcall run-tests-in-test-buffer default-directory
2838
 
#             extra))
2839
 
#       (funcall show-test-buffer-in-test-window)
2840
 
#     (funcall remove-test-window)
2841
 
#     (if extra (message "Extra tests run successfully!"))))
2842
 
# run-tests-in-test-buffer:
2843
 
# (lambda (dir &optional extra)
2844
 
#   (with-current-buffer (get-buffer-create "*Test*")
2845
 
#     (setq buffer-read-only nil
2846
 
#           default-directory dir)
2847
 
#     (erase-buffer)
2848
 
#     (compilation-mode))
2849
 
#   (let ((process-result
2850
 
#          (let ((inhibit-read-only t))
2851
 
#            (process-file-shell-command
2852
 
#             (funcall get-command-line extra) nil "*Test*"))))
2853
 
#     (and (numberp process-result)
2854
 
#          (= process-result 0))))
2855
 
# get-command-line:
2856
 
# (lambda (&optional extra)
2857
 
#   (let ((quoted-script
2858
 
#          (shell-quote-argument (funcall get-script-name))))
2859
 
#     (format
2860
 
#      (concat "%s --check" (if extra " --prefix=atest" ""))
2861
 
#      quoted-script)))
2862
 
# get-script-name:
2863
 
# (lambda ()
2864
 
#   (if (fboundp 'file-local-name)
2865
 
#       (file-local-name (buffer-file-name))
2866
 
#     (or (file-remote-p (buffer-file-name) 'localname)
2867
 
#         (buffer-file-name))))
2868
 
# remove-test-window:
2869
 
# (lambda ()
2870
 
#   (let ((test-window (get-buffer-window "*Test*")))
2871
 
#     (if test-window (delete-window test-window))))
2872
 
# show-test-buffer-in-test-window:
2873
 
# (lambda ()
2874
 
#   (when (not (get-buffer-window-list "*Test*"))
2875
 
#     (setq next-error-last-buffer (get-buffer "*Test*"))
2876
 
#     (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
 
#            (display-buffer-overriding-action
2878
 
#             `((display-buffer-in-side-window) (side . ,side)
2879
 
#               (window-height . fit-window-to-buffer)
2880
 
#               (window-width . fit-window-to-buffer))))
2881
 
#       (display-buffer "*Test*"))))
2882
 
# eval:
2883
 
# (progn
2884
 
#   (let* ((run-extra-tests (lambda () (interactive)
2885
 
#                             (funcall run-tests t)))
2886
 
#          (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
 
#          (outer-keymap `(keymap (3 . ,inner-keymap))))     ; C-c
2888
 
#     (setq minor-mode-overriding-map-alist
2889
 
#           (cons `(run-tests . ,outer-keymap)
2890
 
#                 minor-mode-overriding-map-alist)))
2891
 
#   (add-hook 'after-save-hook run-tests 90 t))
2892
 
# End: