/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2024-11-24 14:41:36 UTC
  • mfrom: (237.7.863 trunk)
  • Revision ID: teddy@recompile.se-20241124144136-0fej6fm6woitsooj
Merge from trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
3
 
#
4
 
# Mandos Monitor - Control and monitor the Mandos server
5
 
#
6
 
# Copyright © 2008-2019 Teddy Hogeborn
7
 
# Copyright © 2008-2019 Björn Påhlsson
 
1
#!/usr/bin/python3 -bbI
 
2
# -*- coding: utf-8; lexical-binding: t -*-
 
3
#
 
4
# Mandos Control - Control or query the Mandos server
 
5
#
 
6
# Copyright © 2008-2022 Teddy Hogeborn
 
7
# Copyright © 2008-2022 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
23
23
#
24
24
# Contact the authors at <mandos@recompile.se>.
25
25
#
26
 
 
27
26
from __future__ import (division, absolute_import, print_function,
28
27
                        unicode_literals)
29
28
 
33
32
    pass
34
33
 
35
34
import sys
 
35
import unittest
36
36
import argparse
 
37
import logging
 
38
import os
37
39
import locale
38
40
import datetime
39
41
import re
40
 
import os
41
42
import collections
42
43
import json
43
 
import unittest
44
 
import logging
45
44
import io
46
45
import tempfile
47
46
import contextlib
48
 
import abc
49
 
 
50
 
import dbus as dbus_python
 
47
 
 
48
if sys.version_info.major == 2:
 
49
    __metaclass__ = type
 
50
    str = unicode
 
51
    input = raw_input
 
52
 
 
53
 
 
54
class gi:
 
55
    """Dummy gi module, for the tests"""
 
56
    class repository:
 
57
        class GLib:
 
58
            class Error(Exception):
 
59
                pass
 
60
 
 
61
 
 
62
dbussy = None
 
63
ravel = None
 
64
dbus_python = None
 
65
pydbus = None
 
66
 
 
67
try:
 
68
    import dbussy
 
69
    import ravel
 
70
except ImportError:
 
71
    try:
 
72
        import pydbus
 
73
        import gi
 
74
    except ImportError:
 
75
        import dbus as dbus_python
 
76
 
51
77
 
52
78
# Show warnings by default
53
79
if not sys.warnoptions:
54
80
    import warnings
55
81
    warnings.simplefilter("default")
56
82
 
57
 
log = logging.getLogger(sys.argv[0])
58
 
logging.basicConfig(level="INFO", # Show info level messages
 
83
log = logging.getLogger(os.path.basename(sys.argv[0]))
 
84
logging.basicConfig(level="INFO",         # Show info level messages
59
85
                    format="%(message)s") # Show basic log messages
60
86
 
61
87
logging.captureWarnings(True)   # Show warnings via the logging system
62
88
 
63
89
if sys.version_info.major == 2:
64
 
    str = unicode
65
90
    import StringIO
66
91
    io.StringIO = StringIO.StringIO
67
92
 
68
93
locale.setlocale(locale.LC_ALL, "")
69
94
 
70
 
version = "1.8.3"
 
95
version = "1.8.17"
71
96
 
72
97
 
73
98
def main():
80
105
    clientnames = options.client
81
106
 
82
107
    if options.debug:
83
 
        log.setLevel(logging.DEBUG)
 
108
        logging.getLogger("").setLevel(logging.DEBUG)
84
109
 
85
 
    bus = dbus_python_adapter.CachingBus(dbus_python)
 
110
    if dbussy is not None and ravel is not None:
 
111
        bus = dbussy_adapter.CachingBus(dbussy, ravel)
 
112
    elif pydbus is not None:
 
113
        bus = pydbus_adapter.CachingBus(pydbus)
 
114
    else:
 
115
        bus = dbus_python_adapter.CachingBus(dbus_python)
86
116
 
87
117
    try:
88
118
        all_clients = bus.get_clients_and_properties()
229
259
        return rfc3339_duration_to_delta(interval)
230
260
    except ValueError as e:
231
261
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
232
 
                    ' '.join(e.args))
 
262
                    " ".join(e.args))
233
263
    return parse_pre_1_6_1_interval(interval)
234
264
 
235
265
 
236
266
def rfc3339_duration_to_delta(duration):
237
267
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
238
268
 
239
 
    >>> rfc3339_duration_to_delta("P7D")
240
 
    datetime.timedelta(7)
241
 
    >>> rfc3339_duration_to_delta("PT60S")
242
 
    datetime.timedelta(0, 60)
243
 
    >>> rfc3339_duration_to_delta("PT60M")
244
 
    datetime.timedelta(0, 3600)
245
 
    >>> rfc3339_duration_to_delta("P60M")
246
 
    datetime.timedelta(1680)
247
 
    >>> rfc3339_duration_to_delta("PT24H")
248
 
    datetime.timedelta(1)
249
 
    >>> rfc3339_duration_to_delta("P1W")
250
 
    datetime.timedelta(7)
251
 
    >>> rfc3339_duration_to_delta("PT5M30S")
252
 
    datetime.timedelta(0, 330)
253
 
    >>> rfc3339_duration_to_delta("P1DT3M20S")
254
 
    datetime.timedelta(1, 200)
 
269
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
 
270
    True
 
271
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
 
272
    True
 
273
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
 
274
    True
 
275
    >>> # 60 months
 
276
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
 
277
    True
 
278
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
 
279
    True
 
280
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
 
281
    True
 
282
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
 
283
    True
 
284
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
 
285
    True
255
286
    >>> # Can not be empty:
256
287
    >>> rfc3339_duration_to_delta("")
257
288
    Traceback (most recent call last):
364
395
 
365
396
 
366
397
def parse_pre_1_6_1_interval(interval):
367
 
    """Parse an interval string as documented by Mandos before 1.6.1,
 
398
    r"""Parse an interval string as documented by Mandos before 1.6.1,
368
399
    and return a datetime.timedelta
369
400
 
370
 
    >>> parse_pre_1_6_1_interval('7d')
371
 
    datetime.timedelta(7)
372
 
    >>> parse_pre_1_6_1_interval('60s')
373
 
    datetime.timedelta(0, 60)
374
 
    >>> parse_pre_1_6_1_interval('60m')
375
 
    datetime.timedelta(0, 3600)
376
 
    >>> parse_pre_1_6_1_interval('24h')
377
 
    datetime.timedelta(1)
378
 
    >>> parse_pre_1_6_1_interval('1w')
379
 
    datetime.timedelta(7)
380
 
    >>> parse_pre_1_6_1_interval('5m 30s')
381
 
    datetime.timedelta(0, 330)
382
 
    >>> parse_pre_1_6_1_interval('')
383
 
    datetime.timedelta(0)
 
401
    >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
 
402
    True
 
403
    >>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
 
404
    True
 
405
    >>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
 
406
    True
 
407
    >>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
 
408
    True
 
409
    >>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
 
410
    True
 
411
    >>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
 
412
    True
 
413
    >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
 
414
    True
384
415
    >>> # Ignore unknown characters, allow any order and repetitions
385
 
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
386
 
    datetime.timedelta(2, 480, 18000)
 
416
    >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
 
417
    ... == datetime.timedelta(2, 480, 18000)
 
418
    True
387
419
 
388
420
    """
389
421
 
452
484
        parser.error("--remove can only be combined with --deny")
453
485
 
454
486
 
455
 
class dbus(object):
 
487
class dbus:
456
488
 
457
 
    class SystemBus(object):
 
489
    class SystemBus:
458
490
 
459
491
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
 
492
 
460
493
        def get_managed_objects(self, busname, objectpath):
461
494
            return self.call_method("GetManagedObjects", busname,
462
495
                                    objectpath,
463
496
                                    self.object_manager_iface)
464
497
 
465
498
        properties_iface = "org.freedesktop.DBus.Properties"
 
499
 
466
500
        def set_property(self, busname, objectpath, interface, key,
467
501
                         value):
468
502
            self.call_method("Set", busname, objectpath,
469
503
                             self.properties_iface, interface, key,
470
504
                             value)
471
505
 
 
506
        def call_method(self, methodname, busname, objectpath,
 
507
                        interface, *args):
 
508
            raise NotImplementedError()
472
509
 
473
510
    class MandosBus(SystemBus):
474
511
        busname_domain = "se.recompile"
506
543
        pass
507
544
 
508
545
 
509
 
class dbus_python_adapter(object):
 
546
class dbus_python_adapter:
510
547
 
511
548
    class SystemBus(dbus.MandosBus):
512
549
        """Use dbus-python"""
557
594
                        for key, subval in value.items()}
558
595
            return value
559
596
 
 
597
        def set_client_property(self, objectpath, key, value):
 
598
            if key == "Secret":
 
599
                if not isinstance(value, bytes):
 
600
                    value = value.encode("utf-8")
 
601
                value = self.dbus_python.ByteArray(value)
 
602
            return self.set_property(self.busname, objectpath,
 
603
                                     self.client_interface, key,
 
604
                                     value)
560
605
 
561
 
    class SilenceLogger(object):
 
606
    class SilenceLogger:
562
607
        "Simple context manager to silence a particular logger"
 
608
 
563
609
        def __init__(self, loggername):
564
610
            self.logger = logging.getLogger(loggername)
565
611
 
575
621
        def __exit__(self, exc_type, exc_val, exc_tb):
576
622
            self.logger.removeFilter(self.nullfilter)
577
623
 
578
 
 
579
624
    class CachingBus(SystemBus):
580
625
        """A caching layer for dbus_python_adapter.SystemBus"""
 
626
 
581
627
        def __init__(self, *args, **kwargs):
582
628
            self.object_cache = {}
583
629
            super(dbus_python_adapter.CachingBus,
584
630
                  self).__init__(*args, **kwargs)
 
631
 
585
632
        def get_object(self, busname, objectpath):
586
633
            try:
587
634
                return self.object_cache[(busname, objectpath)]
589
636
                new_object = super(
590
637
                    dbus_python_adapter.CachingBus,
591
638
                    self).get_object(busname, objectpath)
592
 
                self.object_cache[(busname, objectpath)]  = new_object
 
639
                self.object_cache[(busname, objectpath)] = new_object
 
640
                return new_object
 
641
 
 
642
 
 
643
class pydbus_adapter:
 
644
    class SystemBus(dbus.MandosBus):
 
645
        def __init__(self, module=pydbus):
 
646
            self.pydbus = module
 
647
            self.bus = self.pydbus.SystemBus()
 
648
 
 
649
        @contextlib.contextmanager
 
650
        def convert_exception(self, exception_class=dbus.Error):
 
651
            try:
 
652
                yield
 
653
            except gi.repository.GLib.Error as e:
 
654
                # This does what "raise from" would do
 
655
                exc = exception_class(*e.args)
 
656
                exc.__cause__ = e
 
657
                raise exc
 
658
 
 
659
        def call_method(self, methodname, busname, objectpath,
 
660
                        interface, *args):
 
661
            proxy_object = self.get(busname, objectpath)
 
662
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
 
663
                      interface, methodname,
 
664
                      ", ".join(repr(a) for a in args))
 
665
            method = getattr(proxy_object[interface], methodname)
 
666
            with self.convert_exception():
 
667
                return method(*args)
 
668
 
 
669
        def get(self, busname, objectpath):
 
670
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
671
                      busname, objectpath)
 
672
            with self.convert_exception(dbus.ConnectFailed):
 
673
                if sys.version_info.major <= 2:
 
674
                    with warnings.catch_warnings():
 
675
                        warnings.filterwarnings(
 
676
                            "ignore", "", DeprecationWarning,
 
677
                            r"^xml\.etree\.ElementTree$")
 
678
                        return self.bus.get(busname, objectpath)
 
679
                else:
 
680
                    return self.bus.get(busname, objectpath)
 
681
 
 
682
        def set_property(self, busname, objectpath, interface, key,
 
683
                         value):
 
684
            proxy_object = self.get(busname, objectpath)
 
685
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
686
                      objectpath, self.properties_iface, interface,
 
687
                      key, value)
 
688
            setattr(proxy_object[interface], key, value)
 
689
 
 
690
    class CachingBus(SystemBus):
 
691
        """A caching layer for pydbus_adapter.SystemBus"""
 
692
 
 
693
        def __init__(self, *args, **kwargs):
 
694
            self.object_cache = {}
 
695
            super(pydbus_adapter.CachingBus,
 
696
                  self).__init__(*args, **kwargs)
 
697
 
 
698
        def get(self, busname, objectpath):
 
699
            try:
 
700
                return self.object_cache[(busname, objectpath)]
 
701
            except KeyError:
 
702
                new_object = (super(pydbus_adapter.CachingBus, self)
 
703
                              .get(busname, objectpath))
 
704
                self.object_cache[(busname, objectpath)] = new_object
 
705
                return new_object
 
706
 
 
707
 
 
708
class dbussy_adapter:
 
709
    class SystemBus(dbus.SystemBus):
 
710
        """Use DBussy"""
 
711
 
 
712
        def __init__(self, dbussy, ravel):
 
713
            self.dbussy = dbussy
 
714
            self.ravel = ravel
 
715
            self.bus = ravel.system_bus()
 
716
 
 
717
        @contextlib.contextmanager
 
718
        def convert_exception(self, exception_class=dbus.Error):
 
719
            try:
 
720
                yield
 
721
            except self.dbussy.DBusError as e:
 
722
                # This does what "raise from" would do
 
723
                exc = exception_class(*e.args)
 
724
                exc.__cause__ = e
 
725
                raise exc
 
726
 
 
727
        def call_method(self, methodname, busname, objectpath,
 
728
                        interface, *args):
 
729
            proxy_object = self.get_object(busname, objectpath)
 
730
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
 
731
                      interface, methodname,
 
732
                      ", ".join(repr(a) for a in args))
 
733
            iface = proxy_object.get_interface(interface)
 
734
            method = getattr(iface, methodname)
 
735
            with self.convert_exception(dbus.Error):
 
736
                value = method(*args)
 
737
            # DBussy returns values either as an empty list or as a
 
738
            # list of one element with the return value
 
739
            if value:
 
740
                return self.type_filter(value[0])
 
741
 
 
742
        def get_object(self, busname, objectpath):
 
743
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
744
                      busname, objectpath)
 
745
            with self.convert_exception(dbus.ConnectFailed):
 
746
                return self.bus[busname][objectpath]
 
747
 
 
748
        def type_filter(self, value):
 
749
            """Convert the most bothersome types to Python types"""
 
750
            # A D-Bus Variant value is represented as the Python type
 
751
            # Tuple[dbussy.DBUS.Signature, Any]
 
752
            if isinstance(value, tuple):
 
753
                if (len(value) == 2
 
754
                    and isinstance(value[0],
 
755
                                   self.dbussy.DBUS.Signature)):
 
756
                    return self.type_filter(value[1])
 
757
            elif isinstance(value, self.dbussy.DBUS.ObjectPath):
 
758
                return str(value)
 
759
            # Also recurse into dictionaries
 
760
            elif isinstance(value, dict):
 
761
                return {self.type_filter(key):
 
762
                        self.type_filter(subval)
 
763
                        for key, subval in value.items()}
 
764
            return value
 
765
 
 
766
        def set_property(self, busname, objectpath, interface, key,
 
767
                         value):
 
768
            proxy_object = self.get_object(busname, objectpath)
 
769
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
770
                      objectpath, self.properties_iface, interface,
 
771
                      key, value)
 
772
            if key == "Secret":
 
773
                # DBussy wants a Byte Array to be a sequence of
 
774
                # values, not a byte string
 
775
                value = tuple(value)
 
776
            setattr(proxy_object.get_interface(interface), key, value)
 
777
 
 
778
    class MandosBus(SystemBus, dbus.MandosBus):
 
779
        pass
 
780
 
 
781
    class CachingBus(MandosBus):
 
782
        """A caching layer for dbussy_adapter.MandosBus"""
 
783
 
 
784
        def __init__(self, *args, **kwargs):
 
785
            self.object_cache = {}
 
786
            super(dbussy_adapter.CachingBus, self).__init__(*args,
 
787
                                                            **kwargs)
 
788
 
 
789
        def get_object(self, busname, objectpath):
 
790
            try:
 
791
                return self.object_cache[(busname, objectpath)]
 
792
            except KeyError:
 
793
                new_object = super(
 
794
                    dbussy_adapter.CachingBus,
 
795
                    self).get_object(busname, objectpath)
 
796
                self.object_cache[(busname, objectpath)] = new_object
593
797
                return new_object
594
798
 
595
799
 
626
830
    return commands
627
831
 
628
832
 
629
 
class command(object):
 
833
class command:
630
834
    """A namespace for command classes"""
631
835
 
632
 
    class Base(object):
 
836
    class Base:
633
837
        """Abstract base class for commands"""
 
838
 
634
839
        def run(self, clients, bus=None):
635
840
            """Normal commands should implement run_on_one_client(),
636
841
but commands which want to operate on all clients at the same time can
640
845
            for client, properties in clients.items():
641
846
                self.run_on_one_client(client, properties)
642
847
 
643
 
 
644
848
    class IsEnabled(Base):
645
849
        def run(self, clients, bus=None):
646
850
            properties = next(iter(clients.values()))
648
852
                sys.exit(0)
649
853
            sys.exit(1)
650
854
 
651
 
 
652
855
    class Approve(Base):
653
856
        def run_on_one_client(self, client, properties):
654
857
            self.bus.call_client_method(client, "Approve", True)
655
858
 
656
 
 
657
859
    class Deny(Base):
658
860
        def run_on_one_client(self, client, properties):
659
861
            self.bus.call_client_method(client, "Approve", False)
660
862
 
661
 
 
662
863
    class Remove(Base):
663
864
        def run(self, clients, bus):
664
865
            for clientpath in frozenset(clients.keys()):
665
866
                bus.call_server_method("RemoveClient", clientpath)
666
867
 
667
 
 
668
868
    class Output(Base):
669
869
        """Abstract class for commands outputting client details"""
670
870
        all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
676
876
                        "Checker", "ExtendedTimeout", "Expires",
677
877
                        "LastCheckerStatus")
678
878
 
679
 
 
680
879
    class DumpJSON(Output):
681
880
        def run(self, clients, bus=None):
682
881
            data = {properties["Name"]:
683
882
                    {key: properties[key]
684
883
                     for key in self.all_keywords}
685
884
                    for properties in clients.values()}
686
 
            print(json.dumps(data, indent=4, separators=(',', ': ')))
687
 
 
 
885
            print(json.dumps(data, indent=4, separators=(",", ": ")))
688
886
 
689
887
    class PrintTable(Output):
690
888
        def __init__(self, verbose=False):
698
896
                keywords = self.all_keywords
699
897
            print(self.TableOfClients(clients.values(), keywords))
700
898
 
701
 
        class TableOfClients(object):
 
899
        class TableOfClients:
702
900
            tableheaders = {
703
901
                "Name": "Name",
704
902
                "Enabled": "Enabled",
731
929
 
732
930
            if sys.version_info.major == 2:
733
931
                __unicode__ = __str__
 
932
 
734
933
                def __str__(self):
735
934
                    return str(self).encode(
736
935
                        locale.getpreferredencoding())
782
981
                                minutes=(td.seconds % 3600) // 60,
783
982
                                seconds=td.seconds % 60))
784
983
 
785
 
 
786
984
    class PropertySetter(Base):
787
985
        "Abstract class for Actions for setting one client property"
788
986
 
795
993
        def propname(self):
796
994
            raise NotImplementedError()
797
995
 
798
 
 
799
996
    class Enable(PropertySetter):
800
997
        propname = "Enabled"
801
998
        value_to_set = True
802
999
 
803
 
 
804
1000
    class Disable(PropertySetter):
805
1001
        propname = "Enabled"
806
1002
        value_to_set = False
807
1003
 
808
 
 
809
1004
    class BumpTimeout(PropertySetter):
810
1005
        propname = "LastCheckedOK"
811
1006
        value_to_set = ""
812
1007
 
813
 
 
814
1008
    class StartChecker(PropertySetter):
815
1009
        propname = "CheckerRunning"
816
1010
        value_to_set = True
817
1011
 
818
 
 
819
1012
    class StopChecker(PropertySetter):
820
1013
        propname = "CheckerRunning"
821
1014
        value_to_set = False
822
1015
 
823
 
 
824
1016
    class ApproveByDefault(PropertySetter):
825
1017
        propname = "ApprovedByDefault"
826
1018
        value_to_set = True
827
1019
 
828
 
 
829
1020
    class DenyByDefault(PropertySetter):
830
1021
        propname = "ApprovedByDefault"
831
1022
        value_to_set = False
832
1023
 
833
 
 
834
1024
    class PropertySetterValue(PropertySetter):
835
1025
        """Abstract class for PropertySetter receiving a value as
836
1026
constructor argument instead of a class attribute."""
 
1027
 
837
1028
        def __init__(self, value):
838
1029
            self.value_to_set = value
839
1030
 
846
1037
    class SetChecker(PropertySetterValue):
847
1038
        propname = "Checker"
848
1039
 
849
 
 
850
1040
    class SetHost(PropertySetterValue):
851
1041
        propname = "Host"
852
1042
 
853
 
 
854
1043
    class SetSecret(PropertySetterValue):
855
1044
        propname = "Secret"
856
1045
 
864
1053
            self._vts = value.read()
865
1054
            value.close()
866
1055
 
867
 
 
868
1056
    class PropertySetterValueMilliseconds(PropertySetterValue):
869
1057
        """Abstract class for PropertySetterValue taking a value
870
1058
argument as a datetime.timedelta() but should store it as
879
1067
            "When setting, convert value from a datetime.timedelta"
880
1068
            self._vts = int(round(value.total_seconds() * 1000))
881
1069
 
882
 
 
883
1070
    class SetTimeout(PropertySetterValueMilliseconds):
884
1071
        propname = "Timeout"
885
1072
 
886
 
 
887
1073
    class SetExtendedTimeout(PropertySetterValueMilliseconds):
888
1074
        propname = "ExtendedTimeout"
889
1075
 
890
 
 
891
1076
    class SetInterval(PropertySetterValueMilliseconds):
892
1077
        propname = "Interval"
893
1078
 
894
 
 
895
1079
    class SetApprovalDelay(PropertySetterValueMilliseconds):
896
1080
        propname = "ApprovalDelay"
897
1081
 
898
 
 
899
1082
    class SetApprovalDuration(PropertySetterValueMilliseconds):
900
1083
        propname = "ApprovalDuration"
901
1084
 
935
1118
                                                     "output"))
936
1119
 
937
1120
 
938
 
class Unique(object):
 
1121
class Unique:
939
1122
    """Class for objects which exist only to be unique objects, since
940
1123
unittest.mock.sentinel only exists in Python 3.3 and later"""
941
1124
 
1225
1408
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1226
1409
 
1227
1410
    def MockDBusPython_func(self, func):
1228
 
        class mock_dbus_python(object):
 
1411
        class mock_dbus_python:
1229
1412
            """mock dbus-python module"""
1230
 
            class exceptions(object):
 
1413
            class exceptions:
1231
1414
                """Pseudo-namespace"""
1232
1415
                class DBusException(Exception):
1233
1416
                    pass
1234
 
            class SystemBus(object):
 
1417
            class SystemBus:
1235
1418
                @staticmethod
1236
1419
                def get_object(busname, objectpath):
1237
1420
                    DBusObject = collections.namedtuple(
1238
 
                        "DBusObject", ("methodname",))
 
1421
                        "DBusObject", ("methodname", "Set"))
1239
1422
                    def method(*args, **kwargs):
1240
1423
                        self.assertEqual({"dbus_interface":
1241
1424
                                          "interface"},
1242
1425
                                         kwargs)
1243
1426
                        return func(*args)
1244
 
                    return DBusObject(methodname=method)
1245
 
            class Boolean(object):
 
1427
                    def set_property(interface, key, value,
 
1428
                                     dbus_interface=None):
 
1429
                        self.assertEqual(
 
1430
                            "org.freedesktop.DBus.Properties",
 
1431
                            dbus_interface)
 
1432
                        self.assertEqual("Secret", key)
 
1433
                        return func(interface, key, value,
 
1434
                                    dbus_interface=dbus_interface)
 
1435
                    return DBusObject(methodname=method,
 
1436
                                      Set=set_property)
 
1437
            class Boolean:
1246
1438
                def __init__(self, value):
1247
1439
                    self.value = bool(value)
1248
1440
                def __bool__(self):
1253
1445
                pass
1254
1446
            class Dictionary(dict):
1255
1447
                pass
 
1448
            class ByteArray(bytes):
 
1449
                pass
1256
1450
        return mock_dbus_python
1257
1451
 
1258
1452
    def call_method(self, bus, methodname, busname, objectpath,
1430
1624
        finally:
1431
1625
            dbus_logger.removeFilter(counting_handler)
1432
1626
 
1433
 
        self.assertNotIsInstance(e, dbus.ConnectFailed)
 
1627
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1434
1628
 
1435
1629
        # Make sure the dbus logger was suppressed
1436
1630
        self.assertEqual(0, counting_handler.count)
1437
1631
 
 
1632
    def test_Set_Secret_sends_bytearray(self):
 
1633
        ret = [None]
 
1634
        def func(*args, **kwargs):
 
1635
            ret[0] = (args, kwargs)
 
1636
        mock_dbus_python = self.MockDBusPython_func(func)
 
1637
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
 
1638
        bus.set_client_property("objectpath", "Secret", "value")
 
1639
        expected_call = (("se.recompile.Mandos.Client", "Secret",
 
1640
                          mock_dbus_python.ByteArray(b"value")),
 
1641
                         {"dbus_interface":
 
1642
                          "org.freedesktop.DBus.Properties"})
 
1643
        self.assertEqual(expected_call, ret[0])
 
1644
        if sys.version_info.major == 2:
 
1645
            self.assertIsInstance(ret[0][0][-1],
 
1646
                                  mock_dbus_python.ByteArray)
 
1647
 
1438
1648
    def test_get_object_converts_to_correct_exception(self):
1439
1649
        bus = dbus_python_adapter.SystemBus(
1440
1650
            self.fake_dbus_python_raises_exception_on_connect)
1442
1652
            self.call_method(bus, "methodname", "busname",
1443
1653
                             "objectpath", "interface")
1444
1654
 
1445
 
    class fake_dbus_python_raises_exception_on_connect(object):
 
1655
    class fake_dbus_python_raises_exception_on_connect:
1446
1656
        """fake dbus-python module"""
1447
 
        class exceptions(object):
 
1657
        class exceptions:
1448
1658
            """Pseudo-namespace"""
1449
1659
            class DBusException(Exception):
1450
1660
                pass
1458
1668
 
1459
1669
 
1460
1670
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1461
 
    class mock_dbus_python(object):
 
1671
    class mock_dbus_python:
1462
1672
        """mock dbus-python modules"""
1463
 
        class SystemBus(object):
 
1673
        class SystemBus:
1464
1674
            @staticmethod
1465
1675
            def get_object(busname, objectpath):
1466
1676
                return Unique()
1509
1719
        self.assertIs(obj1, obj1b)
1510
1720
 
1511
1721
 
 
1722
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
    """Tests of pydbus_adapter.SystemBus against stand-in pydbus modules"""

    def Stub_pydbus_func(self, func):
        """Return a stub pydbus module where FUNC is the D-Bus method

        The proxy object returned by the stub bus maps the interface
        name "interface" to an object whose "methodname" attribute is
        FUNC.
        """
        class stub_pydbus:
            """stub pydbus module"""
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    DBusObject = collections.namedtuple(
                        "DBusObject", ("methodname",))
                    return {"interface":
                            DBusObject(methodname=func)}
        return stub_pydbus

    def call_method(self, bus, methodname, busname, objectpath,
                    interface, *args):
        """Call BUS.call_method() and require that something is logged"""
        with self.assertLogs(log, logging.DEBUG):
            return bus.call_method(methodname, busname, objectpath,
                                   interface, *args)

    def test_call_method_returns(self):
        expected_method_return = Unique()
        method_args = (Unique(), Unique())

        def func(*args):
            # The very same argument objects must arrive, in order
            self.assertEqual(len(method_args), len(args))
            for marg, arg in zip(method_args, args):
                self.assertIs(marg, arg)
            return expected_method_return
        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface",
                               *method_args)
        self.assertIs(ret, expected_method_return)

    def test_call_method_handles_exception(self):
        def func():
            raise gi.repository.GLib.Error()

        stub_pydbus = self.Stub_pydbus_func(func)
        bus = pydbus_adapter.SystemBus(stub_pydbus)

        with self.assertRaises(dbus.Error) as e:
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

        # A failing method call must not be reported as a failure
        # to connect
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)

    def test_get_converts_to_correct_exception(self):
        bus = pydbus_adapter.SystemBus(
            self.fake_pydbus_raises_exception_on_connect)
        with self.assertRaises(dbus.ConnectFailed):
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

    class fake_pydbus_raises_exception_on_connect:
        """fake pydbus module"""
        @classmethod
        def SystemBus(cls):
            def get(busname, objectpath):
                raise gi.repository.GLib.Error()
            Bus = collections.namedtuple("Bus", ["get"])
            return Bus(get=get)

    def test_set_property_uses_setattr(self):
        class Object:
            pass
        obj = Object()

        class pydbus_spy:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    return {"interface": obj}
        bus = pydbus_adapter.SystemBus(pydbus_spy)
        value = Unique()
        bus.set_property("busname", "objectpath", "interface", "key",
                         value)
        self.assertIs(value, obj.key)

    def test_get_suppresses_xml_deprecation_warning(self):
        if sys.version_info.major >= 3:
            # Report the test as skipped instead of silently passing
            self.skipTest("only applicable to Python 2")

        class stub_pydbus_get:
            class SystemBus:
                @staticmethod
                def get(busname, objectpath):
                    warnings.warn_explicit(
                        "deprecated", DeprecationWarning,
                        "xml.etree.ElementTree", 0)
        bus = pydbus_adapter.SystemBus(stub_pydbus_get)
        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            bus.get("busname", "objectpath")
            self.assertEqual(0, len(w))
 
1816
 
 
1817
 
 
1818
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
    """Tests of the object caching done by pydbus_adapter.CachingBus

    get() should return the same object for a repeated (busname,
    objectpath) pair, and distinct objects for differing pairs.
    """

    class stub_pydbus:
        """stub pydbus module"""
        class SystemBus:
            @staticmethod
            def get(busname, objectpath):
                # A new object for every uncached lookup
                return Unique()

    def setUp(self):
        self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)

    def test_returns_distinct_objectpaths(self):
        obj1 = self.bus.get("busname", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_busnames(self):
        obj1 = self.bus.get("busname1", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_distinct_both(self):
        # Both the bus name and the object path differ here
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        self.assertIsNot(obj1, obj2)

    def test_returns_same(self):
        obj1 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname", "objectpath")
        self.assertIsInstance(obj2, Unique)
        self.assertIs(obj1, obj2)

    def test_returns_same_old(self):
        # An earlier entry must survive a later, unrelated lookup
        obj1 = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1, Unique)
        obj2 = self.bus.get("busname2", "objectpath2")
        self.assertIsInstance(obj2, Unique)
        obj1b = self.bus.get("busname1", "objectpath1")
        self.assertIsInstance(obj1b, Unique)
        self.assertIsNot(obj1, obj2)
        self.assertIsNot(obj2, obj1b)
        self.assertIs(obj1, obj1b)
 
1867
 
 
1868
 
 
1869
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
    """Tests of dbussy_adapter.SystemBus against stand-in modules"""

    class dummy_dbussy:
        """dummy dbussy module"""
        class DBUS:
            class ObjectPath(str):
                pass

        class DBusError(Exception):
            pass

    def fake_ravel_func(self, func):
        """Return a fake ravel module where FUNC is the D-Bus method

        The fake bus only knows the bus name "busname", the object
        path "objectpath" and the interface "interface".  Calling
        "methodname" on that interface returns the result of FUNC
        wrapped in a one-element list.
        """
        class fake_ravel:
            @staticmethod
            def system_bus():
                class DBusInterfaceProxy:
                    @staticmethod
                    def methodname(*args):
                        return [func(*args)]

                class DBusObject:
                    @staticmethod
                    def get_interface(interface):
                        if interface == "interface":
                            return DBusInterfaceProxy()
                return {"busname": {"objectpath": DBusObject()}}
        return fake_ravel

    def call_method(self, bus, methodname, busname, objectpath,
                    interface, *args):
        """Call BUS.call_method() and require that something is logged"""
        with self.assertLogs(log, logging.DEBUG):
            return bus.call_method(methodname, busname, objectpath,
                                   interface, *args)

    def test_call_method_returns(self):
        expected_method_return = Unique()
        method_args = (Unique(), Unique())

        def func(*args):
            # The very same argument objects must arrive, in order
            self.assertEqual(len(method_args), len(args))
            for marg, arg in zip(method_args, args):
                self.assertIs(marg, arg)
            return expected_method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface",
                               *method_args)
        self.assertIs(ret, expected_method_return)

    def test_call_method_filters_objectpath(self):
        def func():
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = (self.dummy_dbussy.DBUS
                         .ObjectPath("objectpath"))
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        self.assertEqual("objectpath", ret)
        self.assertNotIsInstance(ret,
                                 self.dummy_dbussy.DBUS.ObjectPath)

    def test_call_method_filters_objectpaths_in_dict(self):
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath

        def func():
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("objectpath_key_1"):
            ObjectPath("objectpath_value_1"),
            ObjectPath("objectpath_key_2"):
            ObjectPath("objectpath_value_2"),
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {str(key): str(value)
                                  for key, value in
                                  method_return.items()}
        for key, value in ret.items():
            self.assertNotIsInstance(key, ObjectPath)
            self.assertNotIsInstance(value, ObjectPath)
        self.assertEqual(expected_method_return, ret)
        self.assertIsInstance(ret, dict)

    def test_call_method_filters_objectpaths_in_dict_in_dict(self):
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath

        def func():
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("key1"): {
                ObjectPath("key11"): ObjectPath("value11"),
                ObjectPath("key12"): ObjectPath("value12"),
            },
            ObjectPath("key2"): {
                ObjectPath("key21"): ObjectPath("value21"),
                ObjectPath("key22"): ObjectPath("value22"),
            },
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {
            "key1": {"key11": "value11",
                     "key12": "value12"},
            "key2": {"key21": "value21",
                     "key22": "value22"},
        }
        self.assertEqual(expected_method_return, ret)
        for key, value in ret.items():
            self.assertIsInstance(value, dict)
            self.assertEqual(expected_method_return[key], value)
            self.assertNotIsInstance(key, ObjectPath)
            for inner_key, inner_value in value.items():
                self.assertEqual(
                    expected_method_return[key][inner_key],
                    inner_value)
                # Check the nested entries themselves, not the outer
                # key again
                self.assertNotIsInstance(inner_key, ObjectPath)
                self.assertNotIsInstance(inner_value, ObjectPath)

    def test_call_method_filters_objectpaths_in_dict_three_deep(self):
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath

        def func():
            return method_return
        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
        method_return = {
            ObjectPath("key1"): {
                ObjectPath("key2"): {
                    ObjectPath("key3"): ObjectPath("value"),
                },
            },
        }
        ret = self.call_method(bus, "methodname", "busname",
                               "objectpath", "interface")
        expected_method_return = {"key1": {"key2": {"key3": "value"}}}
        self.assertEqual(expected_method_return, ret)
        self.assertIsInstance(ret, dict)
        self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
        self.assertIsInstance(ret["key1"], dict)
        self.assertNotIsInstance(next(iter(ret["key1"].keys())),
                                 ObjectPath)
        self.assertIsInstance(ret["key1"]["key2"], dict)
        self.assertNotIsInstance(
            next(iter(ret["key1"]["key2"].keys())),
            ObjectPath)
        self.assertEqual("value", ret["key1"]["key2"]["key3"])
        self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
                                 self.dummy_dbussy.DBUS.ObjectPath)

    def test_call_method_handles_exception(self):
        def func():
            raise self.dummy_dbussy.DBusError()

        fake_ravel = self.fake_ravel_func(func)
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)

        with self.assertRaises(dbus.Error) as e:
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")

        # A failing method call must not be reported as a failure
        # to connect
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)

    def test_get_object_converts_to_correct_exception(self):
        class fake_ravel_raises_exception_on_connect:
            @staticmethod
            def system_bus():
                class Bus:
                    @staticmethod
                    def __getitem__(key):
                        # Only a lookup of the expected object path
                        # raises the D-Bus error; any other key is
                        # a plain Exception
                        if key == "objectpath":
                            raise self.dummy_dbussy.DBusError()
                        raise Exception(key)
                return {"busname": Bus()}
        bus = dbussy_adapter.SystemBus(
            self.dummy_dbussy,
            fake_ravel_raises_exception_on_connect)
        with self.assertRaises(dbus.ConnectFailed):
            self.call_method(bus, "methodname", "busname",
                             "objectpath", "interface")
 
2049
 
 
2050
 
1512
2051
class Test_commands_from_options(unittest.TestCase):
1513
2052
 
1514
2053
    def setUp(self):
1519
2058
        self.assert_command_from_args(["--is-enabled", "client"],
1520
2059
                                      command.IsEnabled)
1521
2060
 
1522
 
    def assert_command_from_args(self, args, command_cls,
1523
 
                                 **cmd_attrs):
 
2061
    def assert_command_from_args(self, args, command_cls, length=1,
 
2062
                                 clients=None, **cmd_attrs):
1524
2063
        """Assert that parsing ARGS should result in an instance of
1525
2064
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1526
2065
        options = self.parser.parse_args(args)
1527
2066
        check_option_syntax(self.parser, options)
1528
2067
        commands = commands_from_options(options)
1529
 
        self.assertEqual(1, len(commands))
1530
 
        command = commands[0]
1531
 
        self.assertIsInstance(command, command_cls)
 
2068
        self.assertEqual(length, len(commands))
 
2069
        for command in commands:
 
2070
            if isinstance(command, command_cls):
 
2071
                break
 
2072
        else:
 
2073
            self.assertIsInstance(command, command_cls)
 
2074
        if clients is not None:
 
2075
            self.assertEqual(clients, options.client)
1532
2076
        for key, value in cmd_attrs.items():
1533
2077
            self.assertEqual(value, getattr(command, key))
1534
2078
 
 
2079
    def assert_commands_from_args(self, args, commands, clients=None):
 
2080
        for cmd in commands:
 
2081
            self.assert_command_from_args(args, cmd,
 
2082
                                          length=len(commands),
 
2083
                                          clients=clients)
 
2084
 
1535
2085
    def test_is_enabled_short(self):
1536
2086
        self.assert_command_from_args(["-V", "client"],
1537
2087
                                      command.IsEnabled)
1728
2278
                                      verbose=True)
1729
2279
 
1730
2280
 
 
2281
    def test_manual_page_example_1(self):
 
2282
        self.assert_command_from_args("",
 
2283
                                      command.PrintTable,
 
2284
                                      clients=[],
 
2285
                                      verbose=False)
 
2286
 
 
2287
    def test_manual_page_example_2(self):
 
2288
        self.assert_command_from_args(
 
2289
            "--verbose foo1.example.org foo2.example.org".split(),
 
2290
            command.PrintTable, clients=["foo1.example.org",
 
2291
                                         "foo2.example.org"],
 
2292
            verbose=True)
 
2293
 
 
2294
    def test_manual_page_example_3(self):
 
2295
        self.assert_command_from_args("--enable --all".split(),
 
2296
                                      command.Enable,
 
2297
                                      clients=[])
 
2298
 
 
2299
    def test_manual_page_example_4(self):
 
2300
        self.assert_commands_from_args(
 
2301
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
 
2302
             " foo2.example.org").split(),
 
2303
            [command.SetTimeout, command.SetInterval],
 
2304
            clients=["foo1.example.org", "foo2.example.org"])
 
2305
 
 
2306
    def test_manual_page_example_5(self):
 
2307
        self.assert_command_from_args("--approve --all".split(),
 
2308
                                      command.Approve,
 
2309
                                      clients=[])
 
2310
 
 
2311
 
1731
2312
class TestCommand(unittest.TestCase):
1732
2313
    """Abstract class for tests of command classes"""
1733
2314
 
1846
2427
        busname = "se.recompile.Mandos"
1847
2428
        client_interface = "se.recompile.Mandos.Client"
1848
2429
        command.Approve().run(self.bus.clients, self.bus)
 
2430
        self.assertTrue(self.bus.clients)
1849
2431
        for clientpath in self.bus.clients:
1850
2432
            self.assertIn(("Approve", busname, clientpath,
1851
2433
                           client_interface, (True,)), self.bus.calls)
1854
2436
        busname = "se.recompile.Mandos"
1855
2437
        client_interface = "se.recompile.Mandos.Client"
1856
2438
        command.Deny().run(self.bus.clients, self.bus)
 
2439
        self.assertTrue(self.bus.clients)
1857
2440
        for clientpath in self.bus.clients:
1858
2441
            self.assertIn(("Approve", busname, clientpath,
1859
2442
                           client_interface, (False,)),
1860
2443
                          self.bus.calls)
1861
2444
 
1862
2445
    def test_Remove(self):
 
2446
        busname = "se.recompile.Mandos"
 
2447
        server_path = "/"
 
2448
        server_interface = "se.recompile.Mandos"
 
2449
        orig_clients = self.bus.clients.copy()
1863
2450
        command.Remove().run(self.bus.clients, self.bus)
1864
 
        for clientpath in self.bus.clients:
1865
 
            self.assertIn(("RemoveClient", dbus_busname,
1866
 
                           dbus_server_path, dbus_server_interface,
 
2451
        self.assertFalse(self.bus.clients)
 
2452
        for clientpath in orig_clients:
 
2453
            self.assertIn(("RemoveClient", busname,
 
2454
                           server_path, server_interface,
1867
2455
                           (clientpath,)), self.bus.calls)
1868
2456
 
1869
2457
    expected_json = {
2071
2659
        else:
2072
2660
            cmd_args = [() for x in range(len(self.values_to_get))]
2073
2661
            values_to_get = self.values_to_get
 
2662
        self.assertTrue(values_to_get)
2074
2663
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2075
2664
            for clientpath in self.bus.clients:
2076
2665
                self.bus.clients[clientpath][self.propname] = (
2077
2666
                    Unique())
2078
2667
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
 
2668
            self.assertTrue(self.bus.clients)
2079
2669
            for clientpath in self.bus.clients:
2080
2670
                value = (self.bus.clients[clientpath]
2081
2671
                         [self.propname])
2140
2730
class TestSetSecretCmd(TestPropertySetterCmd):
    """Tests of command.SetSecret, using the property "Secret" """
    command = command.SetSecret
    propname = "Secret"

    def __init__(self, *args, **kwargs):
        # The streams are created per instance rather than shared on
        # the class (presumably so that every test instance gets
        # unread streams; confirm against TestPropertySetterCmd)
        streams = [io.BytesIO(payload)
                   for payload in (b"", b"secret\0xyzzy\nbar")]
        self.values_to_set = streams
        # The expected values are the full contents of each stream
        self.values_to_get = [stream.getvalue()
                              for stream in streams]
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2146
2739
 
2147
2740
 
2148
2741
class TestSetTimeoutCmd(TestPropertySetterCmd):
2201
2794
 
2202
2795
 
2203
2796
 
2204
 
def parse_test_args():
    # type: () -> argparse.Namespace
    """Parse the test options "--check" and "--prefix" from sys.argv

    Returns the parsed namespace.  If "--check" was given, the
    recognized test options are removed from sys.argv, leaving only
    the arguments which this parser did not recognize.
    """
    test_parser = argparse.ArgumentParser(add_help=False)
    test_parser.add_argument("--check", action="store_true")
    test_parser.add_argument("--prefix")
    namespace, remaining = test_parser.parse_known_args()
    if not namespace.check:
        # Not a test run; leave sys.argv alone
        return namespace
    sys.argv[1:] = remaining
    return namespace
2213
2807
 
2214
2808
# Add all tests from doctest strings
2215
2809
def load_tests(loader, tests, none):
2218
2812
    return tests
2219
2813
 
2220
2814
if __name__ == "__main__":
    options = parse_test_args()
    try:
        if not options.check:
            main()
        else:
            # Invoked as ./scriptname --check [--prefix=PREFIX]
            extra_test_prefix = options.prefix
            if extra_test_prefix is None:
                unittest.main(argv=[""])
            else:
                # Run the normal tests first, without exiting, and
                # stop with a failure status unless they all succeed
                regular_run = unittest.main(argv=[""], exit=False)
                if not regular_run.result.wasSuccessful():
                    sys.exit(1)

                # Then run the methods whose names start with the
                # extra prefix instead of the default one
                class ExtraTestLoader(unittest.TestLoader):
                    testMethodPrefix = extra_test_prefix
                unittest.main(argv=[""], testLoader=ExtraTestLoader())
    finally:
        logging.shutdown()
 
2833
 
 
2834
# Local Variables:
 
2835
# run-tests:
 
2836
# (lambda (&optional extra)
 
2837
#   (if (not (funcall run-tests-in-test-buffer default-directory
 
2838
#             extra))
 
2839
#       (funcall show-test-buffer-in-test-window)
 
2840
#     (funcall remove-test-window)
 
2841
#     (if extra (message "Extra tests run successfully!"))))
 
2842
# run-tests-in-test-buffer:
 
2843
# (lambda (dir &optional extra)
 
2844
#   (with-current-buffer (get-buffer-create "*Test*")
 
2845
#     (setq buffer-read-only nil
 
2846
#           default-directory dir)
 
2847
#     (erase-buffer)
 
2848
#     (compilation-mode))
 
2849
#   (let ((process-result
 
2850
#          (let ((inhibit-read-only t))
 
2851
#            (process-file-shell-command
 
2852
#             (funcall get-command-line extra) nil "*Test*"))))
 
2853
#     (and (numberp process-result)
 
2854
#          (= process-result 0))))
 
2855
# get-command-line:
 
2856
# (lambda (&optional extra)
 
2857
#   (let ((quoted-script
 
2858
#          (shell-quote-argument (funcall get-script-name))))
 
2859
#     (format
 
2860
#      (concat "%s --check" (if extra " --prefix=atest" ""))
 
2861
#      quoted-script)))
 
2862
# get-script-name:
 
2863
# (lambda ()
 
2864
#   (if (fboundp 'file-local-name)
 
2865
#       (file-local-name (buffer-file-name))
 
2866
#     (or (file-remote-p (buffer-file-name) 'localname)
 
2867
#         (buffer-file-name))))
 
2868
# remove-test-window:
 
2869
# (lambda ()
 
2870
#   (let ((test-window (get-buffer-window "*Test*")))
 
2871
#     (if test-window (delete-window test-window))))
 
2872
# show-test-buffer-in-test-window:
 
2873
# (lambda ()
 
2874
#   (when (not (get-buffer-window-list "*Test*"))
 
2875
#     (setq next-error-last-buffer (get-buffer "*Test*"))
 
2876
#     (let* ((side (if (>= (window-width) 146) 'right 'bottom))
 
2877
#            (display-buffer-overriding-action
 
2878
#             `((display-buffer-in-side-window) (side . ,side)
 
2879
#               (window-height . fit-window-to-buffer)
 
2880
#               (window-width . fit-window-to-buffer))))
 
2881
#       (display-buffer "*Test*"))))
 
2882
# eval:
 
2883
# (progn
 
2884
#   (let* ((run-extra-tests (lambda () (interactive)
 
2885
#                             (funcall run-tests t)))
 
2886
#          (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
 
2887
#          (outer-keymap `(keymap (3 . ,inner-keymap))))     ; C-c
 
2888
#     (setq minor-mode-overriding-map-alist
 
2889
#           (cons `(run-tests . ,outer-keymap)
 
2890
#                 minor-mode-overriding-map-alist)))
 
2891
#   (add-hook 'after-save-hook run-tests 90 t))
 
2892
# End: