/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-31 04:47:26 UTC
  • Revision ID: teddy@recompile.se-20190331044726-po11kmjmug07mw00
mandos-ctl: Remove import of unused module

* mandos-ctl: Remove "import abc".

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/python3 -bbI
2
 
# -*- coding: utf-8; lexical-binding: t -*-
3
 
#
4
 
# Mandos Control - Control or query the Mandos server
5
 
#
6
 
# Copyright © 2008-2022 Teddy Hogeborn
7
 
# Copyright © 2008-2022 Björn Påhlsson
 
1
#!/usr/bin/python
 
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
 
3
#
 
4
# Mandos Monitor - Control and monitor the Mandos server
 
5
#
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
23
23
#
24
24
# Contact the authors at <mandos@recompile.se>.
25
25
#
 
26
 
26
27
from __future__ import (division, absolute_import, print_function,
27
28
                        unicode_literals)
28
29
 
32
33
    pass
33
34
 
34
35
import sys
35
 
import unittest
36
36
import argparse
37
 
import logging
38
 
import os
39
37
import locale
40
38
import datetime
41
39
import re
 
40
import os
42
41
import collections
43
42
import json
 
43
import unittest
 
44
import logging
44
45
import io
45
46
import tempfile
46
47
import contextlib
47
48
 
48
 
if sys.version_info.major == 2:
49
 
    __metaclass__ = type
50
 
    str = unicode
51
 
    input = raw_input
52
 
 
53
 
 
54
 
class gi:
55
 
    """Dummy gi module, for the tests"""
56
 
    class repository:
57
 
        class GLib:
58
 
            class Error(Exception):
59
 
                pass
60
 
 
61
 
 
62
 
dbussy = None
63
 
ravel = None
64
 
dbus_python = None
65
 
pydbus = None
66
 
 
67
 
try:
68
 
    import dbussy
69
 
    import ravel
70
 
except ImportError:
71
 
    try:
72
 
        import pydbus
73
 
        import gi
74
 
    except ImportError:
75
 
        import dbus as dbus_python
76
 
 
 
49
import dbus as dbus_python
77
50
 
78
51
# Show warnings by default
79
52
if not sys.warnoptions:
80
53
    import warnings
81
54
    warnings.simplefilter("default")
82
55
 
83
 
log = logging.getLogger(os.path.basename(sys.argv[0]))
84
 
logging.basicConfig(level="INFO",         # Show info level messages
 
56
log = logging.getLogger(sys.argv[0])
 
57
logging.basicConfig(level="INFO", # Show info level messages
85
58
                    format="%(message)s") # Show basic log messages
86
59
 
87
60
logging.captureWarnings(True)   # Show warnings via the logging system
88
61
 
89
62
if sys.version_info.major == 2:
 
63
    str = unicode
90
64
    import StringIO
91
65
    io.StringIO = StringIO.StringIO
92
66
 
93
67
locale.setlocale(locale.LC_ALL, "")
94
68
 
95
 
version = "1.8.16"
 
69
version = "1.8.3"
96
70
 
97
71
 
98
72
def main():
105
79
    clientnames = options.client
106
80
 
107
81
    if options.debug:
108
 
        logging.getLogger("").setLevel(logging.DEBUG)
 
82
        log.setLevel(logging.DEBUG)
109
83
 
110
 
    if dbussy is not None and ravel is not None:
111
 
        bus = dbussy_adapter.CachingBus(dbussy, ravel)
112
 
    elif pydbus is not None:
113
 
        bus = pydbus_adapter.CachingBus(pydbus)
114
 
    else:
115
 
        bus = dbus_python_adapter.CachingBus(dbus_python)
 
84
    bus = dbus_python_adapter.CachingBus(dbus_python)
116
85
 
117
86
    try:
118
87
        all_clients = bus.get_clients_and_properties()
259
228
        return rfc3339_duration_to_delta(interval)
260
229
    except ValueError as e:
261
230
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
262
 
                    " ".join(e.args))
 
231
                    ' '.join(e.args))
263
232
    return parse_pre_1_6_1_interval(interval)
264
233
 
265
234
 
266
235
def rfc3339_duration_to_delta(duration):
267
236
    """Parse an RFC 3339 "duration" and return a datetime.timedelta
268
237
 
269
 
    >>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
270
 
    True
271
 
    >>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
272
 
    True
273
 
    >>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
274
 
    True
275
 
    >>> # 60 months
276
 
    >>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
277
 
    True
278
 
    >>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
279
 
    True
280
 
    >>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
281
 
    True
282
 
    >>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
283
 
    True
284
 
    >>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
285
 
    True
 
238
    >>> rfc3339_duration_to_delta("P7D")
 
239
    datetime.timedelta(7)
 
240
    >>> rfc3339_duration_to_delta("PT60S")
 
241
    datetime.timedelta(0, 60)
 
242
    >>> rfc3339_duration_to_delta("PT60M")
 
243
    datetime.timedelta(0, 3600)
 
244
    >>> rfc3339_duration_to_delta("P60M")
 
245
    datetime.timedelta(1680)
 
246
    >>> rfc3339_duration_to_delta("PT24H")
 
247
    datetime.timedelta(1)
 
248
    >>> rfc3339_duration_to_delta("P1W")
 
249
    datetime.timedelta(7)
 
250
    >>> rfc3339_duration_to_delta("PT5M30S")
 
251
    datetime.timedelta(0, 330)
 
252
    >>> rfc3339_duration_to_delta("P1DT3M20S")
 
253
    datetime.timedelta(1, 200)
286
254
    >>> # Can not be empty:
287
255
    >>> rfc3339_duration_to_delta("")
288
256
    Traceback (most recent call last):
395
363
 
396
364
 
397
365
def parse_pre_1_6_1_interval(interval):
398
 
    r"""Parse an interval string as documented by Mandos before 1.6.1,
 
366
    """Parse an interval string as documented by Mandos before 1.6.1,
399
367
    and return a datetime.timedelta
400
368
 
401
 
    >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
402
 
    True
403
 
    >>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
404
 
    True
405
 
    >>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
406
 
    True
407
 
    >>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
408
 
    True
409
 
    >>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
410
 
    True
411
 
    >>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
412
 
    True
413
 
    >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
414
 
    True
 
369
    >>> parse_pre_1_6_1_interval('7d')
 
370
    datetime.timedelta(7)
 
371
    >>> parse_pre_1_6_1_interval('60s')
 
372
    datetime.timedelta(0, 60)
 
373
    >>> parse_pre_1_6_1_interval('60m')
 
374
    datetime.timedelta(0, 3600)
 
375
    >>> parse_pre_1_6_1_interval('24h')
 
376
    datetime.timedelta(1)
 
377
    >>> parse_pre_1_6_1_interval('1w')
 
378
    datetime.timedelta(7)
 
379
    >>> parse_pre_1_6_1_interval('5m 30s')
 
380
    datetime.timedelta(0, 330)
 
381
    >>> parse_pre_1_6_1_interval('')
 
382
    datetime.timedelta(0)
415
383
    >>> # Ignore unknown characters, allow any order and repetitions
416
 
    >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
 
    ... == datetime.timedelta(2, 480, 18000)
418
 
    True
 
384
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
 
385
    datetime.timedelta(2, 480, 18000)
419
386
 
420
387
    """
421
388
 
484
451
        parser.error("--remove can only be combined with --deny")
485
452
 
486
453
 
487
 
class dbus:
 
454
class dbus(object):
488
455
 
489
 
    class SystemBus:
 
456
    class SystemBus(object):
490
457
 
491
458
        object_manager_iface = "org.freedesktop.DBus.ObjectManager"
492
 
 
493
459
        def get_managed_objects(self, busname, objectpath):
494
460
            return self.call_method("GetManagedObjects", busname,
495
461
                                    objectpath,
496
462
                                    self.object_manager_iface)
497
463
 
498
464
        properties_iface = "org.freedesktop.DBus.Properties"
499
 
 
500
465
        def set_property(self, busname, objectpath, interface, key,
501
466
                         value):
502
467
            self.call_method("Set", busname, objectpath,
503
468
                             self.properties_iface, interface, key,
504
469
                             value)
505
470
 
506
 
        def call_method(self, methodname, busname, objectpath,
507
 
                        interface, *args):
508
 
            raise NotImplementedError()
509
471
 
510
472
    class MandosBus(SystemBus):
511
473
        busname_domain = "se.recompile"
543
505
        pass
544
506
 
545
507
 
546
 
class dbus_python_adapter:
 
508
class dbus_python_adapter(object):
547
509
 
548
510
    class SystemBus(dbus.MandosBus):
549
511
        """Use dbus-python"""
594
556
                        for key, subval in value.items()}
595
557
            return value
596
558
 
597
 
        def set_client_property(self, objectpath, key, value):
598
 
            if key == "Secret":
599
 
                if not isinstance(value, bytes):
600
 
                    value = value.encode("utf-8")
601
 
                value = self.dbus_python.ByteArray(value)
602
 
            return self.set_property(self.busname, objectpath,
603
 
                                     self.client_interface, key,
604
 
                                     value)
605
559
 
606
 
    class SilenceLogger:
 
560
    class SilenceLogger(object):
607
561
        "Simple context manager to silence a particular logger"
608
 
 
609
562
        def __init__(self, loggername):
610
563
            self.logger = logging.getLogger(loggername)
611
564
 
621
574
        def __exit__(self, exc_type, exc_val, exc_tb):
622
575
            self.logger.removeFilter(self.nullfilter)
623
576
 
 
577
 
624
578
    class CachingBus(SystemBus):
625
579
        """A caching layer for dbus_python_adapter.SystemBus"""
626
 
 
627
580
        def __init__(self, *args, **kwargs):
628
581
            self.object_cache = {}
629
582
            super(dbus_python_adapter.CachingBus,
630
583
                  self).__init__(*args, **kwargs)
631
 
 
632
584
        def get_object(self, busname, objectpath):
633
585
            try:
634
586
                return self.object_cache[(busname, objectpath)]
636
588
                new_object = super(
637
589
                    dbus_python_adapter.CachingBus,
638
590
                    self).get_object(busname, objectpath)
639
 
                self.object_cache[(busname, objectpath)] = new_object
640
 
                return new_object
641
 
 
642
 
 
643
 
class pydbus_adapter:
644
 
    class SystemBus(dbus.MandosBus):
645
 
        def __init__(self, module=pydbus):
646
 
            self.pydbus = module
647
 
            self.bus = self.pydbus.SystemBus()
648
 
 
649
 
        @contextlib.contextmanager
650
 
        def convert_exception(self, exception_class=dbus.Error):
651
 
            try:
652
 
                yield
653
 
            except gi.repository.GLib.Error as e:
654
 
                # This does what "raise from" would do
655
 
                exc = exception_class(*e.args)
656
 
                exc.__cause__ = e
657
 
                raise exc
658
 
 
659
 
        def call_method(self, methodname, busname, objectpath,
660
 
                        interface, *args):
661
 
            proxy_object = self.get(busname, objectpath)
662
 
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
663
 
                      interface, methodname,
664
 
                      ", ".join(repr(a) for a in args))
665
 
            method = getattr(proxy_object[interface], methodname)
666
 
            with self.convert_exception():
667
 
                return method(*args)
668
 
 
669
 
        def get(self, busname, objectpath):
670
 
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
671
 
                      busname, objectpath)
672
 
            with self.convert_exception(dbus.ConnectFailed):
673
 
                if sys.version_info.major <= 2:
674
 
                    with warnings.catch_warnings():
675
 
                        warnings.filterwarnings(
676
 
                            "ignore", "", DeprecationWarning,
677
 
                            r"^xml\.etree\.ElementTree$")
678
 
                        return self.bus.get(busname, objectpath)
679
 
                else:
680
 
                    return self.bus.get(busname, objectpath)
681
 
 
682
 
        def set_property(self, busname, objectpath, interface, key,
683
 
                         value):
684
 
            proxy_object = self.get(busname, objectpath)
685
 
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
686
 
                      objectpath, self.properties_iface, interface,
687
 
                      key, value)
688
 
            setattr(proxy_object[interface], key, value)
689
 
 
690
 
    class CachingBus(SystemBus):
691
 
        """A caching layer for pydbus_adapter.SystemBus"""
692
 
 
693
 
        def __init__(self, *args, **kwargs):
694
 
            self.object_cache = {}
695
 
            super(pydbus_adapter.CachingBus,
696
 
                  self).__init__(*args, **kwargs)
697
 
 
698
 
        def get(self, busname, objectpath):
699
 
            try:
700
 
                return self.object_cache[(busname, objectpath)]
701
 
            except KeyError:
702
 
                new_object = (super(pydbus_adapter.CachingBus, self)
703
 
                              .get(busname, objectpath))
704
 
                self.object_cache[(busname, objectpath)] = new_object
705
 
                return new_object
706
 
 
707
 
 
708
 
class dbussy_adapter:
709
 
    class SystemBus(dbus.SystemBus):
710
 
        """Use DBussy"""
711
 
 
712
 
        def __init__(self, dbussy, ravel):
713
 
            self.dbussy = dbussy
714
 
            self.ravel = ravel
715
 
            self.bus = ravel.system_bus()
716
 
 
717
 
        @contextlib.contextmanager
718
 
        def convert_exception(self, exception_class=dbus.Error):
719
 
            try:
720
 
                yield
721
 
            except self.dbussy.DBusError as e:
722
 
                # This does what "raise from" would do
723
 
                exc = exception_class(*e.args)
724
 
                exc.__cause__ = e
725
 
                raise exc
726
 
 
727
 
        def call_method(self, methodname, busname, objectpath,
728
 
                        interface, *args):
729
 
            proxy_object = self.get_object(busname, objectpath)
730
 
            log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
731
 
                      interface, methodname,
732
 
                      ", ".join(repr(a) for a in args))
733
 
            iface = proxy_object.get_interface(interface)
734
 
            method = getattr(iface, methodname)
735
 
            with self.convert_exception(dbus.Error):
736
 
                value = method(*args)
737
 
            # DBussy returns values either as an empty list or as a
738
 
            # list of one element with the return value
739
 
            if value:
740
 
                return self.type_filter(value[0])
741
 
 
742
 
        def get_object(self, busname, objectpath):
743
 
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
744
 
                      busname, objectpath)
745
 
            with self.convert_exception(dbus.ConnectFailed):
746
 
                return self.bus[busname][objectpath]
747
 
 
748
 
        def type_filter(self, value):
749
 
            """Convert the most bothersome types to Python types"""
750
 
            # A D-Bus Variant value is represented as the Python type
751
 
            # Tuple[dbussy.DBUS.Signature, Any]
752
 
            if isinstance(value, tuple):
753
 
                if (len(value) == 2
754
 
                    and isinstance(value[0],
755
 
                                   self.dbussy.DBUS.Signature)):
756
 
                    return self.type_filter(value[1])
757
 
            elif isinstance(value, self.dbussy.DBUS.ObjectPath):
758
 
                return str(value)
759
 
            # Also recurse into dictionaries
760
 
            elif isinstance(value, dict):
761
 
                return {self.type_filter(key):
762
 
                        self.type_filter(subval)
763
 
                        for key, subval in value.items()}
764
 
            return value
765
 
 
766
 
        def set_property(self, busname, objectpath, interface, key,
767
 
                         value):
768
 
            proxy_object = self.get_object(busname, objectpath)
769
 
            log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
770
 
                      objectpath, self.properties_iface, interface,
771
 
                      key, value)
772
 
            if key == "Secret":
773
 
                # DBussy wants a Byte Array to be a sequence of
774
 
                # values, not a byte string
775
 
                value = tuple(value)
776
 
            setattr(proxy_object.get_interface(interface), key, value)
777
 
 
778
 
    class MandosBus(SystemBus, dbus.MandosBus):
779
 
        pass
780
 
 
781
 
    class CachingBus(MandosBus):
782
 
        """A caching layer for dbussy_adapter.MandosBus"""
783
 
 
784
 
        def __init__(self, *args, **kwargs):
785
 
            self.object_cache = {}
786
 
            super(dbussy_adapter.CachingBus, self).__init__(*args,
787
 
                                                            **kwargs)
788
 
 
789
 
        def get_object(self, busname, objectpath):
790
 
            try:
791
 
                return self.object_cache[(busname, objectpath)]
792
 
            except KeyError:
793
 
                new_object = super(
794
 
                    dbussy_adapter.CachingBus,
795
 
                    self).get_object(busname, objectpath)
796
 
                self.object_cache[(busname, objectpath)] = new_object
 
591
                self.object_cache[(busname, objectpath)]  = new_object
797
592
                return new_object
798
593
 
799
594
 
830
625
    return commands
831
626
 
832
627
 
833
 
class command:
 
628
class command(object):
834
629
    """A namespace for command classes"""
835
630
 
836
 
    class Base:
 
631
    class Base(object):
837
632
        """Abstract base class for commands"""
838
 
 
839
633
        def run(self, clients, bus=None):
840
634
            """Normal commands should implement run_on_one_client(),
841
635
but commands which want to operate on all clients at the same time can
845
639
            for client, properties in clients.items():
846
640
                self.run_on_one_client(client, properties)
847
641
 
 
642
 
848
643
    class IsEnabled(Base):
849
644
        def run(self, clients, bus=None):
850
645
            properties = next(iter(clients.values()))
852
647
                sys.exit(0)
853
648
            sys.exit(1)
854
649
 
 
650
 
855
651
    class Approve(Base):
856
652
        def run_on_one_client(self, client, properties):
857
653
            self.bus.call_client_method(client, "Approve", True)
858
654
 
 
655
 
859
656
    class Deny(Base):
860
657
        def run_on_one_client(self, client, properties):
861
658
            self.bus.call_client_method(client, "Approve", False)
862
659
 
 
660
 
863
661
    class Remove(Base):
864
662
        def run(self, clients, bus):
865
663
            for clientpath in frozenset(clients.keys()):
866
664
                bus.call_server_method("RemoveClient", clientpath)
867
665
 
 
666
 
868
667
    class Output(Base):
869
668
        """Abstract class for commands outputting client details"""
870
669
        all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
876
675
                        "Checker", "ExtendedTimeout", "Expires",
877
676
                        "LastCheckerStatus")
878
677
 
 
678
 
879
679
    class DumpJSON(Output):
880
680
        def run(self, clients, bus=None):
881
681
            data = {properties["Name"]:
882
682
                    {key: properties[key]
883
683
                     for key in self.all_keywords}
884
684
                    for properties in clients.values()}
885
 
            print(json.dumps(data, indent=4, separators=(",", ": ")))
 
685
            print(json.dumps(data, indent=4, separators=(',', ': ')))
 
686
 
886
687
 
887
688
    class PrintTable(Output):
888
689
        def __init__(self, verbose=False):
896
697
                keywords = self.all_keywords
897
698
            print(self.TableOfClients(clients.values(), keywords))
898
699
 
899
 
        class TableOfClients:
 
700
        class TableOfClients(object):
900
701
            tableheaders = {
901
702
                "Name": "Name",
902
703
                "Enabled": "Enabled",
929
730
 
930
731
            if sys.version_info.major == 2:
931
732
                __unicode__ = __str__
932
 
 
933
733
                def __str__(self):
934
734
                    return str(self).encode(
935
735
                        locale.getpreferredencoding())
981
781
                                minutes=(td.seconds % 3600) // 60,
982
782
                                seconds=td.seconds % 60))
983
783
 
 
784
 
984
785
    class PropertySetter(Base):
985
786
        "Abstract class for Actions for setting one client property"
986
787
 
993
794
        def propname(self):
994
795
            raise NotImplementedError()
995
796
 
 
797
 
996
798
    class Enable(PropertySetter):
997
799
        propname = "Enabled"
998
800
        value_to_set = True
999
801
 
 
802
 
1000
803
    class Disable(PropertySetter):
1001
804
        propname = "Enabled"
1002
805
        value_to_set = False
1003
806
 
 
807
 
1004
808
    class BumpTimeout(PropertySetter):
1005
809
        propname = "LastCheckedOK"
1006
810
        value_to_set = ""
1007
811
 
 
812
 
1008
813
    class StartChecker(PropertySetter):
1009
814
        propname = "CheckerRunning"
1010
815
        value_to_set = True
1011
816
 
 
817
 
1012
818
    class StopChecker(PropertySetter):
1013
819
        propname = "CheckerRunning"
1014
820
        value_to_set = False
1015
821
 
 
822
 
1016
823
    class ApproveByDefault(PropertySetter):
1017
824
        propname = "ApprovedByDefault"
1018
825
        value_to_set = True
1019
826
 
 
827
 
1020
828
    class DenyByDefault(PropertySetter):
1021
829
        propname = "ApprovedByDefault"
1022
830
        value_to_set = False
1023
831
 
 
832
 
1024
833
    class PropertySetterValue(PropertySetter):
1025
834
        """Abstract class for PropertySetter recieving a value as
1026
835
constructor argument instead of a class attribute."""
1027
 
 
1028
836
        def __init__(self, value):
1029
837
            self.value_to_set = value
1030
838
 
1037
845
    class SetChecker(PropertySetterValue):
1038
846
        propname = "Checker"
1039
847
 
 
848
 
1040
849
    class SetHost(PropertySetterValue):
1041
850
        propname = "Host"
1042
851
 
 
852
 
1043
853
    class SetSecret(PropertySetterValue):
1044
854
        propname = "Secret"
1045
855
 
1053
863
            self._vts = value.read()
1054
864
            value.close()
1055
865
 
 
866
 
1056
867
    class PropertySetterValueMilliseconds(PropertySetterValue):
1057
868
        """Abstract class for PropertySetterValue taking a value
1058
869
argument as a datetime.timedelta() but should store it as
1067
878
            "When setting, convert value from a datetime.timedelta"
1068
879
            self._vts = int(round(value.total_seconds() * 1000))
1069
880
 
 
881
 
1070
882
    class SetTimeout(PropertySetterValueMilliseconds):
1071
883
        propname = "Timeout"
1072
884
 
 
885
 
1073
886
    class SetExtendedTimeout(PropertySetterValueMilliseconds):
1074
887
        propname = "ExtendedTimeout"
1075
888
 
 
889
 
1076
890
    class SetInterval(PropertySetterValueMilliseconds):
1077
891
        propname = "Interval"
1078
892
 
 
893
 
1079
894
    class SetApprovalDelay(PropertySetterValueMilliseconds):
1080
895
        propname = "ApprovalDelay"
1081
896
 
 
897
 
1082
898
    class SetApprovalDuration(PropertySetterValueMilliseconds):
1083
899
        propname = "ApprovalDuration"
1084
900
 
1118
934
                                                     "output"))
1119
935
 
1120
936
 
1121
 
class Unique:
 
937
class Unique(object):
1122
938
    """Class for objects which exist only to be unique objects, since
1123
939
unittest.mock.sentinel only exists in Python 3.3"""
1124
940
 
1408
1224
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1409
1225
 
1410
1226
    def MockDBusPython_func(self, func):
1411
 
        class mock_dbus_python:
 
1227
        class mock_dbus_python(object):
1412
1228
            """mock dbus-python module"""
1413
 
            class exceptions:
 
1229
            class exceptions(object):
1414
1230
                """Pseudo-namespace"""
1415
1231
                class DBusException(Exception):
1416
1232
                    pass
1417
 
            class SystemBus:
 
1233
            class SystemBus(object):
1418
1234
                @staticmethod
1419
1235
                def get_object(busname, objectpath):
1420
1236
                    DBusObject = collections.namedtuple(
1421
 
                        "DBusObject", ("methodname", "Set"))
 
1237
                        "DBusObject", ("methodname",))
1422
1238
                    def method(*args, **kwargs):
1423
1239
                        self.assertEqual({"dbus_interface":
1424
1240
                                          "interface"},
1425
1241
                                         kwargs)
1426
1242
                        return func(*args)
1427
 
                    def set_property(interface, key, value,
1428
 
                                     dbus_interface=None):
1429
 
                        self.assertEqual(
1430
 
                            "org.freedesktop.DBus.Properties",
1431
 
                            dbus_interface)
1432
 
                        self.assertEqual("Secret", key)
1433
 
                        return func(interface, key, value,
1434
 
                                    dbus_interface=dbus_interface)
1435
 
                    return DBusObject(methodname=method,
1436
 
                                      Set=set_property)
1437
 
            class Boolean:
 
1243
                    return DBusObject(methodname=method)
 
1244
            class Boolean(object):
1438
1245
                def __init__(self, value):
1439
1246
                    self.value = bool(value)
1440
1247
                def __bool__(self):
1445
1252
                pass
1446
1253
            class Dictionary(dict):
1447
1254
                pass
1448
 
            class ByteArray(bytes):
1449
 
                pass
1450
1255
        return mock_dbus_python
1451
1256
 
1452
1257
    def call_method(self, bus, methodname, busname, objectpath,
1624
1429
        finally:
1625
1430
            dbus_logger.removeFilter(counting_handler)
1626
1431
 
1627
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
1432
        self.assertNotIsInstance(e, dbus.ConnectFailed)
1628
1433
 
1629
1434
        # Make sure the dbus logger was suppressed
1630
1435
        self.assertEqual(0, counting_handler.count)
1631
1436
 
1632
 
    def test_Set_Secret_sends_bytearray(self):
1633
 
        ret = [None]
1634
 
        def func(*args, **kwargs):
1635
 
            ret[0] = (args, kwargs)
1636
 
        mock_dbus_python = self.MockDBusPython_func(func)
1637
 
        bus = dbus_python_adapter.SystemBus(mock_dbus_python)
1638
 
        bus.set_client_property("objectpath", "Secret", "value")
1639
 
        expected_call = (("se.recompile.Mandos.Client", "Secret",
1640
 
                          mock_dbus_python.ByteArray(b"value")),
1641
 
                         {"dbus_interface":
1642
 
                          "org.freedesktop.DBus.Properties"})
1643
 
        self.assertEqual(expected_call, ret[0])
1644
 
        if sys.version_info.major == 2:
1645
 
            self.assertIsInstance(ret[0][0][-1],
1646
 
                                  mock_dbus_python.ByteArray)
1647
 
 
1648
1437
    def test_get_object_converts_to_correct_exception(self):
1649
1438
        bus = dbus_python_adapter.SystemBus(
1650
1439
            self.fake_dbus_python_raises_exception_on_connect)
1652
1441
            self.call_method(bus, "methodname", "busname",
1653
1442
                             "objectpath", "interface")
1654
1443
 
1655
 
    class fake_dbus_python_raises_exception_on_connect:
 
1444
    class fake_dbus_python_raises_exception_on_connect(object):
1656
1445
        """fake dbus-python module"""
1657
 
        class exceptions:
 
1446
        class exceptions(object):
1658
1447
            """Pseudo-namespace"""
1659
1448
            class DBusException(Exception):
1660
1449
                pass
1668
1457
 
1669
1458
 
1670
1459
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1671
 
    class mock_dbus_python:
 
1460
    class mock_dbus_python(object):
1672
1461
        """mock dbus-python modules"""
1673
 
        class SystemBus:
 
1462
        class SystemBus(object):
1674
1463
            @staticmethod
1675
1464
            def get_object(busname, objectpath):
1676
1465
                return Unique()
1719
1508
        self.assertIs(obj1, obj1b)
1720
1509
 
1721
1510
 
1722
 
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
1723
 
 
1724
 
    def Stub_pydbus_func(self, func):
1725
 
        class stub_pydbus:
1726
 
            """stub pydbus module"""
1727
 
            class SystemBus:
1728
 
                @staticmethod
1729
 
                def get(busname, objectpath):
1730
 
                    DBusObject = collections.namedtuple(
1731
 
                        "DBusObject", ("methodname",))
1732
 
                    return {"interface":
1733
 
                            DBusObject(methodname=func)}
1734
 
        return stub_pydbus
1735
 
 
1736
 
    def call_method(self, bus, methodname, busname, objectpath,
1737
 
                    interface, *args):
1738
 
        with self.assertLogs(log, logging.DEBUG):
1739
 
            return bus.call_method(methodname, busname, objectpath,
1740
 
                                   interface, *args)
1741
 
 
1742
 
    def test_call_method_returns(self):
1743
 
        expected_method_return = Unique()
1744
 
        method_args = (Unique(), Unique())
1745
 
        def func(*args):
1746
 
            self.assertEqual(len(method_args), len(args))
1747
 
            for marg, arg in zip(method_args, args):
1748
 
                self.assertIs(marg, arg)
1749
 
            return expected_method_return
1750
 
        stub_pydbus = self.Stub_pydbus_func(func)
1751
 
        bus = pydbus_adapter.SystemBus(stub_pydbus)
1752
 
        ret = self.call_method(bus, "methodname", "busname",
1753
 
                               "objectpath", "interface",
1754
 
                               *method_args)
1755
 
        self.assertIs(ret, expected_method_return)
1756
 
 
1757
 
    def test_call_method_handles_exception(self):
1758
 
        def func():
1759
 
            raise gi.repository.GLib.Error()
1760
 
 
1761
 
        stub_pydbus = self.Stub_pydbus_func(func)
1762
 
        bus = pydbus_adapter.SystemBus(stub_pydbus)
1763
 
 
1764
 
        with self.assertRaises(dbus.Error) as e:
1765
 
            self.call_method(bus, "methodname", "busname",
1766
 
                             "objectpath", "interface")
1767
 
 
1768
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1769
 
 
1770
 
    def test_get_converts_to_correct_exception(self):
1771
 
        bus = pydbus_adapter.SystemBus(
1772
 
            self.fake_pydbus_raises_exception_on_connect)
1773
 
        with self.assertRaises(dbus.ConnectFailed):
1774
 
            self.call_method(bus, "methodname", "busname",
1775
 
                             "objectpath", "interface")
1776
 
 
1777
 
    class fake_pydbus_raises_exception_on_connect:
1778
 
        """fake dbus-python module"""
1779
 
        @classmethod
1780
 
        def SystemBus(cls):
1781
 
            def get(busname, objectpath):
1782
 
                raise gi.repository.GLib.Error()
1783
 
            Bus = collections.namedtuple("Bus", ["get"])
1784
 
            return Bus(get=get)
1785
 
 
1786
 
    def test_set_property_uses_setattr(self):
1787
 
        class Object:
1788
 
            pass
1789
 
        obj = Object()
1790
 
        class pydbus_spy:
1791
 
            class SystemBus:
1792
 
                @staticmethod
1793
 
                def get(busname, objectpath):
1794
 
                    return {"interface": obj}
1795
 
        bus = pydbus_adapter.SystemBus(pydbus_spy)
1796
 
        value = Unique()
1797
 
        bus.set_property("busname", "objectpath", "interface", "key",
1798
 
                         value)
1799
 
        self.assertIs(value, obj.key)
1800
 
 
1801
 
    def test_get_suppresses_xml_deprecation_warning(self):
1802
 
        if sys.version_info.major >= 3:
1803
 
            return
1804
 
        class stub_pydbus_get:
1805
 
            class SystemBus:
1806
 
                @staticmethod
1807
 
                def get(busname, objectpath):
1808
 
                    warnings.warn_explicit(
1809
 
                        "deprecated", DeprecationWarning,
1810
 
                        "xml.etree.ElementTree", 0)
1811
 
        bus = pydbus_adapter.SystemBus(stub_pydbus_get)
1812
 
        with warnings.catch_warnings(record=True) as w:
1813
 
            warnings.simplefilter("always")
1814
 
            bus.get("busname", "objectpath")
1815
 
            self.assertEqual(0, len(w))
1816
 
 
1817
 
 
1818
 
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
1819
 
    class stub_pydbus:
1820
 
        """stub pydbus module"""
1821
 
        class SystemBus:
1822
 
            @staticmethod
1823
 
            def get(busname, objectpath):
1824
 
                return Unique()
1825
 
 
1826
 
    def setUp(self):
1827
 
        self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)
1828
 
 
1829
 
    def test_returns_distinct_objectpaths(self):
1830
 
        obj1 = self.bus.get("busname", "objectpath1")
1831
 
        self.assertIsInstance(obj1, Unique)
1832
 
        obj2 = self.bus.get("busname", "objectpath2")
1833
 
        self.assertIsInstance(obj2, Unique)
1834
 
        self.assertIsNot(obj1, obj2)
1835
 
 
1836
 
    def test_returns_distinct_busnames(self):
1837
 
        obj1 = self.bus.get("busname1", "objectpath")
1838
 
        self.assertIsInstance(obj1, Unique)
1839
 
        obj2 = self.bus.get("busname2", "objectpath")
1840
 
        self.assertIsInstance(obj2, Unique)
1841
 
        self.assertIsNot(obj1, obj2)
1842
 
 
1843
 
    def test_returns_distinct_both(self):
1844
 
        obj1 = self.bus.get("busname1", "objectpath")
1845
 
        self.assertIsInstance(obj1, Unique)
1846
 
        obj2 = self.bus.get("busname2", "objectpath")
1847
 
        self.assertIsInstance(obj2, Unique)
1848
 
        self.assertIsNot(obj1, obj2)
1849
 
 
1850
 
    def test_returns_same(self):
1851
 
        obj1 = self.bus.get("busname", "objectpath")
1852
 
        self.assertIsInstance(obj1, Unique)
1853
 
        obj2 = self.bus.get("busname", "objectpath")
1854
 
        self.assertIsInstance(obj2, Unique)
1855
 
        self.assertIs(obj1, obj2)
1856
 
 
1857
 
    def test_returns_same_old(self):
1858
 
        obj1 = self.bus.get("busname1", "objectpath1")
1859
 
        self.assertIsInstance(obj1, Unique)
1860
 
        obj2 = self.bus.get("busname2", "objectpath2")
1861
 
        self.assertIsInstance(obj2, Unique)
1862
 
        obj1b = self.bus.get("busname1", "objectpath1")
1863
 
        self.assertIsInstance(obj1b, Unique)
1864
 
        self.assertIsNot(obj1, obj2)
1865
 
        self.assertIsNot(obj2, obj1b)
1866
 
        self.assertIs(obj1, obj1b)
1867
 
 
1868
 
 
1869
 
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
1870
 
 
1871
 
    class dummy_dbussy:
1872
 
        class DBUS:
1873
 
            class ObjectPath(str):
1874
 
                pass
1875
 
        class DBusError(Exception):
1876
 
            pass
1877
 
 
1878
 
    def fake_ravel_func(self, func):
1879
 
        class fake_ravel:
1880
 
            @staticmethod
1881
 
            def system_bus():
1882
 
                class DBusInterfaceProxy:
1883
 
                    @staticmethod
1884
 
                    def methodname(*args):
1885
 
                        return [func(*args)]
1886
 
                class DBusObject:
1887
 
                    @staticmethod
1888
 
                    def get_interface(interface):
1889
 
                        if interface == "interface":
1890
 
                            return DBusInterfaceProxy()
1891
 
                return {"busname": {"objectpath": DBusObject()}}
1892
 
        return fake_ravel
1893
 
 
1894
 
    def call_method(self, bus, methodname, busname, objectpath,
1895
 
                    interface, *args):
1896
 
        with self.assertLogs(log, logging.DEBUG):
1897
 
            return bus.call_method(methodname, busname, objectpath,
1898
 
                                   interface, *args)
1899
 
 
1900
 
    def test_call_method_returns(self):
1901
 
        expected_method_return = Unique()
1902
 
        method_args = (Unique(), Unique())
1903
 
        def func(*args):
1904
 
            self.assertEqual(len(method_args), len(args))
1905
 
            for marg, arg in zip(method_args, args):
1906
 
                self.assertIs(marg, arg)
1907
 
            return expected_method_return
1908
 
        fake_ravel = self.fake_ravel_func(func)
1909
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1910
 
        ret = self.call_method(bus, "methodname", "busname",
1911
 
                               "objectpath", "interface",
1912
 
                               *method_args)
1913
 
        self.assertIs(ret, expected_method_return)
1914
 
 
1915
 
    def test_call_method_filters_objectpath(self):
1916
 
        def func():
1917
 
            return method_return
1918
 
        fake_ravel = self.fake_ravel_func(func)
1919
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1920
 
        method_return = (self.dummy_dbussy.DBUS
1921
 
                         .ObjectPath("objectpath"))
1922
 
        ret = self.call_method(bus, "methodname", "busname",
1923
 
                               "objectpath", "interface")
1924
 
        self.assertEqual("objectpath", ret)
1925
 
        self.assertNotIsInstance(ret,
1926
 
                                 self.dummy_dbussy.DBUS.ObjectPath)
1927
 
 
1928
 
    def test_call_method_filters_objectpaths_in_dict(self):
1929
 
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1930
 
        def func():
1931
 
            return method_return
1932
 
        fake_ravel = self.fake_ravel_func(func)
1933
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1934
 
        method_return = {
1935
 
            ObjectPath("objectpath_key_1"):
1936
 
            ObjectPath("objectpath_value_1"),
1937
 
            ObjectPath("objectpath_key_2"):
1938
 
            ObjectPath("objectpath_value_2"),
1939
 
        }
1940
 
        ret = self.call_method(bus, "methodname", "busname",
1941
 
                               "objectpath", "interface")
1942
 
        expected_method_return = {str(key): str(value)
1943
 
                                  for key, value in
1944
 
                                  method_return.items()}
1945
 
        for key, value in ret.items():
1946
 
            self.assertNotIsInstance(key, ObjectPath)
1947
 
            self.assertNotIsInstance(value, ObjectPath)
1948
 
        self.assertEqual(expected_method_return, ret)
1949
 
        self.assertIsInstance(ret, dict)
1950
 
 
1951
 
    def test_call_method_filters_objectpaths_in_dict_in_dict(self):
1952
 
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1953
 
        def func():
1954
 
            return method_return
1955
 
        fake_ravel = self.fake_ravel_func(func)
1956
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1957
 
        method_return = {
1958
 
            ObjectPath("key1"): {
1959
 
                ObjectPath("key11"): ObjectPath("value11"),
1960
 
                ObjectPath("key12"): ObjectPath("value12"),
1961
 
            },
1962
 
            ObjectPath("key2"): {
1963
 
                ObjectPath("key21"): ObjectPath("value21"),
1964
 
                ObjectPath("key22"): ObjectPath("value22"),
1965
 
            },
1966
 
        }
1967
 
        ret = self.call_method(bus, "methodname", "busname",
1968
 
                               "objectpath", "interface")
1969
 
        expected_method_return = {
1970
 
            "key1": {"key11": "value11",
1971
 
                     "key12": "value12"},
1972
 
            "key2": {"key21": "value21",
1973
 
                     "key22": "value22"},
1974
 
        }
1975
 
        self.assertEqual(expected_method_return, ret)
1976
 
        for key, value in ret.items():
1977
 
            self.assertIsInstance(value, dict)
1978
 
            self.assertEqual(expected_method_return[key], value)
1979
 
            self.assertNotIsInstance(key, ObjectPath)
1980
 
            for inner_key, inner_value in value.items():
1981
 
                self.assertIsInstance(value, dict)
1982
 
                self.assertEqual(
1983
 
                    expected_method_return[key][inner_key],
1984
 
                    inner_value)
1985
 
                self.assertNotIsInstance(key, ObjectPath)
1986
 
 
1987
 
    def test_call_method_filters_objectpaths_in_dict_three_deep(self):
1988
 
        ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1989
 
        def func():
1990
 
            return method_return
1991
 
        fake_ravel = self.fake_ravel_func(func)
1992
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1993
 
        method_return = {
1994
 
            ObjectPath("key1"): {
1995
 
                ObjectPath("key2"): {
1996
 
                    ObjectPath("key3"): ObjectPath("value"),
1997
 
                },
1998
 
            },
1999
 
        }
2000
 
        ret = self.call_method(bus, "methodname", "busname",
2001
 
                               "objectpath", "interface")
2002
 
        expected_method_return = {"key1": {"key2": {"key3": "value"}}}
2003
 
        self.assertEqual(expected_method_return, ret)
2004
 
        self.assertIsInstance(ret, dict)
2005
 
        self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
2006
 
        self.assertIsInstance(ret["key1"], dict)
2007
 
        self.assertNotIsInstance(next(iter(ret["key1"].keys())),
2008
 
                                 ObjectPath)
2009
 
        self.assertIsInstance(ret["key1"]["key2"], dict)
2010
 
        self.assertNotIsInstance(
2011
 
            next(iter(ret["key1"]["key2"].keys())),
2012
 
            ObjectPath)
2013
 
        self.assertEqual("value", ret["key1"]["key2"]["key3"])
2014
 
        self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
2015
 
                                 self.dummy_dbussy.DBUS.ObjectPath)
2016
 
 
2017
 
    def test_call_method_handles_exception(self):
2018
 
        def func():
2019
 
            raise self.dummy_dbussy.DBusError()
2020
 
 
2021
 
        fake_ravel = self.fake_ravel_func(func)
2022
 
        bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2023
 
 
2024
 
        with self.assertRaises(dbus.Error) as e:
2025
 
            self.call_method(bus, "methodname", "busname",
2026
 
                             "objectpath", "interface")
2027
 
 
2028
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
2029
 
 
2030
 
    def test_get_object_converts_to_correct_exception(self):
2031
 
        class fake_ravel_raises_exception_on_connect:
2032
 
            @staticmethod
2033
 
            def system_bus():
2034
 
                class Bus:
2035
 
                    @staticmethod
2036
 
                    def __getitem__(key):
2037
 
                        if key == "objectpath":
2038
 
                            raise self.dummy_dbussy.DBusError()
2039
 
                        raise Exception(key)
2040
 
                return {"busname": Bus()}
2041
 
        def func():
2042
 
            raise self.dummy_dbussy.DBusError()
2043
 
        bus = dbussy_adapter.SystemBus(
2044
 
            self.dummy_dbussy,
2045
 
            fake_ravel_raises_exception_on_connect)
2046
 
        with self.assertRaises(dbus.ConnectFailed):
2047
 
            self.call_method(bus, "methodname", "busname",
2048
 
                             "objectpath", "interface")
2049
 
 
2050
 
 
2051
1511
class Test_commands_from_options(unittest.TestCase):
2052
1512
 
2053
1513
    def setUp(self):
2058
1518
        self.assert_command_from_args(["--is-enabled", "client"],
2059
1519
                                      command.IsEnabled)
2060
1520
 
2061
 
    def assert_command_from_args(self, args, command_cls, length=1,
2062
 
                                 clients=None, **cmd_attrs):
 
1521
    def assert_command_from_args(self, args, command_cls,
 
1522
                                 **cmd_attrs):
2063
1523
        """Assert that parsing ARGS should result in an instance of
2064
1524
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
2065
1525
        options = self.parser.parse_args(args)
2066
1526
        check_option_syntax(self.parser, options)
2067
1527
        commands = commands_from_options(options)
2068
 
        self.assertEqual(length, len(commands))
2069
 
        for command in commands:
2070
 
            if isinstance(command, command_cls):
2071
 
                break
2072
 
        else:
2073
 
            self.assertIsInstance(command, command_cls)
2074
 
        if clients is not None:
2075
 
            self.assertEqual(clients, options.client)
 
1528
        self.assertEqual(1, len(commands))
 
1529
        command = commands[0]
 
1530
        self.assertIsInstance(command, command_cls)
2076
1531
        for key, value in cmd_attrs.items():
2077
1532
            self.assertEqual(value, getattr(command, key))
2078
1533
 
2079
 
    def assert_commands_from_args(self, args, commands, clients=None):
2080
 
        for cmd in commands:
2081
 
            self.assert_command_from_args(args, cmd,
2082
 
                                          length=len(commands),
2083
 
                                          clients=clients)
2084
 
 
2085
1534
    def test_is_enabled_short(self):
2086
1535
        self.assert_command_from_args(["-V", "client"],
2087
1536
                                      command.IsEnabled)
2278
1727
                                      verbose=True)
2279
1728
 
2280
1729
 
2281
 
    def test_manual_page_example_1(self):
2282
 
        self.assert_command_from_args("",
2283
 
                                      command.PrintTable,
2284
 
                                      clients=[],
2285
 
                                      verbose=False)
2286
 
 
2287
 
    def test_manual_page_example_2(self):
2288
 
        self.assert_command_from_args(
2289
 
            "--verbose foo1.example.org foo2.example.org".split(),
2290
 
            command.PrintTable, clients=["foo1.example.org",
2291
 
                                         "foo2.example.org"],
2292
 
            verbose=True)
2293
 
 
2294
 
    def test_manual_page_example_3(self):
2295
 
        self.assert_command_from_args("--enable --all".split(),
2296
 
                                      command.Enable,
2297
 
                                      clients=[])
2298
 
 
2299
 
    def test_manual_page_example_4(self):
2300
 
        self.assert_commands_from_args(
2301
 
            ("--timeout=PT5M --interval=PT1M foo1.example.org"
2302
 
             " foo2.example.org").split(),
2303
 
            [command.SetTimeout, command.SetInterval],
2304
 
            clients=["foo1.example.org", "foo2.example.org"])
2305
 
 
2306
 
    def test_manual_page_example_5(self):
2307
 
        self.assert_command_from_args("--approve --all".split(),
2308
 
                                      command.Approve,
2309
 
                                      clients=[])
2310
 
 
2311
 
 
2312
1730
class TestCommand(unittest.TestCase):
2313
1731
    """Abstract class for tests of command classes"""
2314
1732
 
2427
1845
        busname = "se.recompile.Mandos"
2428
1846
        client_interface = "se.recompile.Mandos.Client"
2429
1847
        command.Approve().run(self.bus.clients, self.bus)
2430
 
        self.assertTrue(self.bus.clients)
2431
1848
        for clientpath in self.bus.clients:
2432
1849
            self.assertIn(("Approve", busname, clientpath,
2433
1850
                           client_interface, (True,)), self.bus.calls)
2436
1853
        busname = "se.recompile.Mandos"
2437
1854
        client_interface = "se.recompile.Mandos.Client"
2438
1855
        command.Deny().run(self.bus.clients, self.bus)
2439
 
        self.assertTrue(self.bus.clients)
2440
1856
        for clientpath in self.bus.clients:
2441
1857
            self.assertIn(("Approve", busname, clientpath,
2442
1858
                           client_interface, (False,)),
2443
1859
                          self.bus.calls)
2444
1860
 
2445
1861
    def test_Remove(self):
2446
 
        busname = "se.recompile.Mandos"
2447
 
        server_path = "/"
2448
 
        server_interface = "se.recompile.Mandos"
2449
 
        orig_clients = self.bus.clients.copy()
2450
1862
        command.Remove().run(self.bus.clients, self.bus)
2451
 
        self.assertFalse(self.bus.clients)
2452
 
        for clientpath in orig_clients:
2453
 
            self.assertIn(("RemoveClient", busname,
2454
 
                           server_path, server_interface,
 
1863
        for clientpath in self.bus.clients:
 
1864
            self.assertIn(("RemoveClient", dbus_busname,
 
1865
                           dbus_server_path, dbus_server_interface,
2455
1866
                           (clientpath,)), self.bus.calls)
2456
1867
 
2457
1868
    expected_json = {
2659
2070
        else:
2660
2071
            cmd_args = [() for x in range(len(self.values_to_get))]
2661
2072
            values_to_get = self.values_to_get
2662
 
        self.assertTrue(values_to_get)
2663
2073
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2664
2074
            for clientpath in self.bus.clients:
2665
2075
                self.bus.clients[clientpath][self.propname] = (
2666
2076
                    Unique())
2667
2077
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
 
            self.assertTrue(self.bus.clients)
2669
2078
            for clientpath in self.bus.clients:
2670
2079
                value = (self.bus.clients[clientpath]
2671
2080
                         [self.propname])
2730
2139
class TestSetSecretCmd(TestPropertySetterCmd):
2731
2140
    command = command.SetSecret
2732
2141
    propname = "Secret"
2733
 
    def __init__(self, *args, **kwargs):
2734
 
        self.values_to_set = [io.BytesIO(b""),
2735
 
                              io.BytesIO(b"secret\0xyzzy\nbar")]
2736
 
        self.values_to_get = [f.getvalue() for f in
2737
 
                              self.values_to_set]
2738
 
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
 
2142
    values_to_set = [io.BytesIO(b""),
 
2143
                     io.BytesIO(b"secret\0xyzzy\nbar")]
 
2144
    values_to_get = [f.getvalue() for f in values_to_set]
2739
2145
 
2740
2146
 
2741
2147
class TestSetTimeoutCmd(TestPropertySetterCmd):
2794
2200
 
2795
2201
 
2796
2202
 
2797
 
def parse_test_args():
2798
 
    # type: () -> argparse.Namespace
 
2203
def should_only_run_tests():
2799
2204
    parser = argparse.ArgumentParser(add_help=False)
2800
 
    parser.add_argument("--check", action="store_true")
2801
 
    parser.add_argument("--prefix", )
 
2205
    parser.add_argument("--check", action='store_true')
2802
2206
    args, unknown_args = parser.parse_known_args()
2803
 
    if args.check:
2804
 
        # Remove test options from sys.argv
 
2207
    run_tests = args.check
 
2208
    if run_tests:
 
2209
        # Remove --check argument from sys.argv
2805
2210
        sys.argv[1:] = unknown_args
2806
 
    return args
 
2211
    return run_tests
2807
2212
 
2808
2213
# Add all tests from doctest strings
2809
2214
def load_tests(loader, tests, none):
2812
2217
    return tests
2813
2218
 
2814
2219
if __name__ == "__main__":
2815
 
    options = parse_test_args()
2816
2220
    try:
2817
 
        if options.check:
2818
 
            extra_test_prefix = options.prefix
2819
 
            if extra_test_prefix is not None:
2820
 
                if not (unittest.main(argv=[""], exit=False)
2821
 
                        .result.wasSuccessful()):
2822
 
                    sys.exit(1)
2823
 
                class ExtraTestLoader(unittest.TestLoader):
2824
 
                    testMethodPrefix = extra_test_prefix
2825
 
                # Call using ./scriptname --check [--verbose]
2826
 
                unittest.main(argv=[""], testLoader=ExtraTestLoader())
2827
 
            else:
2828
 
                unittest.main(argv=[""])
 
2221
        if should_only_run_tests():
 
2222
            # Call using ./tdd-python-script --check [--verbose]
 
2223
            unittest.main()
2829
2224
        else:
2830
2225
            main()
2831
2226
    finally:
2832
2227
        logging.shutdown()
2833
 
 
2834
 
# Local Variables:
2835
 
# run-tests:
2836
 
# (lambda (&optional extra)
2837
 
#   (if (not (funcall run-tests-in-test-buffer default-directory
2838
 
#             extra))
2839
 
#       (funcall show-test-buffer-in-test-window)
2840
 
#     (funcall remove-test-window)
2841
 
#     (if extra (message "Extra tests run successfully!"))))
2842
 
# run-tests-in-test-buffer:
2843
 
# (lambda (dir &optional extra)
2844
 
#   (with-current-buffer (get-buffer-create "*Test*")
2845
 
#     (setq buffer-read-only nil
2846
 
#           default-directory dir)
2847
 
#     (erase-buffer)
2848
 
#     (compilation-mode))
2849
 
#   (let ((process-result
2850
 
#          (let ((inhibit-read-only t))
2851
 
#            (process-file-shell-command
2852
 
#             (funcall get-command-line extra) nil "*Test*"))))
2853
 
#     (and (numberp process-result)
2854
 
#          (= process-result 0))))
2855
 
# get-command-line:
2856
 
# (lambda (&optional extra)
2857
 
#   (let ((quoted-script
2858
 
#          (shell-quote-argument (funcall get-script-name))))
2859
 
#     (format
2860
 
#      (concat "%s --check" (if extra " --prefix=atest" ""))
2861
 
#      quoted-script)))
2862
 
# get-script-name:
2863
 
# (lambda ()
2864
 
#   (if (fboundp 'file-local-name)
2865
 
#       (file-local-name (buffer-file-name))
2866
 
#     (or (file-remote-p (buffer-file-name) 'localname)
2867
 
#         (buffer-file-name))))
2868
 
# remove-test-window:
2869
 
# (lambda ()
2870
 
#   (let ((test-window (get-buffer-window "*Test*")))
2871
 
#     (if test-window (delete-window test-window))))
2872
 
# show-test-buffer-in-test-window:
2873
 
# (lambda ()
2874
 
#   (when (not (get-buffer-window-list "*Test*"))
2875
 
#     (setq next-error-last-buffer (get-buffer "*Test*"))
2876
 
#     (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
 
#            (display-buffer-overriding-action
2878
 
#             `((display-buffer-in-side-window) (side . ,side)
2879
 
#               (window-height . fit-window-to-buffer)
2880
 
#               (window-width . fit-window-to-buffer))))
2881
 
#       (display-buffer "*Test*"))))
2882
 
# eval:
2883
 
# (progn
2884
 
#   (let* ((run-extra-tests (lambda () (interactive)
2885
 
#                             (funcall run-tests t)))
2886
 
#          (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
 
#          (outer-keymap `(keymap (3 . ,inner-keymap))))     ; C-c
2888
 
#     (setq minor-mode-overriding-map-alist
2889
 
#           (cons `(run-tests . ,outer-keymap)
2890
 
#                 minor-mode-overriding-map-alist)))
2891
 
#   (add-hook 'after-save-hook run-tests 90 t))
2892
 
# End: