1
1
#!/usr/bin/python3 -bbI
2
# -*- coding: utf-8; lexical-binding: t -*-
4
# Mandos Control - Control or query the Mandos server
6
# Copyright © 2008-2022 Teddy Hogeborn
7
# Copyright © 2008-2022 Björn Påhlsson
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
9
9
# This file is part of Mandos.
81
78
warnings.simplefilter("default")
83
log = logging.getLogger(os.path.basename(sys.argv[0]))
84
logging.basicConfig(level="INFO", # Show info level messages
80
log = logging.getLogger(sys.argv[0])
81
logging.basicConfig(level="INFO", # Show info level messages
85
82
format="%(message)s") # Show basic log messages
87
84
logging.captureWarnings(True) # Show warnings via the logging system
397
394
def parse_pre_1_6_1_interval(interval):
398
r"""Parse an interval string as documented by Mandos before 1.6.1,
395
"""Parse an interval string as documented by Mandos before 1.6.1,
399
396
and return a datetime.timedelta
401
>>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
403
>>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
405
>>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
407
>>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
409
>>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
411
>>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
413
>>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
398
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
400
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
402
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
404
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
406
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
408
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
410
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
415
412
>>> # Ignore unknown characters, allow any order and repetitions
416
>>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
... == datetime.timedelta(2, 480, 18000)
413
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
491
487
object_manager_iface = "org.freedesktop.DBus.ObjectManager"
493
488
def get_managed_objects(self, busname, objectpath):
494
489
return self.call_method("GetManagedObjects", busname,
496
491
self.object_manager_iface)
498
493
properties_iface = "org.freedesktop.DBus.Properties"
500
494
def set_property(self, busname, objectpath, interface, key,
502
496
self.call_method("Set", busname, objectpath,
503
497
self.properties_iface, interface, key,
506
def call_method(self, methodname, busname, objectpath,
508
raise NotImplementedError()
510
500
class MandosBus(SystemBus):
511
501
busname_domain = "se.recompile"
512
502
busname = busname_domain + ".Mandos"
621
610
def __exit__(self, exc_type, exc_val, exc_tb):
622
611
self.logger.removeFilter(self.nullfilter)
624
614
class CachingBus(SystemBus):
625
615
"""A caching layer for dbus_python_adapter.SystemBus"""
627
616
def __init__(self, *args, **kwargs):
628
617
self.object_cache = {}
629
618
super(dbus_python_adapter.CachingBus,
630
619
self).__init__(*args, **kwargs)
632
620
def get_object(self, busname, objectpath):
634
622
return self.object_cache[(busname, objectpath)]
690
678
class CachingBus(SystemBus):
691
679
"""A caching layer for pydbus_adapter.SystemBus"""
693
680
def __init__(self, *args, **kwargs):
694
681
self.object_cache = {}
695
682
super(pydbus_adapter.CachingBus,
696
683
self).__init__(*args, **kwargs)
698
684
def get(self, busname, objectpath):
700
686
return self.object_cache[(busname, objectpath)]
702
688
new_object = (super(pydbus_adapter.CachingBus, self)
703
689
.get(busname, objectpath))
704
self.object_cache[(busname, objectpath)] = new_object
690
self.object_cache[(busname, objectpath)] = new_object
705
691
return new_object
733
719
iface = proxy_object.get_interface(interface)
734
720
method = getattr(iface, methodname)
735
721
with self.convert_exception(dbus.Error):
736
value = method(*args)
722
value = method(*args)
737
723
# DBussy returns values either as an empty list or as a
738
# list of one element with the return value
724
# tuple: (signature, value)
740
726
return self.type_filter(value[0])
781
765
class CachingBus(MandosBus):
782
766
"""A caching layer for dbussy_adapter.MandosBus"""
784
767
def __init__(self, *args, **kwargs):
785
768
self.object_cache = {}
786
769
super(dbussy_adapter.CachingBus, self).__init__(*args,
789
771
def get_object(self, busname, objectpath):
791
773
return self.object_cache[(busname, objectpath)]
855
838
class Approve(Base):
    """Command calling the "Approve" client method with True."""

    def run_on_one_client(self, client, properties):
        """Call the "Approve" method of *client* with the value True.

        The *properties* argument is accepted but not used here.
        """
        self.bus.call_client_method(client, "Approve", True)
859
843
class Deny(Base):
    """Command calling the "Approve" client method with False."""

    def run_on_one_client(self, client, properties):
        """Call the "Approve" method of *client* with the value False.

        The *properties* argument is accepted but not used here.
        """
        self.bus.call_client_method(client, "Approve", False)
863
848
class Remove(Base):
    """Command calling the "RemoveClient" server method per client."""

    def run(self, clients, bus):
        """Call "RemoveClient" on *bus* once for every key of *clients*.

        clients -- mapping whose keys are client object paths
        bus     -- object providing call_server_method()
        """
        # Iterate over a frozen copy of the keys, so the loop is not
        # affected if the mapping changes while clients are removed.
        for clientpath in frozenset(clients.keys()):
            bus.call_server_method("RemoveClient", clientpath)
868
854
class Output(Base):
869
855
"""Abstract class for commands outputting client details"""
870
856
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
876
862
"Checker", "ExtendedTimeout", "Expires",
877
863
"LastCheckerStatus")
879
866
class DumpJSON(Output):
    """Command printing the client properties as a JSON object."""

    def run(self, clients, bus=None):
        """Print one JSON object describing all *clients*.

        clients -- mapping whose values are property dictionaries;
                   each must contain "Name" and every key listed in
                   self.all_keywords
        bus     -- accepted but not used here

        The output maps each client's "Name" to a dictionary of the
        properties named in self.all_keywords.
        """
        data = {properties["Name"]:
                {key: properties[key]
                 for key in self.all_keywords}
                for properties in clients.values()}
        # Print exactly once; explicit separators avoid trailing
        # whitespace after the commas when indenting.
        print(json.dumps(data, indent=4, separators=(",", ": ")))
887
875
class PrintTable(Output):
888
876
def __init__(self, verbose=False):
993
981
def propname(self):
994
982
raise NotImplementedError()
996
985
class Enable(PropertySetter):
    """Property setter pairing the "Enabled" property with True."""
    # NOTE(review): how PropertySetter applies propname and
    # value_to_set is not visible in this excerpt; confirm there.
    propname = "Enabled"
    value_to_set = True
1000
990
class Disable(PropertySetter):
    """Property setter pairing the "Enabled" property with False."""
    # NOTE(review): how PropertySetter applies propname and
    # value_to_set is not visible in this excerpt; confirm there.
    propname = "Enabled"
    value_to_set = False
1004
995
class BumpTimeout(PropertySetter):
    """Property setter pairing "LastCheckedOK" with an empty string."""
    # NOTE(review): presumably the server treats an empty string as
    # "checked OK just now"; this is not shown here, so verify.
    propname = "LastCheckedOK"
    value_to_set = ""
1008
1000
class StartChecker(PropertySetter):
    """Property setter pairing "CheckerRunning" with True."""
    # NOTE(review): how PropertySetter applies propname and
    # value_to_set is not visible in this excerpt; confirm there.
    propname = "CheckerRunning"
    value_to_set = True
1012
1005
class StopChecker(PropertySetter):
    """Property setter pairing "CheckerRunning" with False."""
    # NOTE(review): how PropertySetter applies propname and
    # value_to_set is not visible in this excerpt; confirm there.
    propname = "CheckerRunning"
    value_to_set = False
1016
1010
class ApproveByDefault(PropertySetter):
    """Property setter pairing "ApprovedByDefault" with True."""
    # NOTE(review): how PropertySetter applies propname and
    # value_to_set is not visible in this excerpt; confirm there.
    propname = "ApprovedByDefault"
    value_to_set = True
1020
1015
class DenyByDefault(PropertySetter):
    """Property setter pairing "ApprovedByDefault" with False."""
    # NOTE(review): how PropertySetter applies propname and
    # value_to_set is not visible in this excerpt; confirm there.
    propname = "ApprovedByDefault"
    value_to_set = False
1024
1020
class PropertySetterValue(PropertySetter):
1025
1021
"""Abstract class for PropertySetter receiving a value as
1026
1022
constructor argument instead of a class attribute."""
1028
1023
def __init__(self, value):
1029
1024
self.value_to_set = value
1067
1065
"When setting, convert value from a datetime.timedelta"
1068
1066
self._vts = int(round(value.total_seconds() * 1000))
1070
1069
class SetTimeout(PropertySetterValueMilliseconds):
    """Value-taking property setter for the "Timeout" property."""
    # NOTE(review): the base class presumably converts a
    # datetime.timedelta to integer milliseconds; verify there.
    propname = "Timeout"
1073
1073
class SetExtendedTimeout(PropertySetterValueMilliseconds):
    """Value-taking property setter for "ExtendedTimeout"."""
    # NOTE(review): the base class presumably converts a
    # datetime.timedelta to integer milliseconds; verify there.
    propname = "ExtendedTimeout"
1076
1077
class SetInterval(PropertySetterValueMilliseconds):
    """Value-taking property setter for the "Interval" property."""
    # NOTE(review): the base class presumably converts a
    # datetime.timedelta to integer milliseconds; verify there.
    propname = "Interval"
1079
1081
class SetApprovalDelay(PropertySetterValueMilliseconds):
    """Value-taking property setter for "ApprovalDelay"."""
    # NOTE(review): the base class presumably converts a
    # datetime.timedelta to integer milliseconds; verify there.
    propname = "ApprovalDelay"
1082
1085
class SetApprovalDuration(PropertySetterValueMilliseconds):
    """Value-taking property setter for "ApprovalDuration"."""
    # NOTE(review): the base class presumably converts a
    # datetime.timedelta to integer milliseconds; verify there.
    propname = "ApprovalDuration"
1765
1770
self.call_method(bus, "methodname", "busname",
1766
1771
"objectpath", "interface")
1768
self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1773
self.assertNotIsInstance(e, dbus.ConnectFailed)
1770
1775
def test_get_converts_to_correct_exception(self):
1771
1776
bus = pydbus_adapter.SystemBus(
2427
2432
busname = "se.recompile.Mandos"
2428
2433
client_interface = "se.recompile.Mandos.Client"
2429
2434
command.Approve().run(self.bus.clients, self.bus)
2430
self.assertTrue(self.bus.clients)
2431
2435
for clientpath in self.bus.clients:
2432
2436
self.assertIn(("Approve", busname, clientpath,
2433
2437
client_interface, (True,)), self.bus.calls)
2436
2440
busname = "se.recompile.Mandos"
2437
2441
client_interface = "se.recompile.Mandos.Client"
2438
2442
command.Deny().run(self.bus.clients, self.bus)
2439
self.assertTrue(self.bus.clients)
2440
2443
for clientpath in self.bus.clients:
2441
2444
self.assertIn(("Approve", busname, clientpath,
2442
2445
client_interface, (False,)),
2443
2446
self.bus.calls)
2445
2448
def test_Remove(self):
2446
busname = "se.recompile.Mandos"
2448
server_interface = "se.recompile.Mandos"
2449
orig_clients = self.bus.clients.copy()
2450
2449
command.Remove().run(self.bus.clients, self.bus)
2451
self.assertFalse(self.bus.clients)
2452
for clientpath in orig_clients:
2453
self.assertIn(("RemoveClient", busname,
2454
server_path, server_interface,
2450
for clientpath in self.bus.clients:
2451
self.assertIn(("RemoveClient", dbus_busname,
2452
dbus_server_path, dbus_server_interface,
2455
2453
(clientpath,)), self.bus.calls)
2457
2455
expected_json = {
2660
2658
cmd_args = [() for x in range(len(self.values_to_get))]
2661
2659
values_to_get = self.values_to_get
2662
self.assertTrue(values_to_get)
2663
2660
for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2664
2661
for clientpath in self.bus.clients:
2665
2662
self.bus.clients[clientpath][self.propname] = (
2667
2664
self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
self.assertTrue(self.bus.clients)
2669
2665
for clientpath in self.bus.clients:
2670
2666
value = (self.bus.clients[clientpath]
2671
2667
[self.propname])
2730
2726
class TestSetSecretCmd(TestPropertySetterCmd):
2731
2727
command = command.SetSecret
2732
2728
propname = "Secret"
2733
def __init__(self, *args, **kwargs):
2734
self.values_to_set = [io.BytesIO(b""),
2735
io.BytesIO(b"secret\0xyzzy\nbar")]
2736
self.values_to_get = [f.getvalue() for f in
2738
super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2729
values_to_set = [io.BytesIO(b""),
2730
io.BytesIO(b"secret\0xyzzy\nbar")]
2731
values_to_get = [f.getvalue() for f in values_to_set]
2741
2734
class TestSetTimeoutCmd(TestPropertySetterCmd):
2797
def parse_test_args():
2798
# type: () -> argparse.Namespace
2790
def should_only_run_tests():
2799
2791
parser = argparse.ArgumentParser(add_help=False)
2800
parser.add_argument("--check", action="store_true")
2801
parser.add_argument("--prefix", )
2792
parser.add_argument("--check", action='store_true')
2802
2793
args, unknown_args = parser.parse_known_args()
2804
# Remove test options from sys.argv
2794
run_tests = args.check
2796
# Remove --check argument from sys.argv
2805
2797
sys.argv[1:] = unknown_args
2808
2800
# Add all tests from doctest strings
2809
2801
def load_tests(loader, tests, none):
2814
2806
if __name__ == "__main__":
2815
options = parse_test_args()
2818
extra_test_prefix = options.prefix
2819
if extra_test_prefix is not None:
2820
if not (unittest.main(argv=[""], exit=False)
2821
.result.wasSuccessful()):
2823
class ExtraTestLoader(unittest.TestLoader):
2824
testMethodPrefix = extra_test_prefix
2825
# Call using ./scriptname --check [--verbose]
2826
unittest.main(argv=[""], testLoader=ExtraTestLoader())
2828
unittest.main(argv=[""])
2808
if should_only_run_tests():
2809
# Call using ./tdd-python-script --check [--verbose]
2832
2814
logging.shutdown()
2836
# (lambda (&optional extra)
2837
# (if (not (funcall run-tests-in-test-buffer default-directory
2839
# (funcall show-test-buffer-in-test-window)
2840
# (funcall remove-test-window)
2841
# (if extra (message "Extra tests run successfully!"))))
2842
# run-tests-in-test-buffer:
2843
# (lambda (dir &optional extra)
2844
# (with-current-buffer (get-buffer-create "*Test*")
2845
# (setq buffer-read-only nil
2846
# default-directory dir)
2848
# (compilation-mode))
2849
# (let ((process-result
2850
# (let ((inhibit-read-only t))
2851
# (process-file-shell-command
2852
# (funcall get-command-line extra) nil "*Test*"))))
2853
# (and (numberp process-result)
2854
# (= process-result 0))))
2856
# (lambda (&optional extra)
2857
# (let ((quoted-script
2858
# (shell-quote-argument (funcall get-script-name))))
2860
# (concat "%s --check" (if extra " --prefix=atest" ""))
2864
# (if (fboundp 'file-local-name)
2865
# (file-local-name (buffer-file-name))
2866
# (or (file-remote-p (buffer-file-name) 'localname)
2867
# (buffer-file-name))))
2868
# remove-test-window:
2870
# (let ((test-window (get-buffer-window "*Test*")))
2871
# (if test-window (delete-window test-window))))
2872
# show-test-buffer-in-test-window:
2874
# (when (not (get-buffer-window-list "*Test*"))
2875
# (setq next-error-last-buffer (get-buffer "*Test*"))
2876
# (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
# (display-buffer-overriding-action
2878
# `((display-buffer-in-side-window) (side . ,side)
2879
# (window-height . fit-window-to-buffer)
2880
# (window-width . fit-window-to-buffer))))
2881
# (display-buffer "*Test*"))))
2884
# (let* ((run-extra-tests (lambda () (interactive)
2885
# (funcall run-tests t)))
2886
# (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
# (outer-keymap `(keymap (3 . ,inner-keymap)))) ; C-c
2888
# (setq minor-mode-overriding-map-alist
2889
# (cons `(run-tests . ,outer-keymap)
2890
# minor-mode-overriding-map-alist)))
2891
# (add-hook 'after-save-hook run-tests 90 t))