/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2024-09-12 18:31:12 UTC
  • mfrom: (237.4.142 release)
  • Revision ID: teddy@recompile.se-20240912183112-xxy0akg67c0ky0t8
Merge from release branch

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
3
 
#
4
 
# Mandos Monitor - Control and monitor the Mandos server
5
 
#
6
 
# Copyright © 2008-2019 Teddy Hogeborn
7
 
# Copyright © 2008-2019 Björn Påhlsson
 
1
#!/usr/bin/python3 -bbI
 
2
# -*- coding: utf-8; lexical-binding: t -*-
 
3
#
 
4
# Mandos Control - Control or query the Mandos server
 
5
#
 
6
# Copyright © 2008-2022 Teddy Hogeborn
 
7
# Copyright © 2008-2022 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
23
23
#
24
24
# Contact the authors at <mandos@recompile.se>.
25
25
#
26
 
 
27
26
from __future__ import (division, absolute_import, print_function,
28
27
                        unicode_literals)
29
28
 
33
32
    pass
34
33
 
35
34
import sys
 
35
import unittest
36
36
import argparse
 
37
import logging
 
38
import os
37
39
import locale
38
40
import datetime
39
41
import re
40
 
import os
41
42
import collections
42
43
import json
43
 
import unittest
44
 
import logging
45
44
import io
46
45
import tempfile
47
46
import contextlib
48
47
 
 
48
if sys.version_info.major == 2:
 
49
    __metaclass__ = type
 
50
    str = unicode
 
51
    input = raw_input
 
52
 
 
53
 
 
54
class gi:
 
55
    """Dummy gi module, for the tests"""
 
56
    class repository:
 
57
        class GLib:
 
58
            class Error(Exception):
 
59
                pass
 
60
 
 
61
 
 
62
dbussy = None
 
63
ravel = None
 
64
dbus_python = None
 
65
pydbus = None
 
66
 
49
67
try:
50
 
    import pydbus
51
 
    import gi
52
 
    dbus_python = None
 
68
    import dbussy
 
69
    import ravel
53
70
except ImportError:
54
 
    import dbus as dbus_python
55
 
    pydbus = None
56
 
    class gi(object):
57
 
        """Dummy gi module, for the tests"""
58
 
        class repository(object):
59
 
            class GLib(object):
60
 
                class Error(Exception):
61
 
                    pass
 
71
    try:
 
72
        import pydbus
 
73
        import gi
 
74
    except ImportError:
 
75
        import dbus as dbus_python
 
76
 
62
77
 
63
78
# Show warnings by default
64
79
if not sys.warnoptions:
65
80
    import warnings
66
81
    warnings.simplefilter("default")
67
82
 
68
 
log = logging.getLogger(sys.argv[0])
69
 
logging.basicConfig(level="INFO", # Show info level messages
 
83
log = logging.getLogger(os.path.basename(sys.argv[0]))
 
84
logging.basicConfig(level="INFO",         # Show info level messages
70
85
                    format="%(message)s") # Show basic log messages
71
86
 
72
87
logging.captureWarnings(True)   # Show warnings via the logging system
73
88
 
74
89
if sys.version_info.major == 2:
75
 
    str = unicode
76
90
    import StringIO
77
91
    io.StringIO = StringIO.StringIO
78
92
 
79
93
locale.setlocale(locale.LC_ALL, "")
80
94
 
81
 
version = "1.8.3"
 
95
version = "1.8.17"
82
96
 
83
97
 
84
98
def main():
91
105
    clientnames = options.client
92
106
 
93
107
    if options.debug:
94
 
        log.setLevel(logging.DEBUG)
 
108
        logging.getLogger("").setLevel(logging.DEBUG)
95
109
 
96
 
    if pydbus is not None:
 
110
    if dbussy is not None and ravel is not None:
 
111
        bus = dbussy_adapter.CachingBus(dbussy, ravel)
 
112
    elif pydbus is not None:
97
113
        bus = pydbus_adapter.CachingBus(pydbus)
98
114
    else:
99
115
        bus = dbus_python_adapter.CachingBus(dbus_python)
243
259
        return rfc3339_duration_to_delta(interval)
244
260
    except ValueError as e:
245
261
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
246
 
                    ' '.join(e.args))
 
262
                    " ".join(e.args))
247
263
    return parse_pre_1_6_1_interval(interval)
248
264
 
249
265
 
250
266
def rfc3339_duration_to_delta(duration):
251
267
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
252
268
 
253
 
    >>> rfc3339_duration_to_delta("P7D")
254
 
    datetime.timedelta(7)
255
 
    >>> rfc3339_duration_to_delta("PT60S")
256
 
    datetime.timedelta(0, 60)
257
 
    >>> rfc3339_duration_to_delta("PT60M")
258
 
    datetime.timedelta(0, 3600)
259
 
    >>> rfc3339_duration_to_delta("P60M")
260
 
    datetime.timedelta(1680)
261
 
    >>> rfc3339_duration_to_delta("PT24H")
262
 
    datetime.timedelta(1)
263
 
    >>> rfc3339_duration_to_delta("P1W")
264
 
    datetime.timedelta(7)
265
 
    >>> rfc3339_duration_to_delta("PT5M30S")
266
 
    datetime.timedelta(0, 330)
267
 
    >>> rfc3339_duration_to_delta("P1DT3M20S")
268
 
    datetime.timedelta(1, 200)
 
269
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
 
270
    True
 
271
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
 
272
    True
 
273
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
 
274
    True
 
275
    >>> # 60 months
 
276
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
 
277
    True
 
278
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
 
279
    True
 
280
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
 
281
    True
 
282
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
 
283
    True
 
284
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
 
285
    True
269
286
    >>> # Can not be empty:
270
287
    >>> rfc3339_duration_to_delta("")
271
288
    Traceback (most recent call last):
378
395
 
379
396
 
380
397
def parse_pre_1_6_1_interval(interval):
381
 
    """Parse an interval string as documented by Mandos before 1.6.1,
 
398
    r"""Parse an interval string as documented by Mandos before 1.6.1,
382
399
    and return a datetime.timedelta
383
400
 
384
 
    >>> parse_pre_1_6_1_interval('7d')
385
 
    datetime.timedelta(7)
386
 
    >>> parse_pre_1_6_1_interval('60s')
387
 
    datetime.timedelta(0, 60)
388
 
    >>> parse_pre_1_6_1_interval('60m')
389
 
    datetime.timedelta(0, 3600)
390
 
    >>> parse_pre_1_6_1_interval('24h')
391
 
    datetime.timedelta(1)
392
 
    >>> parse_pre_1_6_1_interval('1w')
393
 
    datetime.timedelta(7)
394
 
    >>> parse_pre_1_6_1_interval('5m 30s')
395
 
    datetime.timedelta(0, 330)
396
 
    >>> parse_pre_1_6_1_interval('')
397
 
    datetime.timedelta(0)
 
401
    >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
 
402
    True
 
403
    >>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
 
404
    True
 
405
    >>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
 
406
    True
 
407
    >>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
 
408
    True
 
409
    >>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
 
410
    True
 
411
    >>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
 
412
    True
 
413
    >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
 
414
    True
398
415
    >>> # Ignore unknown characters, allow any order and repetitions
399
 
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
400
 
    datetime.timedelta(2, 480, 18000)
 
416
    >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
 
417
    ... == datetime.timedelta(2, 480, 18000)
 
418
    True
401
419
 
402
420
    """
403
421
 
466
484
        parser.error("--remove can only be combined with --deny")
467
485
 
468
486
 
469
 
class dbus(object):
 
487
class dbus:
470
488
 
471
 
    class SystemBus(object):
 
489
    class SystemBus:
472
490
 
473
491
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
 
492
 
474
493
        def get_managed_objects(self, busname, objectpath):
475
494
            return self.call_method("GetManagedObjects", busname,
476
495
                                    objectpath,
477
496
                                    self.object_manager_iface)
478
497
 
479
498
        properties_iface = "org.freedesktop.DBus.Properties"
 
499
 
480
500
        def set_property(self, busname, objectpath, interface, key,
481
501
                         value):
482
502
            self.call_method("Set", busname, objectpath,
483
503
                             self.properties_iface, interface, key,
484
504
                             value)
485
505
 
 
506
        def call_method(self, methodname, busname, objectpath,
 
507
                        interface, *args):
 
508
            raise NotImplementedError()
486
509
 
487
510
    class MandosBus(SystemBus):
488
511
        busname_domain = "se.recompile"
520
543
        pass
521
544
 
522
545
 
523
 
class dbus_python_adapter(object):
 
546
class dbus_python_adapter:
524
547
 
525
548
    class SystemBus(dbus.MandosBus):
526
549
        """Use dbus-python"""
571
594
                        for key, subval in value.items()}
572
595
            return value
573
596
 
 
597
        def set_client_property(self, objectpath, key, value):
 
598
            if key == "Secret":
 
599
                if not isinstance(value, bytes):
 
600
                    value = value.encode("utf-8")
 
601
                value = self.dbus_python.ByteArray(value)
 
602
            return self.set_property(self.busname, objectpath,
 
603
                                     self.client_interface, key,
 
604
                                     value)
574
605
 
575
 
    class SilenceLogger(object):
 
606
    class SilenceLogger:
576
607
        "Simple context manager to silence a particular logger"
 
608
 
577
609
        def __init__(self, loggername):
578
610
            self.logger = logging.getLogger(loggername)
579
611
 
589
621
        def __exit__(self, exc_type, exc_val, exc_tb):
590
622
            self.logger.removeFilter(self.nullfilter)
591
623
 
592
 
 
593
624
    class CachingBus(SystemBus):
594
625
        """A caching layer for dbus_python_adapter.SystemBus"""
 
626
 
595
627
        def __init__(self, *args, **kwargs):
596
628
            self.object_cache = {}
597
629
            super(dbus_python_adapter.CachingBus,
598
630
                  self).__init__(*args, **kwargs)
 
631
 
599
632
        def get_object(self, busname, objectpath):
600
633
            try:
601
634
                return self.object_cache[(busname, objectpath)]
603
636
                new_object = super(
604
637
                    dbus_python_adapter.CachingBus,
605
638
                    self).get_object(busname, objectpath)
606
 
                self.object_cache[(busname, objectpath)]  = new_object
 
639
                self.object_cache[(busname, objectpath)] = new_object
607
640
                return new_object
608
641
 
609
642
 
610
 
class pydbus_adapter(object):
 
643
class pydbus_adapter:
611
644
    class SystemBus(dbus.MandosBus):
612
645
        def __init__(self, module=pydbus):
613
646
            self.pydbus = module
656
689
 
657
690
    class CachingBus(SystemBus):
658
691
        """A caching layer for pydbus_adapter.SystemBus"""
 
692
 
659
693
        def __init__(self, *args, **kwargs):
660
694
            self.object_cache = {}
661
695
            super(pydbus_adapter.CachingBus,
662
696
                  self).__init__(*args, **kwargs)
 
697
 
663
698
        def get(self, busname, objectpath):
664
699
            try:
665
700
                return self.object_cache[(busname, objectpath)]
666
701
            except KeyError:
667
702
                new_object = (super(pydbus_adapter.CachingBus, self)
668
703
                              .get(busname, objectpath))
669
 
                self.object_cache[(busname, objectpath)]  = new_object
 
704
                self.object_cache[(busname, objectpath)] = new_object
 
705
                return new_object
 
706
 
 
707
 
 
708
class dbussy_adapter:
 
709
    class SystemBus(dbus.SystemBus):
 
710
        """Use DBussy"""
 
711
 
 
712
        def __init__(self, dbussy, ravel):
 
713
            self.dbussy = dbussy
 
714
            self.ravel = ravel
 
715
            self.bus = ravel.system_bus()
 
716
 
 
717
        @contextlib.contextmanager
 
718
        def convert_exception(self, exception_class=dbus.Error):
 
719
            try:
 
720
                yield
 
721
            except self.dbussy.DBusError as e:
 
722
                # This does what "raise from" would do
 
723
                exc = exception_class(*e.args)
 
724
                exc.__cause__ = e
 
725
                raise exc
 
726
 
 
727
        def call_method(self, methodname, busname, objectpath,
 
728
                        interface, *args):
 
729
            proxy_object = self.get_object(busname, objectpath)
 
730
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
 
731
                      interface, methodname,
 
732
                      ", ".join(repr(a) for a in args))
 
733
            iface = proxy_object.get_interface(interface)
 
734
            method = getattr(iface, methodname)
 
735
            with self.convert_exception(dbus.Error):
 
736
                value = method(*args)
 
737
            # DBussy returns values either as an empty list or as a
 
738
            # list of one element with the return value
 
739
            if value:
 
740
                return self.type_filter(value[0])
 
741
 
 
742
        def get_object(self, busname, objectpath):
 
743
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
744
                      busname, objectpath)
 
745
            with self.convert_exception(dbus.ConnectFailed):
 
746
                return self.bus[busname][objectpath]
 
747
 
 
748
        def type_filter(self, value):
 
749
            """Convert the most bothersome types to Python types"""
 
750
            # A D-Bus Variant value is represented as the Python type
 
751
            # Tuple[dbussy.DBUS.Signature, Any]
 
752
            if isinstance(value, tuple):
 
753
                if (len(value) == 2
 
754
                    and isinstance(value[0],
 
755
                                   self.dbussy.DBUS.Signature)):
 
756
                    return self.type_filter(value[1])
 
757
            elif isinstance(value, self.dbussy.DBUS.ObjectPath):
 
758
                return str(value)
 
759
            # Also recurse into dictionaries
 
760
            elif isinstance(value, dict):
 
761
                return {self.type_filter(key):
 
762
                        self.type_filter(subval)
 
763
                        for key, subval in value.items()}
 
764
            return value
 
765
 
 
766
        def set_property(self, busname, objectpath, interface, key,
 
767
                         value):
 
768
            proxy_object = self.get_object(busname, objectpath)
 
769
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
770
                      objectpath, self.properties_iface, interface,
 
771
                      key, value)
 
772
            if key == "Secret":
 
773
                # DBussy wants a Byte Array to be a sequence of
 
774
                # values, not a byte string
 
775
                value = tuple(value)
 
776
            setattr(proxy_object.get_interface(interface), key, value)
 
777
 
 
778
    class MandosBus(SystemBus, dbus.MandosBus):
 
779
        pass
 
780
 
 
781
    class CachingBus(MandosBus):
 
782
        """A caching layer for dbussy_adapter.MandosBus"""
 
783
 
 
784
        def __init__(self, *args, **kwargs):
 
785
            self.object_cache = {}
 
786
            super(dbussy_adapter.CachingBus, self).__init__(*args,
 
787
                                                            **kwargs)
 
788
 
 
789
        def get_object(self, busname, objectpath):
 
790
            try:
 
791
                return self.object_cache[(busname, objectpath)]
 
792
            except KeyError:
 
793
                new_object = super(
 
794
                    dbussy_adapter.CachingBus,
 
795
                    self).get_object(busname, objectpath)
 
796
                self.object_cache[(busname, objectpath)] = new_object
670
797
                return new_object
671
798
 
672
799
 
703
830
    return commands
704
831
 
705
832
 
706
 
class command(object):
 
833
class command:
707
834
    """A namespace for command classes"""
708
835
 
709
 
    class Base(object):
 
836
    class Base:
710
837
        """Abstract base class for commands"""
 
838
 
711
839
        def run(self, clients, bus=None):
712
840
            """Normal commands should implement run_on_one_client(),
713
841
but commands which want to operate on all clients at the same time can
717
845
            for client, properties in clients.items():
718
846
                self.run_on_one_client(client, properties)
719
847
 
720
 
 
721
848
    class IsEnabled(Base):
722
849
        def run(self, clients, bus=None):
723
850
            properties = next(iter(clients.values()))
725
852
                sys.exit(0)
726
853
            sys.exit(1)
727
854
 
728
 
 
729
855
    class Approve(Base):
730
856
        def run_on_one_client(self, client, properties):
731
857
            self.bus.call_client_method(client, "Approve", True)
732
858
 
733
 
 
734
859
    class Deny(Base):
735
860
        def run_on_one_client(self, client, properties):
736
861
            self.bus.call_client_method(client, "Approve", False)
737
862
 
738
 
 
739
863
    class Remove(Base):
740
864
        def run(self, clients, bus):
741
865
            for clientpath in frozenset(clients.keys()):
742
866
                bus.call_server_method("RemoveClient", clientpath)
743
867
 
744
 
 
745
868
    class Output(Base):
746
869
        """Abstract class for commands outputting client details"""
747
870
        all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
753
876
                        "Checker", "ExtendedTimeout", "Expires",
754
877
                        "LastCheckerStatus")
755
878
 
756
 
 
757
879
    class DumpJSON(Output):
758
880
        def run(self, clients, bus=None):
759
881
            data = {properties["Name"]:
760
882
                    {key: properties[key]
761
883
                     for key in self.all_keywords}
762
884
                    for properties in clients.values()}
763
 
            print(json.dumps(data, indent=4, separators=(',', ': ')))
764
 
 
 
885
            print(json.dumps(data, indent=4, separators=(",", ": ")))
765
886
 
766
887
    class PrintTable(Output):
767
888
        def __init__(self, verbose=False):
775
896
                keywords = self.all_keywords
776
897
            print(self.TableOfClients(clients.values(), keywords))
777
898
 
778
 
        class TableOfClients(object):
 
899
        class TableOfClients:
779
900
            tableheaders = {
780
901
                "Name": "Name",
781
902
                "Enabled": "Enabled",
808
929
 
809
930
            if sys.version_info.major == 2:
810
931
                __unicode__ = __str__
 
932
 
811
933
                def __str__(self):
812
934
                    return str(self).encode(
813
935
                        locale.getpreferredencoding())
859
981
                                minutes=(td.seconds % 3600) // 60,
860
982
                                seconds=td.seconds % 60))
861
983
 
862
 
 
863
984
    class PropertySetter(Base):
864
985
        "Abstract class for Actions for setting one client property"
865
986
 
872
993
        def propname(self):
873
994
            raise NotImplementedError()
874
995
 
875
 
 
876
996
    class Enable(PropertySetter):
877
997
        propname = "Enabled"
878
998
        value_to_set = True
879
999
 
880
 
 
881
1000
    class Disable(PropertySetter):
882
1001
        propname = "Enabled"
883
1002
        value_to_set = False
884
1003
 
885
 
 
886
1004
    class BumpTimeout(PropertySetter):
887
1005
        propname = "LastCheckedOK"
888
1006
        value_to_set = ""
889
1007
 
890
 
 
891
1008
    class StartChecker(PropertySetter):
892
1009
        propname = "CheckerRunning"
893
1010
        value_to_set = True
894
1011
 
895
 
 
896
1012
    class StopChecker(PropertySetter):
897
1013
        propname = "CheckerRunning"
898
1014
        value_to_set = False
899
1015
 
900
 
 
901
1016
    class ApproveByDefault(PropertySetter):
902
1017
        propname = "ApprovedByDefault"
903
1018
        value_to_set = True
904
1019
 
905
 
 
906
1020
    class DenyByDefault(PropertySetter):
907
1021
        propname = "ApprovedByDefault"
908
1022
        value_to_set = False
909
1023
 
910
 
 
911
1024
    class PropertySetterValue(PropertySetter):
912
1025
        """Abstract class for PropertySetter receiving a value as
913
1026
constructor argument instead of a class attribute."""
 
1027
 
914
1028
        def __init__(self, value):
915
1029
            self.value_to_set = value
916
1030
 
923
1037
    class SetChecker(PropertySetterValue):
924
1038
        propname = "Checker"
925
1039
 
926
 
 
927
1040
    class SetHost(PropertySetterValue):
928
1041
        propname = "Host"
929
1042
 
930
 
 
931
1043
    class SetSecret(PropertySetterValue):
932
1044
        propname = "Secret"
933
1045
 
941
1053
            self._vts = value.read()
942
1054
            value.close()
943
1055
 
944
 
 
945
1056
    class PropertySetterValueMilliseconds(PropertySetterValue):
946
1057
        """Abstract class for PropertySetterValue taking a value
947
1058
argument as a datetime.timedelta() but should store it as
956
1067
            "When setting, convert value from a datetime.timedelta"
957
1068
            self._vts = int(round(value.total_seconds() * 1000))
958
1069
 
959
 
 
960
1070
    class SetTimeout(PropertySetterValueMilliseconds):
961
1071
        propname = "Timeout"
962
1072
 
963
 
 
964
1073
    class SetExtendedTimeout(PropertySetterValueMilliseconds):
965
1074
        propname = "ExtendedTimeout"
966
1075
 
967
 
 
968
1076
    class SetInterval(PropertySetterValueMilliseconds):
969
1077
        propname = "Interval"
970
1078
 
971
 
 
972
1079
    class SetApprovalDelay(PropertySetterValueMilliseconds):
973
1080
        propname = "ApprovalDelay"
974
1081
 
975
 
 
976
1082
    class SetApprovalDuration(PropertySetterValueMilliseconds):
977
1083
        propname = "ApprovalDuration"
978
1084
 
1012
1118
                                                     "output"))
1013
1119
 
1014
1120
 
1015
 
class Unique(object):
 
1121
class Unique:
1016
1122
    """Class for objects which exist only to be unique objects, since
1017
1123
unittest.mock.sentinel only exists in Python 3.3 and later"""
1018
1124
 
1302
1408
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1303
1409
 
1304
1410
    def MockDBusPython_func(self, func):
1305
 
        class mock_dbus_python(object):
 
1411
        class mock_dbus_python:
1306
1412
            """mock dbus-python module"""
1307
 
            class exceptions(object):
 
1413
            class exceptions:
1308
1414
                """Pseudo-namespace"""
1309
1415
                class DBusException(Exception):
1310
1416
                    pass
1311
 
            class SystemBus(object):
 
1417
            class SystemBus:
1312
1418
                @staticmethod
1313
1419
                def get_object(busname, objectpath):
1314
1420
                    DBusObject = collections.namedtuple(
1315
 
                        "DBusObject", ("methodname",))
 
1421
                        "DBusObject", ("methodname", "Set"))
1316
1422
                    def method(*args, **kwargs):
1317
1423
                        self.assertEqual({"dbus_interface":
1318
1424
                                          "interface"},
1319
1425
                                         kwargs)
1320
1426
                        return func(*args)
1321
 
                    return DBusObject(methodname=method)
1322
 
            class Boolean(object):
 
1427
                    def set_property(interface, key, value,
 
1428
                                     dbus_interface=None):
 
1429
                        self.assertEqual(
 
1430
                            "org.freedesktop.DBus.Properties",
 
1431
                            dbus_interface)
 
1432
                        self.assertEqual("Secret", key)
 
1433
                        return func(interface, key, value,
 
1434
                                    dbus_interface=dbus_interface)
 
1435
                    return DBusObject(methodname=method,
 
1436
                                      Set=set_property)
 
1437
            class Boolean:
1323
1438
                def __init__(self, value):
1324
1439
                    self.value = bool(value)
1325
1440
                def __bool__(self):
1330
1445
                pass
1331
1446
            class Dictionary(dict):
1332
1447
                pass
 
1448
            class ByteArray(bytes):
 
1449
                pass
1333
1450
        return mock_dbus_python
1334
1451
 
1335
1452
    def call_method(self, bus, methodname, busname, objectpath,
1507
1624
        finally:
1508
1625
            dbus_logger.removeFilter(counting_handler)
1509
1626
 
1510
 
        self.assertNotIsInstance(e, dbus.ConnectFailed)
 
1627
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1511
1628
 
1512
1629
        # Make sure the dbus logger was suppressed
1513
1630
        self.assertEqual(0, counting_handler.count)
1514
1631
 
 
1632
    def test_Set_Secret_sends_bytearray(self):
 
1633
        ret = [None]
 
1634
        def func(*args, **kwargs):
 
1635
            ret[0] = (args, kwargs)
 
1636
        mock_dbus_python = self.MockDBusPython_func(func)
 
1637
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
 
1638
        bus.set_client_property("objectpath", "Secret", "value")
 
1639
        expected_call = (("se.recompile.Mandos.Client", "Secret",
 
1640
                          mock_dbus_python.ByteArray(b"value")),
 
1641
                         {"dbus_interface":
 
1642
                          "org.freedesktop.DBus.Properties"})
 
1643
        self.assertEqual(expected_call, ret[0])
 
1644
        if sys.version_info.major == 2:
 
1645
            self.assertIsInstance(ret[0][0][-1],
 
1646
                                  mock_dbus_python.ByteArray)
 
1647
 
1515
1648
    def test_get_object_converts_to_correct_exception(self):
1516
1649
        bus = dbus_python_adapter.SystemBus(
1517
1650
            self.fake_dbus_python_raises_exception_on_connect)
1519
1652
            self.call_method(bus, "methodname", "busname",
1520
1653
                             "objectpath", "interface")
1521
1654
 
1522
 
    class fake_dbus_python_raises_exception_on_connect(object):
 
1655
    class fake_dbus_python_raises_exception_on_connect:
1523
1656
        """fake dbus-python module"""
1524
 
        class exceptions(object):
 
1657
        class exceptions:
1525
1658
            """Pseudo-namespace"""
1526
1659
            class DBusException(Exception):
1527
1660
                pass
1535
1668
 
1536
1669
 
1537
1670
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1538
 
    class mock_dbus_python(object):
 
1671
    class mock_dbus_python:
1539
1672
        """mock dbus-python modules"""
1540
 
        class SystemBus(object):
 
1673
        class SystemBus:
1541
1674
            @staticmethod
1542
1675
            def get_object(busname, objectpath):
1543
1676
                return Unique()
1589
1722
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
1590
1723
 
1591
1724
    def Stub_pydbus_func(self, func):
1592
 
        class stub_pydbus(object):
 
1725
        class stub_pydbus:
1593
1726
            """stub pydbus module"""
1594
 
            class SystemBus(object):
 
1727
            class SystemBus:
1595
1728
                @staticmethod
1596
1729
                def get(busname, objectpath):
1597
1730
                    DBusObject = collections.namedtuple(
1622
1755
        self.assertIs(ret, expected_method_return)
1623
1756
 
1624
1757
    def test_call_method_handles_exception(self):
1625
 
        dbus_logger = logging.getLogger("dbus.proxies")
1626
 
 
1627
1758
        def func():
1628
1759
            raise gi.repository.GLib.Error()
1629
1760
 
1634
1765
            self.call_method(bus, "methodname", "busname",
1635
1766
                             "objectpath", "interface")
1636
1767
 
1637
 
        self.assertNotIsInstance(e, dbus.ConnectFailed)
 
1768
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1638
1769
 
1639
1770
    def test_get_converts_to_correct_exception(self):
1640
1771
        bus = pydbus_adapter.SystemBus(
1643
1774
            self.call_method(bus, "methodname", "busname",
1644
1775
                             "objectpath", "interface")
1645
1776
 
1646
 
    class fake_pydbus_raises_exception_on_connect(object):
 
1777
    class fake_pydbus_raises_exception_on_connect:
1647
1778
        """fake pydbus module"""
1648
1779
        @classmethod
1649
1780
        def SystemBus(cls):
1653
1784
            return Bus(get=get)
1654
1785
 
1655
1786
    def test_set_property_uses_setattr(self):
1656
 
        class Object(object):
 
1787
        class Object:
1657
1788
            pass
1658
1789
        obj = Object()
1659
 
        class pydbus_spy(object):
1660
 
            class SystemBus(object):
 
1790
        class pydbus_spy:
 
1791
            class SystemBus:
1661
1792
                @staticmethod
1662
1793
                def get(busname, objectpath):
1663
1794
                    return {"interface": obj}
1670
1801
    def test_get_suppresses_xml_deprecation_warning(self):
1671
1802
        if sys.version_info.major >= 3:
1672
1803
            return
1673
 
        class stub_pydbus_get(object):
1674
 
            class SystemBus(object):
 
1804
        class stub_pydbus_get:
 
1805
            class SystemBus:
1675
1806
                @staticmethod
1676
1807
                def get(busname, objectpath):
1677
1808
                    warnings.warn_explicit(
1685
1816
 
1686
1817
 
1687
1818
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
1688
 
    class stub_pydbus(object):
 
1819
    class stub_pydbus:
1689
1820
        """stub pydbus module"""
1690
 
        class SystemBus(object):
 
1821
        class SystemBus:
1691
1822
            @staticmethod
1692
1823
            def get(busname, objectpath):
1693
1824
                return Unique()
1735
1866
        self.assertIs(obj1, obj1b)
1736
1867
 
1737
1868
 
 
1869
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
    """Tests of dbussy_adapter.SystemBus, run against fake "dbussy"
    and "ravel" modules so that no real D-Bus is needed."""

    class dummy_dbussy:
        """Stand-in for the dbussy module; provides only the names
        referenced by these tests"""

        class DBUS:

            class ObjectPath(str):
                pass

        class DBusError(Exception):
            pass

    def fake_ravel_func(self, func):
        """Return a fake "ravel" module built around FUNC.

        The fake system bus is a dict with the single bus name
        "busname", holding the single object path "objectpath".
        That object exposes the interface "interface" (any other
        interface name gives None), which has one method,
        "methodname".  Calling the method calls FUNC with the same
        arguments and returns FUNC's result wrapped in a one-element
        list.
        """
        class fake_ravel:
            @staticmethod
            def system_bus():
                class DBusInterfaceProxy:
                    @staticmethod
                    def methodname(*args):
                        return [func(*args)]

                class DBusObject:
                    @staticmethod
                    def get_interface(interface):
                        if interface == "interface":
                            return DBusInterfaceProxy()

                return {"busname": {"objectpath": DBusObject()}}
        return fake_ravel

    def call_method(self, bus, methodname, busname, objectpath,
                    interface, *args):
        """Call BUS.call_method() with the given arguments, while
        also asserting that something is logged to "log" at level
        DEBUG or above.  Return whatever BUS.call_method() returns.
        """
        with self.assertLogs(log, logging.DEBUG):
            return bus.call_method(methodname, busname, objectpath,
                                   interface, *args)

    def test_call_method_returns(self):
        """Arguments must reach the method unchanged (same objects),
        and the method's return value must come back unchanged"""
        expected_method_return = Unique()
        method_args = (Unique(), Unique())

        def func(*args):
            self.assertEqual(len(method_args), len(args))
            for marg, arg in zip(method_args, args):
                self.assertIs(marg, arg)
            return expected_method_return

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface",
                               *method_args)
        self.assertIs(ret, expected_method_return)

    def test_call_method_filters_objectpath(self):
        """A returned ObjectPath must come back as an equal value
        which is not an ObjectPath instance"""
        def func():
            return method_return

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = (self.dummy_dbussy.DBUS
                         .ObjectPath("objectpath"))
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        self.assertEqual("objectpath", ret)
        self.assertNotIsInstance(ret,
                                 self.dummy_dbussy.DBUS.ObjectPath)

    def test_call_method_filters_objectpaths_in_dict(self):
        """ObjectPath keys and values of a returned dict must both
        be converted"""
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath

        def func():
            return method_return

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("objectpath_key_1"):
            ObjectPath("objectpath_value_1"),
            ObjectPath("objectpath_key_2"):
            ObjectPath("objectpath_value_2"),
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {str(key): str(value)
                                  for key, value in
                                  method_return.items()}
        for key, value in ret.items():
            self.assertNotIsInstance(key, ObjectPath)
            self.assertNotIsInstance(value, ObjectPath)
        self.assertEqual(expected_method_return, ret)
        self.assertIsInstance(ret, dict)

    def test_call_method_filters_objectpaths_in_dict_in_dict(self):
        """ObjectPath keys and values must also be converted inside
        dicts nested one level down"""
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath

        def func():
            return method_return

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("key1"): {
                ObjectPath("key11"): ObjectPath("value11"),
                ObjectPath("key12"): ObjectPath("value12"),
            },
            ObjectPath("key2"): {
                ObjectPath("key21"): ObjectPath("value21"),
                ObjectPath("key22"): ObjectPath("value22"),
            },
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {
            "key1": {"key11": "value11",
                     "key12": "value12"},
            "key2": {"key21": "value21",
                     "key22": "value22"},
        }
        self.assertEqual(expected_method_return, ret)
        for key, value in ret.items():
            self.assertIsInstance(value, dict)
            self.assertEqual(expected_method_return[key], value)
            self.assertNotIsInstance(key, ObjectPath)
            for inner_key, inner_value in value.items():
                self.assertEqual(
                    expected_method_return[key][inner_key],
                    inner_value)
                # ObjectPath is a str subclass, so the equality
                # check above cannot detect an unconverted value;
                # the types of the *inner* items must be checked
                # explicitly.
                self.assertNotIsInstance(inner_key, ObjectPath)
                self.assertNotIsInstance(inner_value, ObjectPath)

    def test_call_method_filters_objectpaths_in_dict_three_deep(self):
        """ObjectPath keys and values must be converted at every
        level of a dict nested three levels deep"""
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath

        def func():
            return method_return

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("key1"): {
                ObjectPath("key2"): {
                    ObjectPath("key3"): ObjectPath("value"),
                },
            },
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {"key1": {"key2": {"key3": "value"}}}
        self.assertEqual(expected_method_return, ret)
        self.assertIsInstance(ret, dict)
        self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
        self.assertIsInstance(ret["key1"], dict)
        self.assertNotIsInstance(next(iter(ret["key1"].keys())),
                                 ObjectPath)
        self.assertIsInstance(ret["key1"]["key2"], dict)
        self.assertNotIsInstance(
            next(iter(ret["key1"]["key2"].keys())),
            ObjectPath)
        self.assertEqual("value", ret["key1"]["key2"]["key3"])
        self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
                                 self.dummy_dbussy.DBUS.ObjectPath)

    def test_call_method_handles_exception(self):
        """A DBusError raised by the called method must surface as
        dbus.Error, but not as dbus.ConnectFailed"""
        def func():
            raise self.dummy_dbussy.DBusError()

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)

        with self.assertRaises(dbus.Error) as e:
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)

    def test_get_object_converts_to_correct_exception(self):
        """A DBusError raised while looking up the object path must
        surface as dbus.ConnectFailed"""
        class fake_ravel_raises_exception_on_connect:
            @staticmethod
            def system_bus():
                class Bus:
                    @staticmethod
                    def __getitem__(key):
                        # Only the expected object path raises the
                        # D-Bus error; any other key is a test bug.
                        if key == "objectpath":
                            raise self.dummy_dbussy.DBusError()
                        raise Exception(key)
                return {"busname": Bus()}

        bus = dbussy_adapter.SystemBus(
            self.dummy_dbussy,
            fake_ravel_raises_exception_on_connect)
        with self.assertRaises(dbus.ConnectFailed):
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")
 
2049
 
 
2050
 
1738
2051
class Test_commands_from_options(unittest.TestCase):
1739
2052
 
1740
2053
    def setUp(self):
1745
2058
        self.assert_command_from_args(["--is-enabled", "client"],
1746
2059
                                      command.IsEnabled)
1747
2060
 
1748
 
    def assert_command_from_args(self, args, command_cls,
1749
 
                                 **cmd_attrs):
 
2061
    def assert_command_from_args(self, args, command_cls, length=1,
 
2062
                                 clients=None, **cmd_attrs):
1750
2063
        """Assert that parsing ARGS should result in an instance of
1751
2064
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1752
2065
        options = self.parser.parse_args(args)
1753
2066
        check_option_syntax(self.parser, options)
1754
2067
        commands = commands_from_options(options)
1755
 
        self.assertEqual(1, len(commands))
1756
 
        command = commands[0]
1757
 
        self.assertIsInstance(command, command_cls)
 
2068
        self.assertEqual(length, len(commands))
 
2069
        for command in commands:
 
2070
            if isinstance(command, command_cls):
 
2071
                break
 
2072
        else:
 
2073
            self.assertIsInstance(command, command_cls)
 
2074
        if clients is not None:
 
2075
            self.assertEqual(clients, options.client)
1758
2076
        for key, value in cmd_attrs.items():
1759
2077
            self.assertEqual(value, getattr(command, key))
1760
2078
 
 
2079
    def assert_commands_from_args(self, args, commands, clients=None):
 
2080
        for cmd in commands:
 
2081
            self.assert_command_from_args(args, cmd,
 
2082
                                          length=len(commands),
 
2083
                                          clients=clients)
 
2084
 
1761
2085
    def test_is_enabled_short(self):
1762
2086
        self.assert_command_from_args(["-V", "client"],
1763
2087
                                      command.IsEnabled)
1954
2278
                                      verbose=True)
1955
2279
 
1956
2280
 
 
2281
    def test_manual_page_example_1(self):
 
2282
        self.assert_command_from_args("",
 
2283
                                      command.PrintTable,
 
2284
                                      clients=[],
 
2285
                                      verbose=False)
 
2286
 
 
2287
    def test_manual_page_example_2(self):
 
2288
        self.assert_command_from_args(
 
2289
            "--verbose foo1.example.org foo2.example.org".split(),
 
2290
            command.PrintTable, clients=["foo1.example.org",
 
2291
                                         "foo2.example.org"],
 
2292
            verbose=True)
 
2293
 
 
2294
    def test_manual_page_example_3(self):
 
2295
        self.assert_command_from_args("--enable --all".split(),
 
2296
                                      command.Enable,
 
2297
                                      clients=[])
 
2298
 
 
2299
    def test_manual_page_example_4(self):
 
2300
        self.assert_commands_from_args(
 
2301
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
 
2302
             " foo2.example.org").split(),
 
2303
            [command.SetTimeout, command.SetInterval],
 
2304
            clients=["foo1.example.org", "foo2.example.org"])
 
2305
 
 
2306
    def test_manual_page_example_5(self):
 
2307
        self.assert_command_from_args("--approve --all".split(),
 
2308
                                      command.Approve,
 
2309
                                      clients=[])
 
2310
 
 
2311
 
1957
2312
class TestCommand(unittest.TestCase):
1958
2313
    """Abstract class for tests of command classes"""
1959
2314
 
2072
2427
        busname = "se.recompile.Mandos"
2073
2428
        client_interface = "se.recompile.Mandos.Client"
2074
2429
        command.Approve().run(self.bus.clients, self.bus)
 
2430
        self.assertTrue(self.bus.clients)
2075
2431
        for clientpath in self.bus.clients:
2076
2432
            self.assertIn(("Approve", busname, clientpath,
2077
2433
                           client_interface, (True,)), self.bus.calls)
2080
2436
        busname = "se.recompile.Mandos"
2081
2437
        client_interface = "se.recompile.Mandos.Client"
2082
2438
        command.Deny().run(self.bus.clients, self.bus)
 
2439
        self.assertTrue(self.bus.clients)
2083
2440
        for clientpath in self.bus.clients:
2084
2441
            self.assertIn(("Approve", busname, clientpath,
2085
2442
                           client_interface, (False,)),
2086
2443
                          self.bus.calls)
2087
2444
 
2088
2445
    def test_Remove(self):
 
2446
        busname = "se.recompile.Mandos"
 
2447
        server_path = "/"
 
2448
        server_interface = "se.recompile.Mandos"
 
2449
        orig_clients = self.bus.clients.copy()
2089
2450
        command.Remove().run(self.bus.clients, self.bus)
2090
 
        for clientpath in self.bus.clients:
2091
 
            self.assertIn(("RemoveClient", dbus_busname,
2092
 
                           dbus_server_path, dbus_server_interface,
 
2451
        self.assertFalse(self.bus.clients)
 
2452
        for clientpath in orig_clients:
 
2453
            self.assertIn(("RemoveClient", busname,
 
2454
                           server_path, server_interface,
2093
2455
                           (clientpath,)), self.bus.calls)
2094
2456
 
2095
2457
    expected_json = {
2297
2659
        else:
2298
2660
            cmd_args = [() for x in range(len(self.values_to_get))]
2299
2661
            values_to_get = self.values_to_get
 
2662
        self.assertTrue(values_to_get)
2300
2663
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2301
2664
            for clientpath in self.bus.clients:
2302
2665
                self.bus.clients[clientpath][self.propname] = (
2303
2666
                    Unique())
2304
2667
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
 
2668
            self.assertTrue(self.bus.clients)
2305
2669
            for clientpath in self.bus.clients:
2306
2670
                value = (self.bus.clients[clientpath]
2307
2671
                         [self.propname])
2366
2730
class TestSetSecretCmd(TestPropertySetterCmd):
    """Test the command.SetSecret command, which sets the "Secret"
    property"""
    command = command.SetSecret
    propname = "Secret"

    def __init__(self, *args, **kwargs):
        # The values are created per instance; presumably so that
        # every test gets its own unread file objects (TODO confirm)
        secret_files = []
        secret_bytes = []
        for secret in (b"", b"secret\0xyzzy\nbar"):
            secret_file = io.BytesIO(secret)
            secret_files.append(secret_file)
            secret_bytes.append(secret_file.getvalue())
        self.values_to_set = secret_files
        self.values_to_get = secret_bytes
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2372
2739
 
2373
2740
 
2374
2741
class TestSetTimeoutCmd(TestPropertySetterCmd):
2427
2794
 
2428
2795
 
2429
2796
 
2430
 
def parse_test_args():
    # type: () -> argparse.Namespace
    """Parse the test options "--check" and "--prefix" from the
    command line and return the resulting namespace.

    If "--check" was given, sys.argv[1:] is replaced by the
    arguments which were not recognized here; otherwise sys.argv is
    left untouched.
    """
    test_parser = argparse.ArgumentParser(add_help=False)
    test_parser.add_argument("--check", action="store_true")
    test_parser.add_argument("--prefix")
    test_options, other_args = test_parser.parse_known_args()
    if not test_options.check:
        return test_options
    # Remove test options from sys.argv
    sys.argv[1:] = other_args
    return test_options
2439
2807
 
2440
2808
# Add all tests from doctest strings
2441
2809
def load_tests(loader, tests, none):
2444
2812
    return tests
2445
2813
 
2446
2814
if __name__ == "__main__":
    # Either run the test suite (with --check) or the real program;
    # logging is shut down in both cases.
    options = parse_test_args()
    try:
        if not options.check:
            main()
        else:
            extra_test_prefix = options.prefix
            if extra_test_prefix is None:
                unittest.main(argv=[""])
            else:
                # Run the ordinary tests first, and only go on to
                # the extra tests if all of them succeeded.
                ordinary_run = unittest.main(argv=[""], exit=False)
                if not ordinary_run.result.wasSuccessful():
                    sys.exit(1)

                class ExtraTestLoader(unittest.TestLoader):
                    # Collect test methods by the extra prefix
                    testMethodPrefix = extra_test_prefix

                # Call using ./scriptname --check [--verbose]
                unittest.main(argv=[""], testLoader=ExtraTestLoader())
    finally:
        logging.shutdown()
 
2833
 
 
2834
# Local Variables:
 
2835
# run-tests:
 
2836
# (lambda (&optional extra)
 
2837
#   (if (not (funcall run-tests-in-test-buffer default-directory
 
2838
#             extra))
 
2839
#       (funcall show-test-buffer-in-test-window)
 
2840
#     (funcall remove-test-window)
 
2841
#     (if extra (message "Extra tests run successfully!"))))
 
2842
# run-tests-in-test-buffer:
 
2843
# (lambda (dir &optional extra)
 
2844
#   (with-current-buffer (get-buffer-create "*Test*")
 
2845
#     (setq buffer-read-only nil
 
2846
#           default-directory dir)
 
2847
#     (erase-buffer)
 
2848
#     (compilation-mode))
 
2849
#   (let ((process-result
 
2850
#          (let ((inhibit-read-only t))
 
2851
#            (process-file-shell-command
 
2852
#             (funcall get-command-line extra) nil "*Test*"))))
 
2853
#     (and (numberp process-result)
 
2854
#          (= process-result 0))))
 
2855
# get-command-line:
 
2856
# (lambda (&optional extra)
 
2857
#   (let ((quoted-script
 
2858
#          (shell-quote-argument (funcall get-script-name))))
 
2859
#     (format
 
2860
#      (concat "%s --check" (if extra " --prefix=atest" ""))
 
2861
#      quoted-script)))
 
2862
# get-script-name:
 
2863
# (lambda ()
 
2864
#   (if (fboundp 'file-local-name)
 
2865
#       (file-local-name (buffer-file-name))
 
2866
#     (or (file-remote-p (buffer-file-name) 'localname)
 
2867
#         (buffer-file-name))))
 
2868
# remove-test-window:
 
2869
# (lambda ()
 
2870
#   (let ((test-window (get-buffer-window "*Test*")))
 
2871
#     (if test-window (delete-window test-window))))
 
2872
# show-test-buffer-in-test-window:
 
2873
# (lambda ()
 
2874
#   (when (not (get-buffer-window-list "*Test*"))
 
2875
#     (setq next-error-last-buffer (get-buffer "*Test*"))
 
2876
#     (let* ((side (if (>= (window-width) 146) 'right 'bottom))
 
2877
#            (display-buffer-overriding-action
 
2878
#             `((display-buffer-in-side-window) (side . ,side)
 
2879
#               (window-height . fit-window-to-buffer)
 
2880
#               (window-width . fit-window-to-buffer))))
 
2881
#       (display-buffer "*Test*"))))
 
2882
# eval:
 
2883
# (progn
 
2884
#   (let* ((run-extra-tests (lambda () (interactive)
 
2885
#                             (funcall run-tests t)))
 
2886
#          (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
 
2887
#          (outer-keymap `(keymap (3 . ,inner-keymap))))     ; C-c
 
2888
#     (setq minor-mode-overriding-map-alist
 
2889
#           (cons `(run-tests . ,outer-keymap)
 
2890
#                 minor-mode-overriding-map-alist)))
 
2891
#   (add-hook 'after-save-hook run-tests 90 t))
 
2892
# End: