/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2022-04-25 20:09:43 UTC
  • mfrom: (407 release)
  • mto: This revision was merged to the branch mainline in revision 408.
  • Revision ID: teddy@recompile.se-20220425200943-e154ee7jf6df1uso
Merge from release branch

Show diffs side-by-side

added

removed

Lines of Context:
1
1
#!/usr/bin/python3 -bbI
2
 
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
3
 
#
4
 
# Mandos Monitor - Control and monitor the Mandos server
5
 
#
6
 
# Copyright © 2008-2019 Teddy Hogeborn
7
 
# Copyright © 2008-2019 Björn Påhlsson
 
2
# -*- coding: utf-8; lexical-binding: t -*-
 
3
#
 
4
# Mandos Control - Control or query the Mandos server
 
5
#
 
6
# Copyright © 2008-2022 Teddy Hogeborn
 
7
# Copyright © 2008-2022 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
23
23
#
24
24
# Contact the authors at <mandos@recompile.se>.
25
25
#
26
 
 
27
26
from __future__ import (division, absolute_import, print_function,
28
27
                        unicode_literals)
29
28
 
33
32
    pass
34
33
 
35
34
import sys
 
35
import unittest
36
36
import argparse
 
37
import logging
 
38
import os
37
39
import locale
38
40
import datetime
39
41
import re
40
 
import os
41
42
import collections
42
43
import json
43
 
import unittest
44
 
import logging
45
44
import io
46
45
import tempfile
47
46
import contextlib
49
48
if sys.version_info.major == 2:
50
49
    __metaclass__ = type
51
50
    str = unicode
 
51
    input = raw_input
52
52
 
53
53
class gi:
54
54
    """Dummy gi module, for the tests"""
77
77
    import warnings
78
78
    warnings.simplefilter("default")
79
79
 
80
 
log = logging.getLogger(sys.argv[0])
 
80
log = logging.getLogger(os.path.basename(sys.argv[0]))
81
81
logging.basicConfig(level="INFO", # Show info level messages
82
82
                    format="%(message)s") # Show basic log messages
83
83
 
89
89
 
90
90
locale.setlocale(locale.LC_ALL, "")
91
91
 
92
 
version = "1.8.9"
 
92
version = "1.8.15"
93
93
 
94
94
 
95
95
def main():
102
102
    clientnames = options.client
103
103
 
104
104
    if options.debug:
105
 
        log.setLevel(logging.DEBUG)
 
105
        logging.getLogger("").setLevel(logging.DEBUG)
106
106
 
107
107
    if dbussy is not None and ravel is not None:
108
108
        bus = dbussy_adapter.CachingBus(dbussy, ravel)
256
256
        return rfc3339_duration_to_delta(interval)
257
257
    except ValueError as e:
258
258
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
259
 
                    ' '.join(e.args))
 
259
                    " ".join(e.args))
260
260
    return parse_pre_1_6_1_interval(interval)
261
261
 
262
262
 
395
395
    """Parse an interval string as documented by Mandos before 1.6.1,
396
396
    and return a datetime.timedelta
397
397
 
398
 
    >>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
399
 
    True
400
 
    >>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
401
 
    True
402
 
    >>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
403
 
    True
404
 
    >>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
405
 
    True
406
 
    >>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
407
 
    True
408
 
    >>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
409
 
    True
410
 
    >>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
 
398
    >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
 
399
    True
 
400
    >>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
 
401
    True
 
402
    >>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
 
403
    True
 
404
    >>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
 
405
    True
 
406
    >>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
 
407
    True
 
408
    >>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
 
409
    True
 
410
    >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
411
411
    True
412
412
    >>> # Ignore unknown characters, allow any order and repetitions
413
 
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
 
413
    >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") == datetime.timedelta(2, 480, 18000)
414
414
    True
415
415
 
416
416
    """
497
497
                             self.properties_iface, interface, key,
498
498
                             value)
499
499
 
 
500
        def call_method(self, methodname, busname, objectpath,
 
501
                        interface, *args):
 
502
            raise NotImplementedError()
 
503
 
 
504
 
500
505
    class MandosBus(SystemBus):
501
506
        busname_domain = "se.recompile"
502
507
        busname = busname_domain + ".Mandos"
721
726
            with self.convert_exception(dbus.Error):
722
727
                value =  method(*args)
723
728
            # DBussy returns values either as an empty list or as a
724
 
            # tuple: (signature, value)
 
729
            # list of one element with the return value
725
730
            if value:
726
731
                return self.type_filter(value[0])
727
732
 
733
738
 
734
739
        def type_filter(self, value):
735
740
            """Convert the most bothersome types to Python types"""
 
741
            # A D-Bus Variant value is represented as the Python type
 
742
            # Tuple[dbussy.DBUS.Signature, Any]
736
743
            if isinstance(value, tuple):
737
744
                if (len(value) == 2
738
745
                    and isinstance(value[0],
869
876
                    {key: properties[key]
870
877
                     for key in self.all_keywords}
871
878
                    for properties in clients.values()}
872
 
            print(json.dumps(data, indent=4, separators=(',', ': ')))
 
879
            print(json.dumps(data, indent=4, separators=(",", ": ")))
873
880
 
874
881
 
875
882
    class PrintTable(Output):
1627
1634
        finally:
1628
1635
            dbus_logger.removeFilter(counting_handler)
1629
1636
 
1630
 
        self.assertNotIsInstance(e, dbus.ConnectFailed)
 
1637
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1631
1638
 
1632
1639
        # Make sure the dbus logger was suppressed
1633
1640
        self.assertEqual(0, counting_handler.count)
1770
1777
            self.call_method(bus, "methodname", "busname",
1771
1778
                             "objectpath", "interface")
1772
1779
 
1773
 
        self.assertNotIsInstance(e, dbus.ConnectFailed)
 
1780
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1774
1781
 
1775
1782
    def test_get_converts_to_correct_exception(self):
1776
1783
        bus = pydbus_adapter.SystemBus(
2432
2439
        busname = "se.recompile.Mandos"
2433
2440
        client_interface = "se.recompile.Mandos.Client"
2434
2441
        command.Approve().run(self.bus.clients, self.bus)
 
2442
        self.assertTrue(self.bus.clients)
2435
2443
        for clientpath in self.bus.clients:
2436
2444
            self.assertIn(("Approve", busname, clientpath,
2437
2445
                           client_interface, (True,)), self.bus.calls)
2440
2448
        busname = "se.recompile.Mandos"
2441
2449
        client_interface = "se.recompile.Mandos.Client"
2442
2450
        command.Deny().run(self.bus.clients, self.bus)
 
2451
        self.assertTrue(self.bus.clients)
2443
2452
        for clientpath in self.bus.clients:
2444
2453
            self.assertIn(("Approve", busname, clientpath,
2445
2454
                           client_interface, (False,)),
2446
2455
                          self.bus.calls)
2447
2456
 
2448
2457
    def test_Remove(self):
 
2458
        busname = "se.recompile.Mandos"
 
2459
        server_path = "/"
 
2460
        server_interface = "se.recompile.Mandos"
 
2461
        orig_clients = self.bus.clients.copy()
2449
2462
        command.Remove().run(self.bus.clients, self.bus)
2450
 
        for clientpath in self.bus.clients:
2451
 
            self.assertIn(("RemoveClient", dbus_busname,
2452
 
                           dbus_server_path, dbus_server_interface,
 
2463
        self.assertFalse(self.bus.clients)
 
2464
        for clientpath in orig_clients:
 
2465
            self.assertIn(("RemoveClient", busname,
 
2466
                           server_path, server_interface,
2453
2467
                           (clientpath,)), self.bus.calls)
2454
2468
 
2455
2469
    expected_json = {
2657
2671
        else:
2658
2672
            cmd_args = [() for x in range(len(self.values_to_get))]
2659
2673
            values_to_get = self.values_to_get
 
2674
        self.assertTrue(values_to_get)
2660
2675
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2661
2676
            for clientpath in self.bus.clients:
2662
2677
                self.bus.clients[clientpath][self.propname] = (
2663
2678
                    Unique())
2664
2679
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
 
2680
            self.assertTrue(self.bus.clients)
2665
2681
            for clientpath in self.bus.clients:
2666
2682
                value = (self.bus.clients[clientpath]
2667
2683
                         [self.propname])
2726
2742
class TestSetSecretCmd(TestPropertySetterCmd):
2727
2743
    command = command.SetSecret
2728
2744
    propname = "Secret"
2729
 
    values_to_set = [io.BytesIO(b""),
2730
 
                     io.BytesIO(b"secret\0xyzzy\nbar")]
2731
 
    values_to_get = [f.getvalue() for f in values_to_set]
 
2745
    def __init__(self, *args, **kwargs):
 
2746
        self.values_to_set = [io.BytesIO(b""),
 
2747
                              io.BytesIO(b"secret\0xyzzy\nbar")]
 
2748
        self.values_to_get = [f.getvalue() for f in
 
2749
                              self.values_to_set]
 
2750
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2732
2751
 
2733
2752
 
2734
2753
class TestSetTimeoutCmd(TestPropertySetterCmd):
2787
2806
 
2788
2807
 
2789
2808
 
2790
 
def should_only_run_tests():
 
2809
def parse_test_args():
 
2810
    # type: () -> argparse.Namespace
2791
2811
    parser = argparse.ArgumentParser(add_help=False)
2792
 
    parser.add_argument("--check", action='store_true')
 
2812
    parser.add_argument("--check", action="store_true")
 
2813
    parser.add_argument("--prefix", )
2793
2814
    args, unknown_args = parser.parse_known_args()
2794
 
    run_tests = args.check
2795
 
    if run_tests:
2796
 
        # Remove --check argument from sys.argv
 
2815
    if args.check:
 
2816
        # Remove test options from sys.argv
2797
2817
        sys.argv[1:] = unknown_args
2798
 
    return run_tests
 
2818
    return args
2799
2819
 
2800
2820
# Add all tests from doctest strings
2801
2821
def load_tests(loader, tests, none):
2804
2824
    return tests
2805
2825
 
2806
2826
if __name__ == "__main__":
 
2827
    options = parse_test_args()
2807
2828
    try:
2808
 
        if should_only_run_tests():
2809
 
            # Call using ./tdd-python-script --check [--verbose]
2810
 
            unittest.main()
 
2829
        if options.check:
 
2830
            extra_test_prefix = options.prefix
 
2831
            if extra_test_prefix is not None:
 
2832
                if not (unittest.main(argv=[""], exit=False)
 
2833
                        .result.wasSuccessful()):
 
2834
                    sys.exit(1)
 
2835
                class ExtraTestLoader(unittest.TestLoader):
 
2836
                    testMethodPrefix = extra_test_prefix
 
2837
                # Call using ./scriptname --check [--verbose]
 
2838
                unittest.main(argv=[""], testLoader=ExtraTestLoader())
 
2839
            else:
 
2840
                unittest.main(argv=[""])
2811
2841
        else:
2812
2842
            main()
2813
2843
    finally:
2814
2844
        logging.shutdown()
 
2845
 
 
2846
# Local Variables:
 
2847
# run-tests:
 
2848
# (lambda (&optional extra)
 
2849
#   (if (not (funcall run-tests-in-test-buffer default-directory
 
2850
#             extra))
 
2851
#       (funcall show-test-buffer-in-test-window)
 
2852
#     (funcall remove-test-window)
 
2853
#     (if extra (message "Extra tests run successfully!"))))
 
2854
# run-tests-in-test-buffer:
 
2855
# (lambda (dir &optional extra)
 
2856
#   (with-current-buffer (get-buffer-create "*Test*")
 
2857
#     (setq buffer-read-only nil
 
2858
#           default-directory dir)
 
2859
#     (erase-buffer)
 
2860
#     (compilation-mode))
 
2861
#   (let ((process-result
 
2862
#          (let ((inhibit-read-only t))
 
2863
#            (process-file-shell-command
 
2864
#             (funcall get-command-line extra) nil "*Test*"))))
 
2865
#     (and (numberp process-result)
 
2866
#          (= process-result 0))))
 
2867
# get-command-line:
 
2868
# (lambda (&optional extra)
 
2869
#   (let ((quoted-script
 
2870
#          (shell-quote-argument (funcall get-script-name))))
 
2871
#     (format
 
2872
#      (concat "%s --check" (if extra " --prefix=atest" ""))
 
2873
#      quoted-script)))
 
2874
# get-script-name:
 
2875
# (lambda ()
 
2876
#   (if (fboundp 'file-local-name)
 
2877
#       (file-local-name (buffer-file-name))
 
2878
#     (or (file-remote-p (buffer-file-name) 'localname)
 
2879
#         (buffer-file-name))))
 
2880
# remove-test-window:
 
2881
# (lambda ()
 
2882
#   (let ((test-window (get-buffer-window "*Test*")))
 
2883
#     (if test-window (delete-window test-window))))
 
2884
# show-test-buffer-in-test-window:
 
2885
# (lambda ()
 
2886
#   (when (not (get-buffer-window-list "*Test*"))
 
2887
#     (setq next-error-last-buffer (get-buffer "*Test*"))
 
2888
#     (let* ((side (if (>= (window-width) 146) 'right 'bottom))
 
2889
#            (display-buffer-overriding-action
 
2890
#             `((display-buffer-in-side-window) (side . ,side)
 
2891
#               (window-height . fit-window-to-buffer)
 
2892
#               (window-width . fit-window-to-buffer))))
 
2893
#       (display-buffer "*Test*"))))
 
2894
# eval:
 
2895
# (progn
 
2896
#   (let* ((run-extra-tests (lambda () (interactive)
 
2897
#                             (funcall run-tests t)))
 
2898
#          (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
 
2899
#          (outer-keymap `(keymap (3 . ,inner-keymap))))     ; C-c
 
2900
#     (setq minor-mode-overriding-map-alist
 
2901
#           (cons `(run-tests . ,outer-keymap)
 
2902
#                 minor-mode-overriding-map-alist)))
 
2903
#   (add-hook 'after-save-hook run-tests 90 t))
 
2904
# End: