1
1
#!/usr/bin/python3 -bbI
2
# -*- coding: utf-8; lexical-binding: t -*-
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
4
4
# Mandos Control - Control or query the Mandos server
6
# Copyright © 2008-2022 Teddy Hogeborn
7
# Copyright © 2008-2022 Björn Påhlsson
6
# Copyright © 2008-2020 Teddy Hogeborn
7
# Copyright © 2008-2020 Björn Påhlsson
9
9
# This file is part of Mandos.
81
78
# Use the "default" warnings filter action for all warnings.
warnings.simplefilter("default")

# Name the logger after the file name of the script only, not the
# whole path it was invoked with.
log = logging.getLogger(os.path.basename(sys.argv[0]))
logging.basicConfig(level="INFO",  # Show info level messages
                    format="%(message)s")  # Show basic log messages

logging.captureWarnings(True)   # Show warnings via the logging system
397
394
def parse_pre_1_6_1_interval(interval):
398
r"""Parse an interval string as documented by Mandos before 1.6.1,
395
"""Parse an interval string as documented by Mandos before 1.6.1,
399
396
and return a datetime.timedelta
401
>>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
403
>>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
405
>>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
407
>>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
409
>>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
411
>>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
413
>>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
398
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
400
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
402
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
404
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
406
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
408
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
410
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
415
412
>>> # Ignore unknown characters, allow any order and repetitions
416
>>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
... == datetime.timedelta(2, 480, 18000)
413
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
491
487
object_manager_iface = "org.freedesktop.DBus.ObjectManager"
493
488
def get_managed_objects(self, busname, objectpath):
494
489
return self.call_method("GetManagedObjects", busname,
496
491
self.object_manager_iface)
498
493
properties_iface = "org.freedesktop.DBus.Properties"
500
494
def set_property(self, busname, objectpath, interface, key,
502
496
self.call_method("Set", busname, objectpath,
621
615
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the context: remove self.nullfilter from self.logger.

    The three exception arguments are ignored and None is returned,
    so an exception raised inside the "with" block is not suppressed.
    """
    self.logger.removeFilter(self.nullfilter)
624
619
class CachingBus(SystemBus):
625
620
"""A caching layer for dbus_python_adapter.SystemBus"""
627
621
def __init__(self, *args, **kwargs):
628
622
self.object_cache = {}
629
623
super(dbus_python_adapter.CachingBus,
630
624
self).__init__(*args, **kwargs)
632
625
def get_object(self, busname, objectpath):
634
627
return self.object_cache[(busname, objectpath)]
690
683
class CachingBus(SystemBus):
691
684
"""A caching layer for pydbus_adapter.SystemBus"""
693
685
def __init__(self, *args, **kwargs):
694
686
self.object_cache = {}
695
687
super(pydbus_adapter.CachingBus,
696
688
self).__init__(*args, **kwargs)
698
689
def get(self, busname, objectpath):
700
691
return self.object_cache[(busname, objectpath)]
702
693
new_object = (super(pydbus_adapter.CachingBus, self)
703
694
.get(busname, objectpath))
704
self.object_cache[(busname, objectpath)] = new_object
695
self.object_cache[(busname, objectpath)] = new_object
705
696
return new_object
781
772
class CachingBus(MandosBus):
782
773
"""A caching layer for dbussy_adapter.MandosBus"""
784
774
def __init__(self, *args, **kwargs):
785
775
self.object_cache = {}
786
776
super(dbussy_adapter.CachingBus, self).__init__(*args,
789
778
def get_object(self, busname, objectpath):
791
780
return self.object_cache[(busname, objectpath)]
855
845
class Approve(Base):
    """Command which calls the client method "Approve" with True."""

    def run_on_one_client(self, client, properties):
        """Call "Approve" with argument True on one client.

        The call is made through self.bus.call_client_method().
        "properties" is accepted but not used by this command.
        """
        self.bus.call_client_method(client, "Approve", True)
859
850
class Deny(Base):
    """Command which calls the client method "Approve" with False."""

    def run_on_one_client(self, client, properties):
        """Call "Approve" with argument False on one client.

        The call is made through self.bus.call_client_method().
        "properties" is accepted but not used by this command.
        """
        self.bus.call_client_method(client, "Approve", False)
863
855
class Remove(Base):
    """Command which calls "RemoveClient" for every given client."""

    def run(self, clients, bus):
        """Call the server method "RemoveClient" once per key.

        clients: mapping; each key is passed as the argument of one
                 "RemoveClient" call.
        bus: object whose call_server_method() is invoked with the
             method name and the key.
        """
        # Iterate over a frozen snapshot of the keys, so the loop is
        # unaffected if "clients" changes while the calls are made.
        for clientpath in frozenset(clients.keys()):
            bus.call_server_method("RemoveClient", clientpath)
868
861
class Output(Base):
869
862
"""Abstract class for commands outputting client details"""
870
863
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
876
869
"Checker", "ExtendedTimeout", "Expires",
877
870
"LastCheckerStatus")
879
873
class DumpJSON(Output):
    """Command which prints the given clients as one JSON object."""

    def run(self, clients, bus=None):
        """Print a JSON object with one member per client.

        clients: mapping whose values are per-client property
                 mappings.  Each must contain "Name" and every key
                 listed in self.all_keywords; a missing key raises
                 KeyError.
        bus: accepted for signature compatibility; not used.
        """
        # Outer key: the client's "Name".  Inner object: only the
        # properties named in self.all_keywords.
        data = {properties["Name"]:
                {key: properties[key]
                 for key in self.all_keywords}
                for properties in clients.values()}
        print(json.dumps(data, indent=4, separators=(",", ": ")))
887
882
class PrintTable(Output):
888
883
def __init__(self, verbose=False):
993
988
def propname(self):
994
989
raise NotImplementedError()
996
992
class Enable(PropertySetter):
    """PropertySetter with property name "Enabled" and value True."""
    propname = "Enabled"
    value_to_set = True
1000
997
class Disable(PropertySetter):
    """PropertySetter with property name "Enabled" and value False."""
    propname = "Enabled"
    value_to_set = False
1004
1002
class BumpTimeout(PropertySetter):
    """PropertySetter with property name "LastCheckedOK" and an empty
    string as the value."""
    propname = "LastCheckedOK"
    # NOTE(review): presumably the server treats an empty string here
    # as "checked OK just now" - confirm against the server side.
    value_to_set = ""
1008
1007
class StartChecker(PropertySetter):
    """PropertySetter with property name "CheckerRunning" and value
    True."""
    propname = "CheckerRunning"
    value_to_set = True
1012
1012
class StopChecker(PropertySetter):
    """PropertySetter with property name "CheckerRunning" and value
    False."""
    propname = "CheckerRunning"
    value_to_set = False
1016
1017
class ApproveByDefault(PropertySetter):
    """PropertySetter with property name "ApprovedByDefault" and value
    True."""
    propname = "ApprovedByDefault"
    value_to_set = True
1020
1022
class DenyByDefault(PropertySetter):
    """PropertySetter with property name "ApprovedByDefault" and value
    False."""
    propname = "ApprovedByDefault"
    value_to_set = False
1024
1027
class PropertySetterValue(PropertySetter):
1025
1028
"""Abstract class for PropertySetter receiving a value as
1026
1029
constructor argument instead of a class attribute."""
1028
1030
def __init__(self, value):
    """Store "value" as this instance's value_to_set.

    NOTE(review): a subclass may define value_to_set as a property,
    in which case this assignment goes through its setter - confirm
    against the subclasses.
    """
    self.value_to_set = value
1067
1072
"When setting, convert value from a datetime.timedelta"
1068
1073
self._vts = int(round(value.total_seconds() * 1000))
1070
1076
class SetTimeout(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds with property name "Timeout"."""
    propname = "Timeout"
1073
1080
class SetExtendedTimeout(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds with property name
    "ExtendedTimeout"."""
    propname = "ExtendedTimeout"
1076
1084
class SetInterval(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds with property name "Interval"."""
    propname = "Interval"
1079
1088
class SetApprovalDelay(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds with property name
    "ApprovalDelay"."""
    propname = "ApprovalDelay"
1082
1092
class SetApprovalDuration(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds with property name
    "ApprovalDuration"."""
    propname = "ApprovalDuration"
2427
2439
busname = "se.recompile.Mandos"
2428
2440
client_interface = "se.recompile.Mandos.Client"
2429
2441
command.Approve().run(self.bus.clients, self.bus)
2430
self.assertTrue(self.bus.clients)
2431
2442
for clientpath in self.bus.clients:
2432
2443
self.assertIn(("Approve", busname, clientpath,
2433
2444
client_interface, (True,)), self.bus.calls)
2436
2447
busname = "se.recompile.Mandos"
2437
2448
client_interface = "se.recompile.Mandos.Client"
2438
2449
command.Deny().run(self.bus.clients, self.bus)
2439
self.assertTrue(self.bus.clients)
2440
2450
for clientpath in self.bus.clients:
2441
2451
self.assertIn(("Approve", busname, clientpath,
2442
2452
client_interface, (False,)),
2443
2453
self.bus.calls)
2445
2455
def test_Remove(self):
2446
busname = "se.recompile.Mandos"
2448
server_interface = "se.recompile.Mandos"
2449
orig_clients = self.bus.clients.copy()
2450
2456
command.Remove().run(self.bus.clients, self.bus)
2451
self.assertFalse(self.bus.clients)
2452
for clientpath in orig_clients:
2453
self.assertIn(("RemoveClient", busname,
2454
server_path, server_interface,
2457
for clientpath in self.bus.clients:
2458
self.assertIn(("RemoveClient", dbus_busname,
2459
dbus_server_path, dbus_server_interface,
2455
2460
(clientpath,)), self.bus.calls)
2457
2462
expected_json = {
2660
2665
cmd_args = [() for x in range(len(self.values_to_get))]
2661
2666
values_to_get = self.values_to_get
2662
self.assertTrue(values_to_get)
2663
2667
for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2664
2668
for clientpath in self.bus.clients:
2665
2669
self.bus.clients[clientpath][self.propname] = (
2667
2671
self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
self.assertTrue(self.bus.clients)
2669
2672
for clientpath in self.bus.clients:
2670
2673
value = (self.bus.clients[clientpath]
2671
2674
[self.propname])
2730
2733
class TestSetSecretCmd(TestPropertySetterCmd):
2731
2734
command = command.SetSecret
2732
2735
propname = "Secret"
2733
def __init__(self, *args, **kwargs):
2734
self.values_to_set = [io.BytesIO(b""),
2735
io.BytesIO(b"secret\0xyzzy\nbar")]
2736
self.values_to_get = [f.getvalue() for f in
2738
super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2736
values_to_set = [io.BytesIO(b""),
2737
io.BytesIO(b"secret\0xyzzy\nbar")]
2738
values_to_get = [f.getvalue() for f in values_to_set]
2741
2741
class TestSetTimeoutCmd(TestPropertySetterCmd):
2797
def parse_test_args():
2798
# type: () -> argparse.Namespace
2797
def should_only_run_tests():
2799
2798
parser = argparse.ArgumentParser(add_help=False)
2800
parser.add_argument("--check", action="store_true")
2801
parser.add_argument("--prefix", )
2799
parser.add_argument("--check", action='store_true')
2802
2800
args, unknown_args = parser.parse_known_args()
2804
# Remove test options from sys.argv
2801
run_tests = args.check
2803
# Remove --check argument from sys.argv
2805
2804
sys.argv[1:] = unknown_args
2808
2807
# Add all tests from doctest strings
2809
2808
def load_tests(loader, tests, none):
2814
2813
if __name__ == "__main__":
2815
options = parse_test_args()
2818
extra_test_prefix = options.prefix
2819
if extra_test_prefix is not None:
2820
if not (unittest.main(argv=[""], exit=False)
2821
.result.wasSuccessful()):
2823
class ExtraTestLoader(unittest.TestLoader):
2824
testMethodPrefix = extra_test_prefix
2825
# Call using ./scriptname --check [--verbose]
2826
unittest.main(argv=[""], testLoader=ExtraTestLoader())
2828
unittest.main(argv=[""])
2815
if should_only_run_tests():
2816
# Call using ./tdd-python-script --check [--verbose]
2832
2821
logging.shutdown()
2836
# (lambda (&optional extra)
2837
# (if (not (funcall run-tests-in-test-buffer default-directory
2839
# (funcall show-test-buffer-in-test-window)
2840
# (funcall remove-test-window)
2841
# (if extra (message "Extra tests run successfully!"))))
2842
# run-tests-in-test-buffer:
2843
# (lambda (dir &optional extra)
2844
# (with-current-buffer (get-buffer-create "*Test*")
2845
# (setq buffer-read-only nil
2846
# default-directory dir)
2848
# (compilation-mode))
2849
# (let ((process-result
2850
# (let ((inhibit-read-only t))
2851
# (process-file-shell-command
2852
# (funcall get-command-line extra) nil "*Test*"))))
2853
# (and (numberp process-result)
2854
# (= process-result 0))))
2856
# (lambda (&optional extra)
2857
# (let ((quoted-script
2858
# (shell-quote-argument (funcall get-script-name))))
2860
# (concat "%s --check" (if extra " --prefix=atest" ""))
2864
# (if (fboundp 'file-local-name)
2865
# (file-local-name (buffer-file-name))
2866
# (or (file-remote-p (buffer-file-name) 'localname)
2867
# (buffer-file-name))))
2868
# remove-test-window:
2870
# (let ((test-window (get-buffer-window "*Test*")))
2871
# (if test-window (delete-window test-window))))
2872
# show-test-buffer-in-test-window:
2874
# (when (not (get-buffer-window-list "*Test*"))
2875
# (setq next-error-last-buffer (get-buffer "*Test*"))
2876
# (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
# (display-buffer-overriding-action
2878
# `((display-buffer-in-side-window) (side . ,side)
2879
# (window-height . fit-window-to-buffer)
2880
# (window-width . fit-window-to-buffer))))
2881
# (display-buffer "*Test*"))))
2884
# (let* ((run-extra-tests (lambda () (interactive)
2885
# (funcall run-tests t)))
2886
# (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
# (outer-keymap `(keymap (3 . ,inner-keymap)))) ; C-c
2888
# (setq minor-mode-overriding-map-alist
2889
# (cons `(run-tests . ,outer-keymap)
2890
# minor-mode-overriding-map-alist)))
2891
# (add-hook 'after-save-hook run-tests 90 t))