1
1
#!/usr/bin/python3 -bbI
2
# -*- coding: utf-8; lexical-binding: t -*-
4
# Mandos Control - Control or query the Mandos server
6
# Copyright © 2008-2022 Teddy Hogeborn
7
# Copyright © 2008-2022 Björn Påhlsson
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
9
9
# This file is part of Mandos.
81
78
warnings.simplefilter("default")
83
log = logging.getLogger(os.path.basename(sys.argv[0]))
84
logging.basicConfig(level="INFO", # Show info level messages
80
log = logging.getLogger(sys.argv[0])
81
logging.basicConfig(level="INFO", # Show info level messages
85
82
format="%(message)s") # Show basic log messages
87
84
logging.captureWarnings(True) # Show warnings via the logging system
397
394
def parse_pre_1_6_1_interval(interval):
398
r"""Parse an interval string as documented by Mandos before 1.6.1,
395
"""Parse an interval string as documented by Mandos before 1.6.1,
399
396
and return a datetime.timedelta
401
>>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
403
>>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
405
>>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
407
>>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
409
>>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
411
>>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
413
>>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
398
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
400
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
402
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
404
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
406
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
408
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
410
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
415
412
>>> # Ignore unknown characters, allow any order and repetitions
416
>>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
... == datetime.timedelta(2, 480, 18000)
413
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
491
487
object_manager_iface = "org.freedesktop.DBus.ObjectManager"
493
488
def get_managed_objects(self, busname, objectpath):
494
489
return self.call_method("GetManagedObjects", busname,
496
491
self.object_manager_iface)
498
493
properties_iface = "org.freedesktop.DBus.Properties"
500
494
def set_property(self, busname, objectpath, interface, key,
502
496
self.call_method("Set", busname, objectpath,
621
615
def __exit__(self, exc_type, exc_val, exc_tb):
622
616
self.logger.removeFilter(self.nullfilter)
624
619
class CachingBus(SystemBus):
625
620
"""A caching layer for dbus_python_adapter.SystemBus"""
627
621
def __init__(self, *args, **kwargs):
628
622
self.object_cache = {}
629
623
super(dbus_python_adapter.CachingBus,
630
624
self).__init__(*args, **kwargs)
632
625
def get_object(self, busname, objectpath):
634
627
return self.object_cache[(busname, objectpath)]
690
683
class CachingBus(SystemBus):
691
684
"""A caching layer for pydbus_adapter.SystemBus"""
693
685
def __init__(self, *args, **kwargs):
694
686
self.object_cache = {}
695
687
super(pydbus_adapter.CachingBus,
696
688
self).__init__(*args, **kwargs)
698
689
def get(self, busname, objectpath):
700
691
return self.object_cache[(busname, objectpath)]
702
693
new_object = (super(pydbus_adapter.CachingBus, self)
703
694
.get(busname, objectpath))
704
self.object_cache[(busname, objectpath)] = new_object
695
self.object_cache[(busname, objectpath)] = new_object
705
696
return new_object
733
724
iface = proxy_object.get_interface(interface)
734
725
method = getattr(iface, methodname)
735
726
with self.convert_exception(dbus.Error):
736
value = method(*args)
727
value = method(*args)
737
728
# DBussy returns values either as an empty list or as a
738
# list of one element with the return value
729
# tuple: (signature, value)
740
731
return self.type_filter(value[0])
781
770
class CachingBus(MandosBus):
782
771
"""A caching layer for dbussy_adapter.MandosBus"""
784
772
def __init__(self, *args, **kwargs):
785
773
self.object_cache = {}
786
774
super(dbussy_adapter.CachingBus, self).__init__(*args,
789
776
def get_object(self, busname, objectpath):
791
778
return self.object_cache[(busname, objectpath)]
855
843
class Approve(Base):
    """Command that approves a client.

    NOTE(review): Base is defined outside this excerpt; presumably it
    invokes run_on_one_client() once per selected client -- confirm.
    """

    def run_on_one_client(self, client, properties):
        """Call the client's "Approve" method with a true value.

        The "properties" argument is accepted but not used here.
        """
        approved = True
        self.bus.call_client_method(client, "Approve", approved)
859
848
class Deny(Base):
    """Command that denies a client.

    NOTE(review): Base is defined outside this excerpt; presumably it
    invokes run_on_one_client() once per selected client -- confirm.
    """

    def run_on_one_client(self, client, properties):
        """Call the client's "Approve" method with a false value.

        The "properties" argument is accepted but not used here.
        """
        approved = False
        self.bus.call_client_method(client, "Approve", approved)
863
853
class Remove(Base):
    """Command that asks the server to remove every given client."""

    def run(self, clients, bus):
        """Call the server method "RemoveClient" once per client path.

        The keys of "clients" are used as the client paths.
        """
        # Iterate over an immutable snapshot of the keys rather than
        # over the mapping itself.
        client_paths = frozenset(clients.keys())
        for path in client_paths:
            bus.call_server_method("RemoveClient", path)
868
859
class Output(Base):
869
860
"""Abstract class for commands outputting client details"""
870
861
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
876
867
"Checker", "ExtendedTimeout", "Expires",
877
868
"LastCheckerStatus")
879
871
class DumpJSON(Output):
    """Command that prints details of all clients as JSON."""

    def run(self, clients, bus=None):
        """Print a JSON object, keyed by client name, to standard output.

        Each value is itself an object with one entry per name in
        self.all_keywords, read from that client's properties.  The
        "bus" argument is not used.
        """
        data = {}
        for client_properties in clients.values():
            name = client_properties["Name"]
            details = {}
            for keyword in self.all_keywords:
                details[keyword] = client_properties[keyword]
            data[name] = details
        print(json.dumps(data, indent=4, separators=(",", ": ")))
887
880
class PrintTable(Output):
888
881
def __init__(self, verbose=False):
993
986
def propname(self):
994
987
raise NotImplementedError()
996
990
class Enable(PropertySetter):
    """Property setter for "Enabled" with the value True."""
    value_to_set = True
    propname = "Enabled"
1000
995
class Disable(PropertySetter):
    """Property setter for "Enabled" with the value False."""
    value_to_set = False
    propname = "Enabled"
1004
1000
class BumpTimeout(PropertySetter):
    """Property setter for "LastCheckedOK" with an empty string value."""
    # NOTE(review): presumably the server reacts to this property
    # being set rather than to the value itself -- confirm.
    value_to_set = ""
    propname = "LastCheckedOK"
1008
1005
class StartChecker(PropertySetter):
    """Property setter for "CheckerRunning" with the value True."""
    value_to_set = True
    propname = "CheckerRunning"
1012
1010
class StopChecker(PropertySetter):
    """Property setter for "CheckerRunning" with the value False."""
    value_to_set = False
    propname = "CheckerRunning"
1016
1015
class ApproveByDefault(PropertySetter):
    """Property setter for "ApprovedByDefault" with the value True."""
    value_to_set = True
    propname = "ApprovedByDefault"
1020
1020
class DenyByDefault(PropertySetter):
    """Property setter for "ApprovedByDefault" with the value False."""
    value_to_set = False
    propname = "ApprovedByDefault"
1024
1025
class PropertySetterValue(PropertySetter):
1025
1026
"""Abstract class for PropertySetter receiving a value as
1026
1027
constructor argument instead of a class attribute."""
1028
1028
def __init__(self, value):
1029
1029
self.value_to_set = value
1067
1070
"When setting, convert value from a datetime.timedelta"
1068
1071
self._vts = int(round(value.total_seconds() * 1000))
1070
1074
class SetTimeout(PropertySetterValueMilliseconds):
    """Value-taking property setter for the "Timeout" property."""
    # NOTE(review): judging by the base class name the value is sent
    # as milliseconds -- confirm against the base class.
    propname = "Timeout"
1073
1078
class SetExtendedTimeout(PropertySetterValueMilliseconds):
    """Value-taking property setter for the "ExtendedTimeout" property."""
    # NOTE(review): judging by the base class name the value is sent
    # as milliseconds -- confirm against the base class.
    propname = "ExtendedTimeout"
1076
1082
class SetInterval(PropertySetterValueMilliseconds):
    """Value-taking property setter for the "Interval" property."""
    # NOTE(review): judging by the base class name the value is sent
    # as milliseconds -- confirm against the base class.
    propname = "Interval"
1079
1086
class SetApprovalDelay(PropertySetterValueMilliseconds):
    """Value-taking property setter for the "ApprovalDelay" property."""
    # NOTE(review): judging by the base class name the value is sent
    # as milliseconds -- confirm against the base class.
    propname = "ApprovalDelay"
1082
1090
class SetApprovalDuration(PropertySetterValueMilliseconds):
    """Value-taking property setter for the "ApprovalDuration" property."""
    # NOTE(review): judging by the base class name the value is sent
    # as milliseconds -- confirm against the base class.
    propname = "ApprovalDuration"
2427
2437
busname = "se.recompile.Mandos"
2428
2438
client_interface = "se.recompile.Mandos.Client"
2429
2439
command.Approve().run(self.bus.clients, self.bus)
2430
self.assertTrue(self.bus.clients)
2431
2440
for clientpath in self.bus.clients:
2432
2441
self.assertIn(("Approve", busname, clientpath,
2433
2442
client_interface, (True,)), self.bus.calls)
2436
2445
busname = "se.recompile.Mandos"
2437
2446
client_interface = "se.recompile.Mandos.Client"
2438
2447
command.Deny().run(self.bus.clients, self.bus)
2439
self.assertTrue(self.bus.clients)
2440
2448
for clientpath in self.bus.clients:
2441
2449
self.assertIn(("Approve", busname, clientpath,
2442
2450
client_interface, (False,)),
2443
2451
self.bus.calls)
2445
2453
def test_Remove(self):
2446
busname = "se.recompile.Mandos"
2448
server_interface = "se.recompile.Mandos"
2449
orig_clients = self.bus.clients.copy()
2450
2454
command.Remove().run(self.bus.clients, self.bus)
2451
self.assertFalse(self.bus.clients)
2452
for clientpath in orig_clients:
2453
self.assertIn(("RemoveClient", busname,
2454
server_path, server_interface,
2455
for clientpath in self.bus.clients:
2456
self.assertIn(("RemoveClient", dbus_busname,
2457
dbus_server_path, dbus_server_interface,
2455
2458
(clientpath,)), self.bus.calls)
2457
2460
expected_json = {
2660
2663
cmd_args = [() for x in range(len(self.values_to_get))]
2661
2664
values_to_get = self.values_to_get
2662
self.assertTrue(values_to_get)
2663
2665
for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2664
2666
for clientpath in self.bus.clients:
2665
2667
self.bus.clients[clientpath][self.propname] = (
2667
2669
self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
self.assertTrue(self.bus.clients)
2669
2670
for clientpath in self.bus.clients:
2670
2671
value = (self.bus.clients[clientpath]
2671
2672
[self.propname])
2730
2731
class TestSetSecretCmd(TestPropertySetterCmd):
2731
2732
command = command.SetSecret
2732
2733
propname = "Secret"
2733
def __init__(self, *args, **kwargs):
2734
self.values_to_set = [io.BytesIO(b""),
2735
io.BytesIO(b"secret\0xyzzy\nbar")]
2736
self.values_to_get = [f.getvalue() for f in
2738
super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2734
values_to_set = [io.BytesIO(b""),
2735
io.BytesIO(b"secret\0xyzzy\nbar")]
2736
values_to_get = [f.getvalue() for f in values_to_set]
2741
2739
class TestSetTimeoutCmd(TestPropertySetterCmd):
2797
def parse_test_args():
2798
# type: () -> argparse.Namespace
2795
def should_only_run_tests():
2799
2796
parser = argparse.ArgumentParser(add_help=False)
2800
parser.add_argument("--check", action="store_true")
2801
parser.add_argument("--prefix", )
2797
parser.add_argument("--check", action='store_true')
2802
2798
args, unknown_args = parser.parse_known_args()
2804
# Remove test options from sys.argv
2799
run_tests = args.check
2801
# Remove --check argument from sys.argv
2805
2802
sys.argv[1:] = unknown_args
2808
2805
# Add all tests from doctest strings
2809
2806
def load_tests(loader, tests, none):
2814
2811
if __name__ == "__main__":
2815
options = parse_test_args()
2818
extra_test_prefix = options.prefix
2819
if extra_test_prefix is not None:
2820
if not (unittest.main(argv=[""], exit=False)
2821
.result.wasSuccessful()):
2823
class ExtraTestLoader(unittest.TestLoader):
2824
testMethodPrefix = extra_test_prefix
2825
# Call using ./scriptname --check [--verbose]
2826
unittest.main(argv=[""], testLoader=ExtraTestLoader())
2828
unittest.main(argv=[""])
2813
if should_only_run_tests():
2814
# Call using ./tdd-python-script --check [--verbose]
2832
2819
logging.shutdown()
2836
# (lambda (&optional extra)
2837
# (if (not (funcall run-tests-in-test-buffer default-directory
2839
# (funcall show-test-buffer-in-test-window)
2840
# (funcall remove-test-window)
2841
# (if extra (message "Extra tests run successfully!"))))
2842
# run-tests-in-test-buffer:
2843
# (lambda (dir &optional extra)
2844
# (with-current-buffer (get-buffer-create "*Test*")
2845
# (setq buffer-read-only nil
2846
# default-directory dir)
2848
# (compilation-mode))
2849
# (let ((process-result
2850
# (let ((inhibit-read-only t))
2851
# (process-file-shell-command
2852
# (funcall get-command-line extra) nil "*Test*"))))
2853
# (and (numberp process-result)
2854
# (= process-result 0))))
2856
# (lambda (&optional extra)
2857
# (let ((quoted-script
2858
# (shell-quote-argument (funcall get-script-name))))
2860
# (concat "%s --check" (if extra " --prefix=atest" ""))
2864
# (if (fboundp 'file-local-name)
2865
# (file-local-name (buffer-file-name))
2866
# (or (file-remote-p (buffer-file-name) 'localname)
2867
# (buffer-file-name))))
2868
# remove-test-window:
2870
# (let ((test-window (get-buffer-window "*Test*")))
2871
# (if test-window (delete-window test-window))))
2872
# show-test-buffer-in-test-window:
2874
# (when (not (get-buffer-window-list "*Test*"))
2875
# (setq next-error-last-buffer (get-buffer "*Test*"))
2876
# (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
# (display-buffer-overriding-action
2878
# `((display-buffer-in-side-window) (side . ,side)
2879
# (window-height . fit-window-to-buffer)
2880
# (window-width . fit-window-to-buffer))))
2881
# (display-buffer "*Test*"))))
2884
# (let* ((run-extra-tests (lambda () (interactive)
2885
# (funcall run-tests t)))
2886
# (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
# (outer-keymap `(keymap (3 . ,inner-keymap)))) ; C-c
2888
# (setq minor-mode-overriding-map-alist
2889
# (cons `(run-tests . ,outer-keymap)
2890
# minor-mode-overriding-map-alist)))
2891
# (add-hook 'after-save-hook run-tests 90 t))