1
1
#!/usr/bin/python3 -bbI
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
2
# -*- coding: utf-8; lexical-binding: t -*-
4
# Mandos Control - Control or query the Mandos server
6
# Copyright © 2008-2022 Teddy Hogeborn
7
# Copyright © 2008-2022 Björn Påhlsson
9
9
# This file is part of Mandos.
78
81
warnings.simplefilter("default")
80
log = logging.getLogger(sys.argv[0])
81
logging.basicConfig(level="INFO", # Show info level messages
83
log = logging.getLogger(os.path.basename(sys.argv[0]))
84
logging.basicConfig(level="INFO", # Show info level messages
82
85
format="%(message)s") # Show basic log messages
84
87
logging.captureWarnings(True) # Show warnings via the logging system
394
397
def parse_pre_1_6_1_interval(interval):
395
"""Parse an interval string as documented by Mandos before 1.6.1,
398
r"""Parse an interval string as documented by Mandos before 1.6.1,
396
399
and return a datetime.timedelta
398
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
400
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
402
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
404
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
406
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
408
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
410
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
401
>>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
403
>>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
405
>>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
407
>>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
409
>>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
411
>>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
413
>>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
412
415
>>> # Ignore unknown characters, allow any order and repetitions
413
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
416
>>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
... == datetime.timedelta(2, 480, 18000)
487
491
object_manager_iface = "org.freedesktop.DBus.ObjectManager"
488
493
def get_managed_objects(self, busname, objectpath):
489
494
return self.call_method("GetManagedObjects", busname,
491
496
self.object_manager_iface)
493
498
properties_iface = "org.freedesktop.DBus.Properties"
494
500
def set_property(self, busname, objectpath, interface, key,
496
502
self.call_method("Set", busname, objectpath,
615
621
def __exit__(self, exc_type, exc_val, exc_tb):
616
622
self.logger.removeFilter(self.nullfilter)
619
624
class CachingBus(SystemBus):
620
625
"""A caching layer for dbus_python_adapter.SystemBus"""
621
627
def __init__(self, *args, **kwargs):
622
628
self.object_cache = {}
623
629
super(dbus_python_adapter.CachingBus,
624
630
self).__init__(*args, **kwargs)
625
632
def get_object(self, busname, objectpath):
627
634
return self.object_cache[(busname, objectpath)]
683
690
class CachingBus(SystemBus):
684
691
"""A caching layer for pydbus_adapter.SystemBus"""
685
693
def __init__(self, *args, **kwargs):
686
694
self.object_cache = {}
687
695
super(pydbus_adapter.CachingBus,
688
696
self).__init__(*args, **kwargs)
689
698
def get(self, busname, objectpath):
691
700
return self.object_cache[(busname, objectpath)]
693
702
new_object = (super(pydbus_adapter.CachingBus, self)
694
703
.get(busname, objectpath))
695
self.object_cache[(busname, objectpath)] = new_object
704
self.object_cache[(busname, objectpath)] = new_object
696
705
return new_object
724
733
iface = proxy_object.get_interface(interface)
725
734
method = getattr(iface, methodname)
726
735
with self.convert_exception(dbus.Error):
727
value = method(*args)
736
value = method(*args)
728
737
# DBussy returns values either as an empty list or as a
729
# tuple: (signature, value)
738
# list of one element with the return value
731
740
return self.type_filter(value[0])
770
781
class CachingBus(MandosBus):
771
782
"""A caching layer for dbussy_adapter.MandosBus"""
772
784
def __init__(self, *args, **kwargs):
773
785
self.object_cache = {}
774
786
super(dbussy_adapter.CachingBus, self).__init__(*args,
776
789
def get_object(self, busname, objectpath):
778
791
return self.object_cache[(busname, objectpath)]
843
855
class Approve(Base):
    """Command calling the "Approve" method of a client with True."""

    def run_on_one_client(self, client, properties):
        # "properties" belongs to the hook signature but is unused
        # here; only the client itself is needed for the call.
        approved = True
        self.bus.call_client_method(client, "Approve", approved)
848
859
class Deny(Base):
    """Command calling the "Approve" method of a client with False."""

    def run_on_one_client(self, client, properties):
        # Denial uses the same "Approve" method as approval, only
        # with a False argument.  "properties" is unused here.
        approved = False
        self.bus.call_client_method(client, "Approve", approved)
853
863
class Remove(Base):
    """Command calling the server method "RemoveClient" once for
    every key in the given clients mapping."""

    def run(self, clients, bus):
        # Loop over a frozen snapshot of the keys and not over
        # "clients" itself.  NOTE(review): presumably because the
        # bus call may alter "clients" while we iterate - confirm.
        paths_to_remove = frozenset(clients.keys())
        for path in paths_to_remove:
            bus.call_server_method("RemoveClient", path)
859
868
class Output(Base):
860
869
"""Abstract class for commands outputting client details"""
861
870
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
867
876
"Checker", "ExtendedTimeout", "Expires",
868
877
"LastCheckerStatus")
871
879
class DumpJSON(Output):
    """Command printing the properties of all clients as JSON.

    The output is one JSON object, keyed by each client's "Name"
    property; each value is an object holding the properties listed
    in self.all_keywords.
    """

    def run(self, clients, bus=None):
        """Print the JSON document for "clients" on standard output.

        clients -- mapping whose values are property dictionaries;
                   each must contain "Name" and every key named in
                   self.all_keywords (a missing key raises KeyError)
        bus     -- accepted for signature compatibility; not used
        """
        data = {}
        for properties in clients.values():
            data[properties["Name"]] = {key: properties[key]
                                        for key in self.all_keywords}
        # Print exactly once; explicit separators keep the indented
        # output free of trailing whitespace.
        print(json.dumps(data, indent=4, separators=(",", ": ")))
880
887
class PrintTable(Output):
881
888
def __init__(self, verbose=False):
986
993
def propname(self):
987
994
raise NotImplementedError()
990
996
class Enable(PropertySetter):
    """PropertySetter for "Enabled" with the fixed value True."""
    value_to_set = True
    propname = "Enabled"
995
1000
class Disable(PropertySetter):
    """PropertySetter for "Enabled" with the fixed value False."""
    value_to_set = False
    propname = "Enabled"
1000
1004
class BumpTimeout(PropertySetter):
    """PropertySetter for "LastCheckedOK" with an empty string as
    the fixed value."""
    # NOTE(review): presumably the server treats a write to
    # "LastCheckedOK" as "checked OK just now" - confirm against
    # the server's D-Bus interface.
    value_to_set = ""
    propname = "LastCheckedOK"
1005
1008
class StartChecker(PropertySetter):
    """PropertySetter for "CheckerRunning" with the fixed value
    True."""
    value_to_set = True
    propname = "CheckerRunning"
1010
1012
class StopChecker(PropertySetter):
    """PropertySetter for "CheckerRunning" with the fixed value
    False."""
    value_to_set = False
    propname = "CheckerRunning"
1015
1016
class ApproveByDefault(PropertySetter):
    """PropertySetter for "ApprovedByDefault" with the fixed value
    True."""
    value_to_set = True
    propname = "ApprovedByDefault"
1020
1020
class DenyByDefault(PropertySetter):
    """PropertySetter for "ApprovedByDefault" with the fixed value
    False."""
    value_to_set = False
    propname = "ApprovedByDefault"
1025
1024
class PropertySetterValue(PropertySetter):
1026
1025
"""Abstract class for PropertySetter receiving a value as
1027
1026
constructor argument instead of a class attribute."""
1028
1028
def __init__(self, value):
1029
1029
self.value_to_set = value
1070
1067
"When setting, convert value from a datetime.timedelta"
1071
1068
self._vts = int(round(value.total_seconds() * 1000))
1074
1070
class SetTimeout(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds command for "Timeout"."""
    propname = "Timeout"
1078
1073
class SetExtendedTimeout(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds command for
    "ExtendedTimeout"."""
    propname = "ExtendedTimeout"
1082
1076
class SetInterval(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds command for "Interval"."""
    propname = "Interval"
1086
1079
class SetApprovalDelay(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds command for
    "ApprovalDelay"."""
    propname = "ApprovalDelay"
1090
1082
class SetApprovalDuration(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds command for
    "ApprovalDuration"."""
    propname = "ApprovalDuration"
1775
1765
self.call_method(bus, "methodname", "busname",
1776
1766
"objectpath", "interface")
1778
self.assertNotIsInstance(e, dbus.ConnectFailed)
1768
self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1780
1770
def test_get_converts_to_correct_exception(self):
1781
1771
bus = pydbus_adapter.SystemBus(
2437
2427
busname = "se.recompile.Mandos"
2438
2428
client_interface = "se.recompile.Mandos.Client"
2439
2429
command.Approve().run(self.bus.clients, self.bus)
2430
self.assertTrue(self.bus.clients)
2440
2431
for clientpath in self.bus.clients:
2441
2432
self.assertIn(("Approve", busname, clientpath,
2442
2433
client_interface, (True,)), self.bus.calls)
2445
2436
busname = "se.recompile.Mandos"
2446
2437
client_interface = "se.recompile.Mandos.Client"
2447
2438
command.Deny().run(self.bus.clients, self.bus)
2439
self.assertTrue(self.bus.clients)
2448
2440
for clientpath in self.bus.clients:
2449
2441
self.assertIn(("Approve", busname, clientpath,
2450
2442
client_interface, (False,)),
2451
2443
self.bus.calls)
2453
2445
def test_Remove(self):
2446
busname = "se.recompile.Mandos"
2448
server_interface = "se.recompile.Mandos"
2449
orig_clients = self.bus.clients.copy()
2454
2450
command.Remove().run(self.bus.clients, self.bus)
2455
for clientpath in self.bus.clients:
2456
self.assertIn(("RemoveClient", dbus_busname,
2457
dbus_server_path, dbus_server_interface,
2451
self.assertFalse(self.bus.clients)
2452
for clientpath in orig_clients:
2453
self.assertIn(("RemoveClient", busname,
2454
server_path, server_interface,
2458
2455
(clientpath,)), self.bus.calls)
2460
2457
expected_json = {
2663
2660
cmd_args = [() for x in range(len(self.values_to_get))]
2664
2661
values_to_get = self.values_to_get
2662
self.assertTrue(values_to_get)
2665
2663
for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2666
2664
for clientpath in self.bus.clients:
2667
2665
self.bus.clients[clientpath][self.propname] = (
2669
2667
self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
self.assertTrue(self.bus.clients)
2670
2669
for clientpath in self.bus.clients:
2671
2670
value = (self.bus.clients[clientpath]
2672
2671
[self.propname])
2731
2730
class TestSetSecretCmd(TestPropertySetterCmd):
    """Test case for command.SetSecret and the "Secret" property.

    The values to set are binary streams; the values expected back
    are the complete byte contents of those same streams.
    """
    command = command.SetSecret
    propname = "Secret"

    def __init__(self, *args, **kwargs):
        # io.BytesIO objects are stateful (they carry a stream
        # position), so they are created anew for every test case
        # instance instead of being shared as class attributes.
        self.values_to_set = [io.BytesIO(b""),
                              io.BytesIO(b"secret\0xyzzy\nbar")]
        # getvalue() returns the whole buffer regardless of the
        # current stream position.
        self.values_to_get = [f.getvalue() for f in
                              self.values_to_set]
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2739
2741
class TestSetTimeoutCmd(TestPropertySetterCmd):
2795
def should_only_run_tests():
2797
def parse_test_args():
2798
# type: () -> argparse.Namespace
2796
2799
parser = argparse.ArgumentParser(add_help=False)
2797
parser.add_argument("--check", action='store_true')
2800
parser.add_argument("--check", action="store_true")
2801
parser.add_argument("--prefix", )
2798
2802
args, unknown_args = parser.parse_known_args()
2799
run_tests = args.check
2801
# Remove --check argument from sys.argv
2804
# Remove test options from sys.argv
2802
2805
sys.argv[1:] = unknown_args
2805
2808
# Add all tests from doctest strings
2806
2809
def load_tests(loader, tests, none):
2811
2814
if __name__ == "__main__":
2815
options = parse_test_args()
2813
if should_only_run_tests():
2814
# Call using ./tdd-python-script --check [--verbose]
2818
extra_test_prefix = options.prefix
2819
if extra_test_prefix is not None:
2820
if not (unittest.main(argv=[""], exit=False)
2821
.result.wasSuccessful()):
2823
class ExtraTestLoader(unittest.TestLoader):
2824
testMethodPrefix = extra_test_prefix
2825
# Call using ./scriptname --check [--verbose]
2826
unittest.main(argv=[""], testLoader=ExtraTestLoader())
2828
unittest.main(argv=[""])
2819
2832
logging.shutdown()
2836
# (lambda (&optional extra)
2837
# (if (not (funcall run-tests-in-test-buffer default-directory
2839
# (funcall show-test-buffer-in-test-window)
2840
# (funcall remove-test-window)
2841
# (if extra (message "Extra tests run successfully!"))))
2842
# run-tests-in-test-buffer:
2843
# (lambda (dir &optional extra)
2844
# (with-current-buffer (get-buffer-create "*Test*")
2845
# (setq buffer-read-only nil
2846
# default-directory dir)
2848
# (compilation-mode))
2849
# (let ((process-result
2850
# (let ((inhibit-read-only t))
2851
# (process-file-shell-command
2852
# (funcall get-command-line extra) nil "*Test*"))))
2853
# (and (numberp process-result)
2854
# (= process-result 0))))
2856
# (lambda (&optional extra)
2857
# (let ((quoted-script
2858
# (shell-quote-argument (funcall get-script-name))))
2860
# (concat "%s --check" (if extra " --prefix=atest" ""))
2864
# (if (fboundp 'file-local-name)
2865
# (file-local-name (buffer-file-name))
2866
# (or (file-remote-p (buffer-file-name) 'localname)
2867
# (buffer-file-name))))
2868
# remove-test-window:
2870
# (let ((test-window (get-buffer-window "*Test*")))
2871
# (if test-window (delete-window test-window))))
2872
# show-test-buffer-in-test-window:
2874
# (when (not (get-buffer-window-list "*Test*"))
2875
# (setq next-error-last-buffer (get-buffer "*Test*"))
2876
# (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
# (display-buffer-overriding-action
2878
# `((display-buffer-in-side-window) (side . ,side)
2879
# (window-height . fit-window-to-buffer)
2880
# (window-width . fit-window-to-buffer))))
2881
# (display-buffer "*Test*"))))
2884
# (let* ((run-extra-tests (lambda () (interactive)
2885
# (funcall run-tests t)))
2886
# (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
# (outer-keymap `(keymap (3 . ,inner-keymap)))) ; C-c
2888
# (setq minor-mode-overriding-map-alist
2889
# (cons `(run-tests . ,outer-keymap)
2890
# minor-mode-overriding-map-alist)))
2891
# (add-hook 'after-save-hook run-tests 90 t))