/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2024-09-08 07:15:13 UTC
  • mto: This revision was merged to the branch mainline in revision 410.
  • Revision ID: teddy@recompile.se-20240908071513-2jnx7ja8hlct42hl
Minor fix for manual Makefile uninstallations

If the Mandos systemd unit file was not installed, it was still
removed by the "purge-server" target.  If systemd is not installed,
this could mean removal of "mandos.service" from the root directory.

(Note: this was *not* used by the Debian package as a method of
uninstallation; this was only ever done by the Makefile if "make
purge-server" was called by hand.  And a "mandos.service" file was
presumably also unlikely to exist in the root directory.)

* Makefile (purge-server): Only remove systemd service file
  "mandos.service" if the same conditions exist which permitted its
  initial installation in the "install-server" target.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
3
 
#
4
 
# Mandos Monitor - Control and monitor the Mandos server
5
 
#
6
 
# Copyright © 2008-2019 Teddy Hogeborn
7
 
# Copyright © 2008-2019 Björn Påhlsson
 
1
#!/usr/bin/python3 -bbI
 
2
# -*- coding: utf-8; lexical-binding: t -*-
 
3
#
 
4
# Mandos Control - Control or query the Mandos server
 
5
#
 
6
# Copyright © 2008-2022 Teddy Hogeborn
 
7
# Copyright © 2008-2022 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
23
23
#
24
24
# Contact the authors at <mandos@recompile.se>.
25
25
#
26
 
 
27
26
from __future__ import (division, absolute_import, print_function,
28
27
                        unicode_literals)
29
28
 
33
32
    pass
34
33
 
35
34
import sys
 
35
import unittest
36
36
import argparse
 
37
import logging
 
38
import os
37
39
import locale
38
40
import datetime
39
41
import re
40
 
import os
41
42
import collections
42
43
import json
43
 
import unittest
44
 
import logging
45
44
import io
46
45
import tempfile
47
46
import contextlib
48
47
 
49
 
import dbus as dbus_python
 
48
if sys.version_info.major == 2:
 
49
    __metaclass__ = type
 
50
    str = unicode
 
51
    input = raw_input
 
52
 
 
53
 
 
54
class gi:
 
55
    """Dummy gi module, for the tests"""
 
56
    class repository:
 
57
        class GLib:
 
58
            class Error(Exception):
 
59
                pass
 
60
 
 
61
 
 
62
dbussy = None
 
63
ravel = None
 
64
dbus_python = None
 
65
pydbus = None
 
66
 
 
67
try:
 
68
    import dbussy
 
69
    import ravel
 
70
except ImportError:
 
71
    try:
 
72
        import pydbus
 
73
        import gi
 
74
    except ImportError:
 
75
        import dbus as dbus_python
 
76
 
50
77
 
51
78
# Show warnings by default
52
79
if not sys.warnoptions:
53
80
    import warnings
54
81
    warnings.simplefilter("default")
55
82
 
56
 
log = logging.getLogger(sys.argv[0])
57
 
logging.basicConfig(level="INFO", # Show info level messages
 
83
log = logging.getLogger(os.path.basename(sys.argv[0]))
 
84
logging.basicConfig(level="INFO",         # Show info level messages
58
85
                    format="%(message)s") # Show basic log messages
59
86
 
60
87
logging.captureWarnings(True)   # Show warnings via the logging system
61
88
 
62
89
if sys.version_info.major == 2:
63
 
    str = unicode
64
90
    import StringIO
65
91
    io.StringIO = StringIO.StringIO
66
92
 
67
93
locale.setlocale(locale.LC_ALL, "")
68
94
 
69
 
version = "1.8.3"
 
95
version = "1.8.16"
70
96
 
71
97
 
72
98
def main():
79
105
    clientnames = options.client
80
106
 
81
107
    if options.debug:
82
 
        log.setLevel(logging.DEBUG)
 
108
        logging.getLogger("").setLevel(logging.DEBUG)
83
109
 
84
 
    bus = dbus_python_adapter.CachingBus(dbus_python)
 
110
    if dbussy is not None and ravel is not None:
 
111
        bus = dbussy_adapter.CachingBus(dbussy, ravel)
 
112
    elif pydbus is not None:
 
113
        bus = pydbus_adapter.CachingBus(pydbus)
 
114
    else:
 
115
        bus = dbus_python_adapter.CachingBus(dbus_python)
85
116
 
86
117
    try:
87
118
        all_clients = bus.get_clients_and_properties()
228
259
        return rfc3339_duration_to_delta(interval)
229
260
    except ValueError as e:
230
261
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
231
 
                    ' '.join(e.args))
 
262
                    " ".join(e.args))
232
263
    return parse_pre_1_6_1_interval(interval)
233
264
 
234
265
 
235
266
def rfc3339_duration_to_delta(duration):
236
267
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
237
268
 
238
 
    >>> rfc3339_duration_to_delta("P7D")
239
 
    datetime.timedelta(7)
240
 
    >>> rfc3339_duration_to_delta("PT60S")
241
 
    datetime.timedelta(0, 60)
242
 
    >>> rfc3339_duration_to_delta("PT60M")
243
 
    datetime.timedelta(0, 3600)
244
 
    >>> rfc3339_duration_to_delta("P60M")
245
 
    datetime.timedelta(1680)
246
 
    >>> rfc3339_duration_to_delta("PT24H")
247
 
    datetime.timedelta(1)
248
 
    >>> rfc3339_duration_to_delta("P1W")
249
 
    datetime.timedelta(7)
250
 
    >>> rfc3339_duration_to_delta("PT5M30S")
251
 
    datetime.timedelta(0, 330)
252
 
    >>> rfc3339_duration_to_delta("P1DT3M20S")
253
 
    datetime.timedelta(1, 200)
 
269
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
 
270
    True
 
271
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
 
272
    True
 
273
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
 
274
    True
 
275
    >>> # 60 months
 
276
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
 
277
    True
 
278
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
 
279
    True
 
280
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
 
281
    True
 
282
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
 
283
    True
 
284
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
 
285
    True
254
286
    >>> # Can not be empty:
255
287
    >>> rfc3339_duration_to_delta("")
256
288
    Traceback (most recent call last):
363
395
 
364
396
 
365
397
def parse_pre_1_6_1_interval(interval):
366
 
    """Parse an interval string as documented by Mandos before 1.6.1,
 
398
    r"""Parse an interval string as documented by Mandos before 1.6.1,
367
399
    and return a datetime.timedelta
368
400
 
369
 
    >>> parse_pre_1_6_1_interval('7d')
370
 
    datetime.timedelta(7)
371
 
    >>> parse_pre_1_6_1_interval('60s')
372
 
    datetime.timedelta(0, 60)
373
 
    >>> parse_pre_1_6_1_interval('60m')
374
 
    datetime.timedelta(0, 3600)
375
 
    >>> parse_pre_1_6_1_interval('24h')
376
 
    datetime.timedelta(1)
377
 
    >>> parse_pre_1_6_1_interval('1w')
378
 
    datetime.timedelta(7)
379
 
    >>> parse_pre_1_6_1_interval('5m 30s')
380
 
    datetime.timedelta(0, 330)
381
 
    >>> parse_pre_1_6_1_interval('')
382
 
    datetime.timedelta(0)
 
401
    >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
 
402
    True
 
403
    >>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
 
404
    True
 
405
    >>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
 
406
    True
 
407
    >>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
 
408
    True
 
409
    >>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
 
410
    True
 
411
    >>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
 
412
    True
 
413
    >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
 
414
    True
383
415
    >>> # Ignore unknown characters, allow any order and repetitions
384
 
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
385
 
    datetime.timedelta(2, 480, 18000)
 
416
    >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
 
417
    ... == datetime.timedelta(2, 480, 18000)
 
418
    True
386
419
 
387
420
    """
388
421
 
451
484
        parser.error("--remove can only be combined with --deny")
452
485
 
453
486
 
454
 
class dbus(object):
 
487
class dbus:
455
488
 
456
 
    class SystemBus(object):
 
489
    class SystemBus:
457
490
 
458
491
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
 
492
 
459
493
        def get_managed_objects(self, busname, objectpath):
460
494
            return self.call_method("GetManagedObjects", busname,
461
495
                                    objectpath,
462
496
                                    self.object_manager_iface)
463
497
 
464
498
        properties_iface = "org.freedesktop.DBus.Properties"
 
499
 
465
500
        def set_property(self, busname, objectpath, interface, key,
466
501
                         value):
467
502
            self.call_method("Set", busname, objectpath,
468
503
                             self.properties_iface, interface, key,
469
504
                             value)
470
505
 
 
506
        def call_method(self, methodname, busname, objectpath,
 
507
                        interface, *args):
 
508
            raise NotImplementedError()
471
509
 
472
510
    class MandosBus(SystemBus):
473
511
        busname_domain = "se.recompile"
505
543
        pass
506
544
 
507
545
 
508
 
class dbus_python_adapter(object):
 
546
class dbus_python_adapter:
509
547
 
510
548
    class SystemBus(dbus.MandosBus):
511
549
        """Use dbus-python"""
556
594
                        for key, subval in value.items()}
557
595
            return value
558
596
 
 
597
        def set_client_property(self, objectpath, key, value):
 
598
            if key == "Secret":
 
599
                if not isinstance(value, bytes):
 
600
                    value = value.encode("utf-8")
 
601
                value = self.dbus_python.ByteArray(value)
 
602
            return self.set_property(self.busname, objectpath,
 
603
                                     self.client_interface, key,
 
604
                                     value)
559
605
 
560
 
    class SilenceLogger(object):
 
606
    class SilenceLogger:
561
607
        "Simple context manager to silence a particular logger"
 
608
 
562
609
        def __init__(self, loggername):
563
610
            self.logger = logging.getLogger(loggername)
564
611
 
574
621
        def __exit__(self, exc_type, exc_val, exc_tb):
575
622
            self.logger.removeFilter(self.nullfilter)
576
623
 
577
 
 
578
624
    class CachingBus(SystemBus):
579
625
        """A caching layer for dbus_python_adapter.SystemBus"""
 
626
 
580
627
        def __init__(self, *args, **kwargs):
581
628
            self.object_cache = {}
582
629
            super(dbus_python_adapter.CachingBus,
583
630
                  self).__init__(*args, **kwargs)
 
631
 
584
632
        def get_object(self, busname, objectpath):
585
633
            try:
586
634
                return self.object_cache[(busname, objectpath)]
588
636
                new_object = super(
589
637
                    dbus_python_adapter.CachingBus,
590
638
                    self).get_object(busname, objectpath)
591
 
                self.object_cache[(busname, objectpath)]  = new_object
 
639
                self.object_cache[(busname, objectpath)] = new_object
 
640
                return new_object
 
641
 
 
642
 
 
643
class pydbus_adapter:
 
644
    class SystemBus(dbus.MandosBus):
 
645
        def __init__(self, module=pydbus):
 
646
            self.pydbus = module
 
647
            self.bus = self.pydbus.SystemBus()
 
648
 
 
649
        @contextlib.contextmanager
 
650
        def convert_exception(self, exception_class=dbus.Error):
 
651
            try:
 
652
                yield
 
653
            except gi.repository.GLib.Error as e:
 
654
                # This does what "raise from" would do
 
655
                exc = exception_class(*e.args)
 
656
                exc.__cause__ = e
 
657
                raise exc
 
658
 
 
659
        def call_method(self, methodname, busname, objectpath,
 
660
                        interface, *args):
 
661
            proxy_object = self.get(busname, objectpath)
 
662
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
 
663
                      interface, methodname,
 
664
                      ", ".join(repr(a) for a in args))
 
665
            method = getattr(proxy_object[interface], methodname)
 
666
            with self.convert_exception():
 
667
                return method(*args)
 
668
 
 
669
        def get(self, busname, objectpath):
 
670
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
671
                      busname, objectpath)
 
672
            with self.convert_exception(dbus.ConnectFailed):
 
673
                if sys.version_info.major <= 2:
 
674
                    with warnings.catch_warnings():
 
675
                        warnings.filterwarnings(
 
676
                            "ignore", "", DeprecationWarning,
 
677
                            r"^xml\.etree\.ElementTree$")
 
678
                        return self.bus.get(busname, objectpath)
 
679
                else:
 
680
                    return self.bus.get(busname, objectpath)
 
681
 
 
682
        def set_property(self, busname, objectpath, interface, key,
 
683
                         value):
 
684
            proxy_object = self.get(busname, objectpath)
 
685
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
686
                      objectpath, self.properties_iface, interface,
 
687
                      key, value)
 
688
            setattr(proxy_object[interface], key, value)
 
689
 
 
690
    class CachingBus(SystemBus):
 
691
        """A caching layer for pydbus_adapter.SystemBus"""
 
692
 
 
693
        def __init__(self, *args, **kwargs):
 
694
            self.object_cache = {}
 
695
            super(pydbus_adapter.CachingBus,
 
696
                  self).__init__(*args, **kwargs)
 
697
 
 
698
        def get(self, busname, objectpath):
 
699
            try:
 
700
                return self.object_cache[(busname, objectpath)]
 
701
            except KeyError:
 
702
                new_object = (super(pydbus_adapter.CachingBus, self)
 
703
                              .get(busname, objectpath))
 
704
                self.object_cache[(busname, objectpath)] = new_object
 
705
                return new_object
 
706
 
 
707
 
 
708
class dbussy_adapter:
 
709
    class SystemBus(dbus.SystemBus):
 
710
        """Use DBussy"""
 
711
 
 
712
        def __init__(self, dbussy, ravel):
 
713
            self.dbussy = dbussy
 
714
            self.ravel = ravel
 
715
            self.bus = ravel.system_bus()
 
716
 
 
717
        @contextlib.contextmanager
 
718
        def convert_exception(self, exception_class=dbus.Error):
 
719
            try:
 
720
                yield
 
721
            except self.dbussy.DBusError as e:
 
722
                # This does what "raise from" would do
 
723
                exc = exception_class(*e.args)
 
724
                exc.__cause__ = e
 
725
                raise exc
 
726
 
 
727
        def call_method(self, methodname, busname, objectpath,
 
728
                        interface, *args):
 
729
            proxy_object = self.get_object(busname, objectpath)
 
730
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
 
731
                      interface, methodname,
 
732
                      ", ".join(repr(a) for a in args))
 
733
            iface = proxy_object.get_interface(interface)
 
734
            method = getattr(iface, methodname)
 
735
            with self.convert_exception(dbus.Error):
 
736
                value = method(*args)
 
737
            # DBussy returns values either as an empty list or as a
 
738
            # list of one element with the return value
 
739
            if value:
 
740
                return self.type_filter(value[0])
 
741
 
 
742
        def get_object(self, busname, objectpath):
 
743
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
744
                      busname, objectpath)
 
745
            with self.convert_exception(dbus.ConnectFailed):
 
746
                return self.bus[busname][objectpath]
 
747
 
 
748
        def type_filter(self, value):
 
749
            """Convert the most bothersome types to Python types"""
 
750
            # A D-Bus Variant value is represented as the Python type
 
751
            # Tuple[dbussy.DBUS.Signature, Any]
 
752
            if isinstance(value, tuple):
 
753
                if (len(value) == 2
 
754
                    and isinstance(value[0],
 
755
                                   self.dbussy.DBUS.Signature)):
 
756
                    return self.type_filter(value[1])
 
757
            elif isinstance(value, self.dbussy.DBUS.ObjectPath):
 
758
                return str(value)
 
759
            # Also recurse into dictionaries
 
760
            elif isinstance(value, dict):
 
761
                return {self.type_filter(key):
 
762
                        self.type_filter(subval)
 
763
                        for key, subval in value.items()}
 
764
            return value
 
765
 
 
766
        def set_property(self, busname, objectpath, interface, key,
 
767
                         value):
 
768
            proxy_object = self.get_object(busname, objectpath)
 
769
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
770
                      objectpath, self.properties_iface, interface,
 
771
                      key, value)
 
772
            if key == "Secret":
 
773
                # DBussy wants a Byte Array to be a sequence of
 
774
                # values, not a byte string
 
775
                value = tuple(value)
 
776
            setattr(proxy_object.get_interface(interface), key, value)
 
777
 
 
778
    class MandosBus(SystemBus, dbus.MandosBus):
 
779
        pass
 
780
 
 
781
    class CachingBus(MandosBus):
 
782
        """A caching layer for dbussy_adapter.MandosBus"""
 
783
 
 
784
        def __init__(self, *args, **kwargs):
 
785
            self.object_cache = {}
 
786
            super(dbussy_adapter.CachingBus, self).__init__(*args,
 
787
                                                            **kwargs)
 
788
 
 
789
        def get_object(self, busname, objectpath):
 
790
            try:
 
791
                return self.object_cache[(busname, objectpath)]
 
792
            except KeyError:
 
793
                new_object = super(
 
794
                    dbussy_adapter.CachingBus,
 
795
                    self).get_object(busname, objectpath)
 
796
                self.object_cache[(busname, objectpath)] = new_object
592
797
                return new_object
593
798
 
594
799
 
625
830
    return commands
626
831
 
627
832
 
628
 
class command(object):
 
833
class command:
629
834
    """A namespace for command classes"""
630
835
 
631
 
    class Base(object):
 
836
    class Base:
632
837
        """Abstract base class for commands"""
 
838
 
633
839
        def run(self, clients, bus=None):
634
840
            """Normal commands should implement run_on_one_client(),
635
841
but commands which want to operate on all clients at the same time can
639
845
            for client, properties in clients.items():
640
846
                self.run_on_one_client(client, properties)
641
847
 
642
 
 
643
848
    class IsEnabled(Base):
644
849
        def run(self, clients, bus=None):
645
850
            properties = next(iter(clients.values()))
647
852
                sys.exit(0)
648
853
            sys.exit(1)
649
854
 
650
 
 
651
855
    class Approve(Base):
652
856
        def run_on_one_client(self, client, properties):
653
857
            self.bus.call_client_method(client, "Approve", True)
654
858
 
655
 
 
656
859
    class Deny(Base):
657
860
        def run_on_one_client(self, client, properties):
658
861
            self.bus.call_client_method(client, "Approve", False)
659
862
 
660
 
 
661
863
    class Remove(Base):
662
864
        def run(self, clients, bus):
663
865
            for clientpath in frozenset(clients.keys()):
664
866
                bus.call_server_method("RemoveClient", clientpath)
665
867
 
666
 
 
667
868
    class Output(Base):
668
869
        """Abstract class for commands outputting client details"""
669
870
        all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
675
876
                        "Checker", "ExtendedTimeout", "Expires",
676
877
                        "LastCheckerStatus")
677
878
 
678
 
 
679
879
    class DumpJSON(Output):
680
880
        def run(self, clients, bus=None):
681
881
            data = {properties["Name"]:
682
882
                    {key: properties[key]
683
883
                     for key in self.all_keywords}
684
884
                    for properties in clients.values()}
685
 
            print(json.dumps(data, indent=4, separators=(',', ': ')))
686
 
 
 
885
            print(json.dumps(data, indent=4, separators=(",", ": ")))
687
886
 
688
887
    class PrintTable(Output):
689
888
        def __init__(self, verbose=False):
697
896
                keywords = self.all_keywords
698
897
            print(self.TableOfClients(clients.values(), keywords))
699
898
 
700
 
        class TableOfClients(object):
 
899
        class TableOfClients:
701
900
            tableheaders = {
702
901
                "Name": "Name",
703
902
                "Enabled": "Enabled",
730
929
 
731
930
            if sys.version_info.major == 2:
732
931
                __unicode__ = __str__
 
932
 
733
933
                def __str__(self):
734
934
                    return str(self).encode(
735
935
                        locale.getpreferredencoding())
781
981
                                minutes=(td.seconds % 3600) // 60,
782
982
                                seconds=td.seconds % 60))
783
983
 
784
 
 
785
984
    class PropertySetter(Base):
786
985
        "Abstract class for Actions for setting one client property"
787
986
 
794
993
        def propname(self):
795
994
            raise NotImplementedError()
796
995
 
797
 
 
798
996
    class Enable(PropertySetter):
799
997
        propname = "Enabled"
800
998
        value_to_set = True
801
999
 
802
 
 
803
1000
    class Disable(PropertySetter):
804
1001
        propname = "Enabled"
805
1002
        value_to_set = False
806
1003
 
807
 
 
808
1004
    class BumpTimeout(PropertySetter):
809
1005
        propname = "LastCheckedOK"
810
1006
        value_to_set = ""
811
1007
 
812
 
 
813
1008
    class StartChecker(PropertySetter):
814
1009
        propname = "CheckerRunning"
815
1010
        value_to_set = True
816
1011
 
817
 
 
818
1012
    class StopChecker(PropertySetter):
819
1013
        propname = "CheckerRunning"
820
1014
        value_to_set = False
821
1015
 
822
 
 
823
1016
    class ApproveByDefault(PropertySetter):
824
1017
        propname = "ApprovedByDefault"
825
1018
        value_to_set = True
826
1019
 
827
 
 
828
1020
    class DenyByDefault(PropertySetter):
829
1021
        propname = "ApprovedByDefault"
830
1022
        value_to_set = False
831
1023
 
832
 
 
833
1024
    class PropertySetterValue(PropertySetter):
834
1025
        """Abstract class for PropertySetter recieving a value as
835
1026
constructor argument instead of a class attribute."""
 
1027
 
836
1028
        def __init__(self, value):
837
1029
            self.value_to_set = value
838
1030
 
845
1037
    class SetChecker(PropertySetterValue):
846
1038
        propname = "Checker"
847
1039
 
848
 
 
849
1040
    class SetHost(PropertySetterValue):
850
1041
        propname = "Host"
851
1042
 
852
 
 
853
1043
    class SetSecret(PropertySetterValue):
854
1044
        propname = "Secret"
855
1045
 
863
1053
            self._vts = value.read()
864
1054
            value.close()
865
1055
 
866
 
 
867
1056
    class PropertySetterValueMilliseconds(PropertySetterValue):
868
1057
        """Abstract class for PropertySetterValue taking a value
869
1058
argument as a datetime.timedelta() but should store it as
878
1067
            "When setting, convert value from a datetime.timedelta"
879
1068
            self._vts = int(round(value.total_seconds() * 1000))
880
1069
 
881
 
 
882
1070
    class SetTimeout(PropertySetterValueMilliseconds):
883
1071
        propname = "Timeout"
884
1072
 
885
 
 
886
1073
    class SetExtendedTimeout(PropertySetterValueMilliseconds):
887
1074
        propname = "ExtendedTimeout"
888
1075
 
889
 
 
890
1076
    class SetInterval(PropertySetterValueMilliseconds):
891
1077
        propname = "Interval"
892
1078
 
893
 
 
894
1079
    class SetApprovalDelay(PropertySetterValueMilliseconds):
895
1080
        propname = "ApprovalDelay"
896
1081
 
897
 
 
898
1082
    class SetApprovalDuration(PropertySetterValueMilliseconds):
899
1083
        propname = "ApprovalDuration"
900
1084
 
934
1118
                                                     "output"))
935
1119
 
936
1120
 
937
 
class Unique(object):
 
1121
class Unique:
938
1122
    """Class for objects which exist only to be unique objects, since
939
1123
unittest.mock.sentinel only exists in Python 3.3"""
940
1124
 
1224
1408
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1225
1409
 
1226
1410
    def MockDBusPython_func(self, func):
1227
 
        class mock_dbus_python(object):
 
1411
        class mock_dbus_python:
1228
1412
            """mock dbus-python module"""
1229
 
            class exceptions(object):
 
1413
            class exceptions:
1230
1414
                """Pseudo-namespace"""
1231
1415
                class DBusException(Exception):
1232
1416
                    pass
1233
 
            class SystemBus(object):
 
1417
            class SystemBus:
1234
1418
                @staticmethod
1235
1419
                def get_object(busname, objectpath):
1236
1420
                    DBusObject = collections.namedtuple(
1237
 
                        "DBusObject", ("methodname",))
 
1421
                        "DBusObject", ("methodname", "Set"))
1238
1422
                    def method(*args, **kwargs):
1239
1423
                        self.assertEqual({"dbus_interface":
1240
1424
                                          "interface"},
1241
1425
                                         kwargs)
1242
1426
                        return func(*args)
1243
 
                    return DBusObject(methodname=method)
1244
 
            class Boolean(object):
 
1427
                    def set_property(interface, key, value,
 
1428
                                     dbus_interface=None):
 
1429
                        self.assertEqual(
 
1430
                            "org.freedesktop.DBus.Properties",
 
1431
                            dbus_interface)
 
1432
                        self.assertEqual("Secret", key)
 
1433
                        return func(interface, key, value,
 
1434
                                    dbus_interface=dbus_interface)
 
1435
                    return DBusObject(methodname=method,
 
1436
                                      Set=set_property)
 
1437
            class Boolean:
1245
1438
                def __init__(self, value):
1246
1439
                    self.value = bool(value)
1247
1440
                def __bool__(self):
1252
1445
                pass
1253
1446
            class Dictionary(dict):
1254
1447
                pass
 
1448
            class ByteArray(bytes):
 
1449
                pass
1255
1450
        return mock_dbus_python
1256
1451
 
1257
1452
    def call_method(self, bus, methodname, busname, objectpath,
1429
1624
        finally:
1430
1625
            dbus_logger.removeFilter(counting_handler)
1431
1626
 
1432
 
        self.assertNotIsInstance(e, dbus.ConnectFailed)
 
1627
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1433
1628
 
1434
1629
        # Make sure the dbus logger was suppressed
1435
1630
        self.assertEqual(0, counting_handler.count)
1436
1631
 
 
1632
    def test_Set_Secret_sends_bytearray(self):
 
1633
        ret = [None]
 
1634
        def func(*args, **kwargs):
 
1635
            ret[0] = (args, kwargs)
 
1636
        mock_dbus_python = self.MockDBusPython_func(func)
 
1637
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
 
1638
        bus.set_client_property("objectpath", "Secret", "value")
 
1639
        expected_call = (("se.recompile.Mandos.Client", "Secret",
 
1640
                          mock_dbus_python.ByteArray(b"value")),
 
1641
                         {"dbus_interface":
 
1642
                          "org.freedesktop.DBus.Properties"})
 
1643
        self.assertEqual(expected_call, ret[0])
 
1644
        if sys.version_info.major == 2:
 
1645
            self.assertIsInstance(ret[0][0][-1],
 
1646
                                  mock_dbus_python.ByteArray)
 
1647
 
1437
1648
    def test_get_object_converts_to_correct_exception(self):
1438
1649
        bus = dbus_python_adapter.SystemBus(
1439
1650
            self.fake_dbus_python_raises_exception_on_connect)
1441
1652
            self.call_method(bus, "methodname", "busname",
1442
1653
                             "objectpath", "interface")
1443
1654
 
1444
 
    class fake_dbus_python_raises_exception_on_connect(object):
 
1655
    class fake_dbus_python_raises_exception_on_connect:
1445
1656
        """fake dbus-python module"""
1446
 
        class exceptions(object):
 
1657
        class exceptions:
1447
1658
            """Pseudo-namespace"""
1448
1659
            class DBusException(Exception):
1449
1660
                pass
1457
1668
 
1458
1669
 
1459
1670
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1460
 
    class mock_dbus_python(object):
 
1671
    class mock_dbus_python:
1461
1672
        """mock dbus-python modules"""
1462
 
        class SystemBus(object):
 
1673
        class SystemBus:
1463
1674
            @staticmethod
1464
1675
            def get_object(busname, objectpath):
1465
1676
                return Unique()
1508
1719
        self.assertIs(obj1, obj1b)
1509
1720
 
1510
1721
 
 
1722
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
 
1723
 
 
1724
    def Stub_pydbus_func(self, func):
 
1725
        class stub_pydbus:
 
1726
            """stub pydbus module"""
 
1727
            class SystemBus:
 
1728
                @staticmethod
 
1729
                def get(busname, objectpath):
 
1730
                    DBusObject = collections.namedtuple(
 
1731
                        "DBusObject", ("methodname",))
 
1732
                    return {"interface":
 
1733
                            DBusObject(methodname=func)}
 
1734
        return stub_pydbus
 
1735
 
 
1736
    def call_method(self, bus, methodname, busname, objectpath,
 
1737
                    interface, *args):
 
1738
        with self.assertLogs(log, logging.DEBUG):
 
1739
            return bus.call_method(methodname, busname, objectpath,
 
1740
                                   interface, *args)
 
1741
 
 
1742
    def test_call_method_returns(self):
 
1743
        expected_method_return = Unique()
 
1744
        method_args = (Unique(), Unique())
 
1745
        def func(*args):
 
1746
            self.assertEqual(len(method_args), len(args))
 
1747
            for marg, arg in zip(method_args, args):
 
1748
                self.assertIs(marg, arg)
 
1749
            return expected_method_return
 
1750
        stub_pydbus = self.Stub_pydbus_func(func)
 
1751
        bus = pydbus_adapter.SystemBus(stub_pydbus)
 
1752
        ret = self.call_method(bus, "methodname", "busname",
 
1753
                               "objectpath", "interface",
 
1754
                               *method_args)
 
1755
        self.assertIs(ret, expected_method_return)
 
1756
 
 
1757
    def test_call_method_handles_exception(self):
 
1758
        def func():
 
1759
            raise gi.repository.GLib.Error()
 
1760
 
 
1761
        stub_pydbus = self.Stub_pydbus_func(func)
 
1762
        bus = pydbus_adapter.SystemBus(stub_pydbus)
 
1763
 
 
1764
        with self.assertRaises(dbus.Error) as e:
 
1765
            self.call_method(bus, "methodname", "busname",
 
1766
                             "objectpath", "interface")
 
1767
 
 
1768
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
1769
 
 
1770
    def test_get_converts_to_correct_exception(self):
 
1771
        bus = pydbus_adapter.SystemBus(
 
1772
            self.fake_pydbus_raises_exception_on_connect)
 
1773
        with self.assertRaises(dbus.ConnectFailed):
 
1774
            self.call_method(bus, "methodname", "busname",
 
1775
                             "objectpath", "interface")
 
1776
 
 
1777
    class fake_pydbus_raises_exception_on_connect:
 
1778
        """fake dbus-python module"""
 
1779
        @classmethod
 
1780
        def SystemBus(cls):
 
1781
            def get(busname, objectpath):
 
1782
                raise gi.repository.GLib.Error()
 
1783
            Bus = collections.namedtuple("Bus", ["get"])
 
1784
            return Bus(get=get)
 
1785
 
 
1786
    def test_set_property_uses_setattr(self):
 
1787
        class Object:
 
1788
            pass
 
1789
        obj = Object()
 
1790
        class pydbus_spy:
 
1791
            class SystemBus:
 
1792
                @staticmethod
 
1793
                def get(busname, objectpath):
 
1794
                    return {"interface": obj}
 
1795
        bus = pydbus_adapter.SystemBus(pydbus_spy)
 
1796
        value = Unique()
 
1797
        bus.set_property("busname", "objectpath", "interface", "key",
 
1798
                         value)
 
1799
        self.assertIs(value, obj.key)
 
1800
 
 
1801
    def test_get_suppresses_xml_deprecation_warning(self):
 
1802
        if sys.version_info.major >= 3:
 
1803
            return
 
1804
        class stub_pydbus_get:
 
1805
            class SystemBus:
 
1806
                @staticmethod
 
1807
                def get(busname, objectpath):
 
1808
                    warnings.warn_explicit(
 
1809
                        "deprecated", DeprecationWarning,
 
1810
                        "xml.etree.ElementTree", 0)
 
1811
        bus = pydbus_adapter.SystemBus(stub_pydbus_get)
 
1812
        with warnings.catch_warnings(record=True) as w:
 
1813
            warnings.simplefilter("always")
 
1814
            bus.get("busname", "objectpath")
 
1815
            self.assertEqual(0, len(w))
 
1816
 
 
1817
 
 
1818
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
 
1819
    class stub_pydbus:
 
1820
        """stub pydbus module"""
 
1821
        class SystemBus:
 
1822
            @staticmethod
 
1823
            def get(busname, objectpath):
 
1824
                return Unique()
 
1825
 
 
1826
    def setUp(self):
 
1827
        self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)
 
1828
 
 
1829
    def test_returns_distinct_objectpaths(self):
 
1830
        obj1 = self.bus.get("busname", "objectpath1")
 
1831
        self.assertIsInstance(obj1, Unique)
 
1832
        obj2 = self.bus.get("busname", "objectpath2")
 
1833
        self.assertIsInstance(obj2, Unique)
 
1834
        self.assertIsNot(obj1, obj2)
 
1835
 
 
1836
    def test_returns_distinct_busnames(self):
 
1837
        obj1 = self.bus.get("busname1", "objectpath")
 
1838
        self.assertIsInstance(obj1, Unique)
 
1839
        obj2 = self.bus.get("busname2", "objectpath")
 
1840
        self.assertIsInstance(obj2, Unique)
 
1841
        self.assertIsNot(obj1, obj2)
 
1842
 
 
1843
    def test_returns_distinct_both(self):
 
1844
        obj1 = self.bus.get("busname1", "objectpath")
 
1845
        self.assertIsInstance(obj1, Unique)
 
1846
        obj2 = self.bus.get("busname2", "objectpath")
 
1847
        self.assertIsInstance(obj2, Unique)
 
1848
        self.assertIsNot(obj1, obj2)
 
1849
 
 
1850
    def test_returns_same(self):
 
1851
        obj1 = self.bus.get("busname", "objectpath")
 
1852
        self.assertIsInstance(obj1, Unique)
 
1853
        obj2 = self.bus.get("busname", "objectpath")
 
1854
        self.assertIsInstance(obj2, Unique)
 
1855
        self.assertIs(obj1, obj2)
 
1856
 
 
1857
    def test_returns_same_old(self):
 
1858
        obj1 = self.bus.get("busname1", "objectpath1")
 
1859
        self.assertIsInstance(obj1, Unique)
 
1860
        obj2 = self.bus.get("busname2", "objectpath2")
 
1861
        self.assertIsInstance(obj2, Unique)
 
1862
        obj1b = self.bus.get("busname1", "objectpath1")
 
1863
        self.assertIsInstance(obj1b, Unique)
 
1864
        self.assertIsNot(obj1, obj2)
 
1865
        self.assertIsNot(obj2, obj1b)
 
1866
        self.assertIs(obj1, obj1b)
 
1867
 
 
1868
 
 
1869
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
 
1870
 
 
1871
    class dummy_dbussy:
 
1872
        class DBUS:
 
1873
            class ObjectPath(str):
 
1874
                pass
 
1875
        class DBusError(Exception):
 
1876
            pass
 
1877
 
 
1878
    def fake_ravel_func(self, func):
 
1879
        class fake_ravel:
 
1880
            @staticmethod
 
1881
            def system_bus():
 
1882
                class DBusInterfaceProxy:
 
1883
                    @staticmethod
 
1884
                    def methodname(*args):
 
1885
                        return [func(*args)]
 
1886
                class DBusObject:
 
1887
                    @staticmethod
 
1888
                    def get_interface(interface):
 
1889
                        if interface == "interface":
 
1890
                            return DBusInterfaceProxy()
 
1891
                return {"busname": {"objectpath": DBusObject()}}
 
1892
        return fake_ravel
 
1893
 
 
1894
    def call_method(self, bus, methodname, busname, objectpath,
 
1895
                    interface, *args):
 
1896
        with self.assertLogs(log, logging.DEBUG):
 
1897
            return bus.call_method(methodname, busname, objectpath,
 
1898
                                   interface, *args)
 
1899
 
 
1900
    def test_call_method_returns(self):
 
1901
        expected_method_return = Unique()
 
1902
        method_args = (Unique(), Unique())
 
1903
        def func(*args):
 
1904
            self.assertEqual(len(method_args), len(args))
 
1905
            for marg, arg in zip(method_args, args):
 
1906
                self.assertIs(marg, arg)
 
1907
            return expected_method_return
 
1908
        fake_ravel = self.fake_ravel_func(func)
 
1909
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
 
1910
        ret = self.call_method(bus, "methodname", "busname",
 
1911
                               "objectpath", "interface",
 
1912
                               *method_args)
 
1913
        self.assertIs(ret, expected_method_return)
 
1914
 
 
1915
    def test_call_method_filters_objectpath(self):
 
1916
        def func():
 
1917
            return method_return
 
1918
        fake_ravel = self.fake_ravel_func(func)
 
1919
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
 
1920
        method_return = (self.dummy_dbussy.DBUS
 
1921
                         .ObjectPath("objectpath"))
 
1922
        ret = self.call_method(bus, "methodname", "busname",
 
1923
                               "objectpath", "interface")
 
1924
        self.assertEqual("objectpath", ret)
 
1925
        self.assertNotIsInstance(ret,
 
1926
                                 self.dummy_dbussy.DBUS.ObjectPath)
 
1927
 
 
1928
    def test_call_method_filters_objectpaths_in_dict(self):
 
1929
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
 
1930
        def func():
 
1931
            return method_return
 
1932
        fake_ravel = self.fake_ravel_func(func)
 
1933
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
 
1934
        method_return = {
 
1935
            ObjectPath("objectpath_key_1"):
 
1936
            ObjectPath("objectpath_value_1"),
 
1937
            ObjectPath("objectpath_key_2"):
 
1938
            ObjectPath("objectpath_value_2"),
 
1939
        }
 
1940
        ret = self.call_method(bus, "methodname", "busname",
 
1941
                               "objectpath", "interface")
 
1942
        expected_method_return = {str(key): str(value)
 
1943
                                  for key, value in
 
1944
                                  method_return.items()}
 
1945
        for key, value in ret.items():
 
1946
            self.assertNotIsInstance(key, ObjectPath)
 
1947
            self.assertNotIsInstance(value, ObjectPath)
 
1948
        self.assertEqual(expected_method_return, ret)
 
1949
        self.assertIsInstance(ret, dict)
 
1950
 
 
1951
    def test_call_method_filters_objectpaths_in_dict_in_dict(self):
 
1952
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
 
1953
        def func():
 
1954
            return method_return
 
1955
        fake_ravel = self.fake_ravel_func(func)
 
1956
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
 
1957
        method_return = {
 
1958
            ObjectPath("key1"): {
 
1959
                ObjectPath("key11"): ObjectPath("value11"),
 
1960
                ObjectPath("key12"): ObjectPath("value12"),
 
1961
            },
 
1962
            ObjectPath("key2"): {
 
1963
                ObjectPath("key21"): ObjectPath("value21"),
 
1964
                ObjectPath("key22"): ObjectPath("value22"),
 
1965
            },
 
1966
        }
 
1967
        ret = self.call_method(bus, "methodname", "busname",
 
1968
                               "objectpath", "interface")
 
1969
        expected_method_return = {
 
1970
            "key1": {"key11": "value11",
 
1971
                     "key12": "value12"},
 
1972
            "key2": {"key21": "value21",
 
1973
                     "key22": "value22"},
 
1974
        }
 
1975
        self.assertEqual(expected_method_return, ret)
 
1976
        for key, value in ret.items():
 
1977
            self.assertIsInstance(value, dict)
 
1978
            self.assertEqual(expected_method_return[key], value)
 
1979
            self.assertNotIsInstance(key, ObjectPath)
 
1980
            for inner_key, inner_value in value.items():
 
1981
                self.assertIsInstance(value, dict)
 
1982
                self.assertEqual(
 
1983
                    expected_method_return[key][inner_key],
 
1984
                    inner_value)
 
1985
                self.assertNotIsInstance(key, ObjectPath)
 
1986
 
 
1987
    def test_call_method_filters_objectpaths_in_dict_three_deep(self):
 
1988
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
 
1989
        def func():
 
1990
            return method_return
 
1991
        fake_ravel = self.fake_ravel_func(func)
 
1992
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
 
1993
        method_return = {
 
1994
            ObjectPath("key1"): {
 
1995
                ObjectPath("key2"): {
 
1996
                    ObjectPath("key3"): ObjectPath("value"),
 
1997
                },
 
1998
            },
 
1999
        }
 
2000
        ret = self.call_method(bus, "methodname", "busname",
 
2001
                               "objectpath", "interface")
 
2002
        expected_method_return = {"key1": {"key2": {"key3": "value"}}}
 
2003
        self.assertEqual(expected_method_return, ret)
 
2004
        self.assertIsInstance(ret, dict)
 
2005
        self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
 
2006
        self.assertIsInstance(ret["key1"], dict)
 
2007
        self.assertNotIsInstance(next(iter(ret["key1"].keys())),
 
2008
                                 ObjectPath)
 
2009
        self.assertIsInstance(ret["key1"]["key2"], dict)
 
2010
        self.assertNotIsInstance(
 
2011
            next(iter(ret["key1"]["key2"].keys())),
 
2012
            ObjectPath)
 
2013
        self.assertEqual("value", ret["key1"]["key2"]["key3"])
 
2014
        self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
 
2015
                                 self.dummy_dbussy.DBUS.ObjectPath)
 
2016
 
 
2017
    def test_call_method_handles_exception(self):
 
2018
        def func():
 
2019
            raise self.dummy_dbussy.DBusError()
 
2020
 
 
2021
        fake_ravel = self.fake_ravel_func(func)
 
2022
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
 
2023
 
 
2024
        with self.assertRaises(dbus.Error) as e:
 
2025
            self.call_method(bus, "methodname", "busname",
 
2026
                             "objectpath", "interface")
 
2027
 
 
2028
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
2029
 
 
2030
    def test_get_object_converts_to_correct_exception(self):
 
2031
        class fake_ravel_raises_exception_on_connect:
 
2032
            @staticmethod
 
2033
            def system_bus():
 
2034
                class Bus:
 
2035
                    @staticmethod
 
2036
                    def __getitem__(key):
 
2037
                        if key == "objectpath":
 
2038
                            raise self.dummy_dbussy.DBusError()
 
2039
                        raise Exception(key)
 
2040
                return {"busname": Bus()}
 
2041
        def func():
 
2042
            raise self.dummy_dbussy.DBusError()
 
2043
        bus = dbussy_adapter.SystemBus(
 
2044
            self.dummy_dbussy,
 
2045
            fake_ravel_raises_exception_on_connect)
 
2046
        with self.assertRaises(dbus.ConnectFailed):
 
2047
            self.call_method(bus, "methodname", "busname",
 
2048
                             "objectpath", "interface")
 
2049
 
 
2050
 
1511
2051
class Test_commands_from_options(unittest.TestCase):
1512
2052
 
1513
2053
    def setUp(self):
1518
2058
        self.assert_command_from_args(["--is-enabled", "client"],
1519
2059
                                      command.IsEnabled)
1520
2060
 
1521
 
    def assert_command_from_args(self, args, command_cls,
1522
 
                                 **cmd_attrs):
 
2061
    def assert_command_from_args(self, args, command_cls, length=1,
 
2062
                                 clients=None, **cmd_attrs):
1523
2063
        """Assert that parsing ARGS should result in an instance of
1524
2064
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1525
2065
        options = self.parser.parse_args(args)
1526
2066
        check_option_syntax(self.parser, options)
1527
2067
        commands = commands_from_options(options)
1528
 
        self.assertEqual(1, len(commands))
1529
 
        command = commands[0]
1530
 
        self.assertIsInstance(command, command_cls)
 
2068
        self.assertEqual(length, len(commands))
 
2069
        for command in commands:
 
2070
            if isinstance(command, command_cls):
 
2071
                break
 
2072
        else:
 
2073
            self.assertIsInstance(command, command_cls)
 
2074
        if clients is not None:
 
2075
            self.assertEqual(clients, options.client)
1531
2076
        for key, value in cmd_attrs.items():
1532
2077
            self.assertEqual(value, getattr(command, key))
1533
2078
 
 
2079
    def assert_commands_from_args(self, args, commands, clients=None):
 
2080
        for cmd in commands:
 
2081
            self.assert_command_from_args(args, cmd,
 
2082
                                          length=len(commands),
 
2083
                                          clients=clients)
 
2084
 
1534
2085
    def test_is_enabled_short(self):
1535
2086
        self.assert_command_from_args(["-V", "client"],
1536
2087
                                      command.IsEnabled)
1727
2278
                                      verbose=True)
1728
2279
 
1729
2280
 
 
2281
    def test_manual_page_example_1(self):
 
2282
        self.assert_command_from_args("",
 
2283
                                      command.PrintTable,
 
2284
                                      clients=[],
 
2285
                                      verbose=False)
 
2286
 
 
2287
    def test_manual_page_example_2(self):
 
2288
        self.assert_command_from_args(
 
2289
            "--verbose foo1.example.org foo2.example.org".split(),
 
2290
            command.PrintTable, clients=["foo1.example.org",
 
2291
                                         "foo2.example.org"],
 
2292
            verbose=True)
 
2293
 
 
2294
    def test_manual_page_example_3(self):
 
2295
        self.assert_command_from_args("--enable --all".split(),
 
2296
                                      command.Enable,
 
2297
                                      clients=[])
 
2298
 
 
2299
    def test_manual_page_example_4(self):
 
2300
        self.assert_commands_from_args(
 
2301
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
 
2302
             " foo2.example.org").split(),
 
2303
            [command.SetTimeout, command.SetInterval],
 
2304
            clients=["foo1.example.org", "foo2.example.org"])
 
2305
 
 
2306
    def test_manual_page_example_5(self):
 
2307
        self.assert_command_from_args("--approve --all".split(),
 
2308
                                      command.Approve,
 
2309
                                      clients=[])
 
2310
 
 
2311
 
1730
2312
class TestCommand(unittest.TestCase):
1731
2313
    """Abstract class for tests of command classes"""
1732
2314
 
1845
2427
        busname = "se.recompile.Mandos"
1846
2428
        client_interface = "se.recompile.Mandos.Client"
1847
2429
        command.Approve().run(self.bus.clients, self.bus)
 
2430
        self.assertTrue(self.bus.clients)
1848
2431
        for clientpath in self.bus.clients:
1849
2432
            self.assertIn(("Approve", busname, clientpath,
1850
2433
                           client_interface, (True,)), self.bus.calls)
1853
2436
        busname = "se.recompile.Mandos"
1854
2437
        client_interface = "se.recompile.Mandos.Client"
1855
2438
        command.Deny().run(self.bus.clients, self.bus)
 
2439
        self.assertTrue(self.bus.clients)
1856
2440
        for clientpath in self.bus.clients:
1857
2441
            self.assertIn(("Approve", busname, clientpath,
1858
2442
                           client_interface, (False,)),
1859
2443
                          self.bus.calls)
1860
2444
 
1861
2445
    def test_Remove(self):
 
2446
        busname = "se.recompile.Mandos"
 
2447
        server_path = "/"
 
2448
        server_interface = "se.recompile.Mandos"
 
2449
        orig_clients = self.bus.clients.copy()
1862
2450
        command.Remove().run(self.bus.clients, self.bus)
1863
 
        for clientpath in self.bus.clients:
1864
 
            self.assertIn(("RemoveClient", dbus_busname,
1865
 
                           dbus_server_path, dbus_server_interface,
 
2451
        self.assertFalse(self.bus.clients)
 
2452
        for clientpath in orig_clients:
 
2453
            self.assertIn(("RemoveClient", busname,
 
2454
                           server_path, server_interface,
1866
2455
                           (clientpath,)), self.bus.calls)
1867
2456
 
1868
2457
    expected_json = {
2070
2659
        else:
2071
2660
            cmd_args = [() for x in range(len(self.values_to_get))]
2072
2661
            values_to_get = self.values_to_get
 
2662
        self.assertTrue(values_to_get)
2073
2663
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2074
2664
            for clientpath in self.bus.clients:
2075
2665
                self.bus.clients[clientpath][self.propname] = (
2076
2666
                    Unique())
2077
2667
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
 
2668
            self.assertTrue(self.bus.clients)
2078
2669
            for clientpath in self.bus.clients:
2079
2670
                value = (self.bus.clients[clientpath]
2080
2671
                         [self.propname])
2139
2730
class TestSetSecretCmd(TestPropertySetterCmd):
2140
2731
    command = command.SetSecret
2141
2732
    propname = "Secret"
2142
 
    values_to_set = [io.BytesIO(b""),
2143
 
                     io.BytesIO(b"secret\0xyzzy\nbar")]
2144
 
    values_to_get = [f.getvalue() for f in values_to_set]
 
2733
    def __init__(self, *args, **kwargs):
 
2734
        self.values_to_set = [io.BytesIO(b""),
 
2735
                              io.BytesIO(b"secret\0xyzzy\nbar")]
 
2736
        self.values_to_get = [f.getvalue() for f in
 
2737
                              self.values_to_set]
 
2738
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2145
2739
 
2146
2740
 
2147
2741
class TestSetTimeoutCmd(TestPropertySetterCmd):
2200
2794
 
2201
2795
 
2202
2796
 
2203
 
def should_only_run_tests():
 
2797
def parse_test_args():
 
2798
    # type: () -> argparse.Namespace
2204
2799
    parser = argparse.ArgumentParser(add_help=False)
2205
 
    parser.add_argument("--check", action='store_true')
 
2800
    parser.add_argument("--check", action="store_true")
 
2801
    parser.add_argument("--prefix", )
2206
2802
    args, unknown_args = parser.parse_known_args()
2207
 
    run_tests = args.check
2208
 
    if run_tests:
2209
 
        # Remove --check argument from sys.argv
 
2803
    if args.check:
 
2804
        # Remove test options from sys.argv
2210
2805
        sys.argv[1:] = unknown_args
2211
 
    return run_tests
 
2806
    return args
2212
2807
 
2213
2808
# Add all tests from doctest strings
2214
2809
def load_tests(loader, tests, none):
2217
2812
    return tests
2218
2813
 
2219
2814
if __name__ == "__main__":
 
2815
    options = parse_test_args()
2220
2816
    try:
2221
 
        if should_only_run_tests():
2222
 
            # Call using ./tdd-python-script --check [--verbose]
2223
 
            unittest.main()
 
2817
        if options.check:
 
2818
            extra_test_prefix = options.prefix
 
2819
            if extra_test_prefix is not None:
 
2820
                if not (unittest.main(argv=[""], exit=False)
 
2821
                        .result.wasSuccessful()):
 
2822
                    sys.exit(1)
 
2823
                class ExtraTestLoader(unittest.TestLoader):
 
2824
                    testMethodPrefix = extra_test_prefix
 
2825
                # Call using ./scriptname --check [--verbose]
 
2826
                unittest.main(argv=[""], testLoader=ExtraTestLoader())
 
2827
            else:
 
2828
                unittest.main(argv=[""])
2224
2829
        else:
2225
2830
            main()
2226
2831
    finally:
2227
2832
        logging.shutdown()
 
2833
 
 
2834
# Local Variables:
 
2835
# run-tests:
 
2836
# (lambda (&optional extra)
 
2837
#   (if (not (funcall run-tests-in-test-buffer default-directory
 
2838
#             extra))
 
2839
#       (funcall show-test-buffer-in-test-window)
 
2840
#     (funcall remove-test-window)
 
2841
#     (if extra (message "Extra tests run successfully!"))))
 
2842
# run-tests-in-test-buffer:
 
2843
# (lambda (dir &optional extra)
 
2844
#   (with-current-buffer (get-buffer-create "*Test*")
 
2845
#     (setq buffer-read-only nil
 
2846
#           default-directory dir)
 
2847
#     (erase-buffer)
 
2848
#     (compilation-mode))
 
2849
#   (let ((process-result
 
2850
#          (let ((inhibit-read-only t))
 
2851
#            (process-file-shell-command
 
2852
#             (funcall get-command-line extra) nil "*Test*"))))
 
2853
#     (and (numberp process-result)
 
2854
#          (= process-result 0))))
 
2855
# get-command-line:
 
2856
# (lambda (&optional extra)
 
2857
#   (let ((quoted-script
 
2858
#          (shell-quote-argument (funcall get-script-name))))
 
2859
#     (format
 
2860
#      (concat "%s --check" (if extra " --prefix=atest" ""))
 
2861
#      quoted-script)))
 
2862
# get-script-name:
 
2863
# (lambda ()
 
2864
#   (if (fboundp 'file-local-name)
 
2865
#       (file-local-name (buffer-file-name))
 
2866
#     (or (file-remote-p (buffer-file-name) 'localname)
 
2867
#         (buffer-file-name))))
 
2868
# remove-test-window:
 
2869
# (lambda ()
 
2870
#   (let ((test-window (get-buffer-window "*Test*")))
 
2871
#     (if test-window (delete-window test-window))))
 
2872
# show-test-buffer-in-test-window:
 
2873
# (lambda ()
 
2874
#   (when (not (get-buffer-window-list "*Test*"))
 
2875
#     (setq next-error-last-buffer (get-buffer "*Test*"))
 
2876
#     (let* ((side (if (>= (window-width) 146) 'right 'bottom))
 
2877
#            (display-buffer-overriding-action
 
2878
#             `((display-buffer-in-side-window) (side . ,side)
 
2879
#               (window-height . fit-window-to-buffer)
 
2880
#               (window-width . fit-window-to-buffer))))
 
2881
#       (display-buffer "*Test*"))))
 
2882
# eval:
 
2883
# (progn
 
2884
#   (let* ((run-extra-tests (lambda () (interactive)
 
2885
#                             (funcall run-tests t)))
 
2886
#          (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
 
2887
#          (outer-keymap `(keymap (3 . ,inner-keymap))))     ; C-c
 
2888
#     (setq minor-mode-overriding-map-alist
 
2889
#           (cons `(run-tests . ,outer-keymap)
 
2890
#                 minor-mode-overriding-map-alist)))
 
2891
#   (add-hook 'after-save-hook run-tests 90 t))
 
2892
# End: