1
1
#!/usr/bin/python3 -bbI
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
2
# -*- coding: utf-8; lexical-binding: t -*-
4
4
# Mandos Control - Control or query the Mandos server
6
# Copyright © 2008-2020 Teddy Hogeborn
7
# Copyright © 2008-2020 Björn Påhlsson
6
# Copyright © 2008-2022 Teddy Hogeborn
7
# Copyright © 2008-2022 Björn Påhlsson
9
9
# This file is part of Mandos.
78
81
warnings.simplefilter("default")
80
log = logging.getLogger(sys.argv[0])
81
logging.basicConfig(level="INFO", # Show info level messages
83
log = logging.getLogger(os.path.basename(sys.argv[0]))
84
logging.basicConfig(level="INFO", # Show info level messages
82
85
format="%(message)s") # Show basic log messages
84
87
logging.captureWarnings(True) # Show warnings via the logging system
394
397
def parse_pre_1_6_1_interval(interval):
395
"""Parse an interval string as documented by Mandos before 1.6.1,
398
r"""Parse an interval string as documented by Mandos before 1.6.1,
396
399
and return a datetime.timedelta
398
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
400
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
402
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
404
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
406
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
408
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
410
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
401
>>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
403
>>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
405
>>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
407
>>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
409
>>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
411
>>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
413
>>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
412
415
>>> # Ignore unknown characters, allow any order and repetitions
413
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
416
>>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \
417
... == datetime.timedelta(2, 480, 18000)
487
491
object_manager_iface = "org.freedesktop.DBus.ObjectManager"
488
493
def get_managed_objects(self, busname, objectpath):
489
494
return self.call_method("GetManagedObjects", busname,
491
496
self.object_manager_iface)
493
498
properties_iface = "org.freedesktop.DBus.Properties"
494
500
def set_property(self, busname, objectpath, interface, key,
496
502
self.call_method("Set", busname, objectpath,
615
621
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the context: detach self.nullfilter from self.logger.

    The exception arguments are ignored and nothing is returned, so
    an exception raised inside the "with" block is not suppressed.
    """
    self.logger.removeFilter(self.nullfilter)
619
624
class CachingBus(SystemBus):
620
625
"""A caching layer for dbus_python_adapter.SystemBus"""
621
627
def __init__(self, *args, **kwargs):
622
628
self.object_cache = {}
623
629
super(dbus_python_adapter.CachingBus,
624
630
self).__init__(*args, **kwargs)
625
632
def get_object(self, busname, objectpath):
627
634
return self.object_cache[(busname, objectpath)]
683
690
class CachingBus(SystemBus):
    """A caching layer for pydbus_adapter.SystemBus"""

    def __init__(self, *args, **kwargs):
        # Maps (busname, objectpath) -> object returned by the
        # parent class's get()
        self.object_cache = {}
        super(pydbus_adapter.CachingBus,
              self).__init__(*args, **kwargs)

    def get(self, busname, objectpath):
        """Return the object for (busname, objectpath), asking the
        parent class only the first time a given pair is requested."""
        try:
            return self.object_cache[(busname, objectpath)]
        except KeyError:
            # Cache miss: fetch from the parent class, remember the
            # result (exactly once), and hand it back.
            new_object = (super(pydbus_adapter.CachingBus, self)
                          .get(busname, objectpath))
            self.object_cache[(busname, objectpath)] = new_object
            return new_object
772
781
class CachingBus(MandosBus):
773
782
"""A caching layer for dbussy_adapter.MandosBus"""
774
784
def __init__(self, *args, **kwargs):
775
785
self.object_cache = {}
776
786
super(dbussy_adapter.CachingBus, self).__init__(*args,
778
789
def get_object(self, busname, objectpath):
780
791
return self.object_cache[(busname, objectpath)]
845
855
class Approve(Base):
    """Command calling the "Approve" client method with True."""

    def run_on_one_client(self, client, properties):
        # "properties" is not used by this command.
        self.bus.call_client_method(client, "Approve", True)
850
859
class Deny(Base):
    """Command calling the "Approve" client method with False."""

    def run_on_one_client(self, client, properties):
        # "properties" is not used by this command.
        self.bus.call_client_method(client, "Approve", False)
855
863
class Remove(Base):
    """Command calling the "RemoveClient" server method for every
    client."""

    def run(self, clients, bus):
        """Call "RemoveClient" on the server once per key of clients.

        clients -- mapping keyed by client object path
        bus     -- object providing call_server_method()
        """
        # Iterate over a frozen snapshot of the keys, so the loop is
        # unaffected if "clients" changes while the calls are made.
        for clientpath in frozenset(clients.keys()):
            bus.call_server_method("RemoveClient", clientpath)
861
868
class Output(Base):
862
869
"""Abstract class for commands outputting client details"""
863
870
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
869
876
"Checker", "ExtendedTimeout", "Expires",
870
877
"LastCheckerStatus")
873
879
class DumpJSON(Output):
    """Command printing the properties of all clients as JSON."""

    def run(self, clients, bus=None):
        """Print a single JSON object keyed by client name.

        clients -- mapping whose values are client property mappings;
                   each must contain "Name" and every key listed in
                   self.all_keywords
        bus     -- not used
        """
        data = {properties["Name"]:
                {key: properties[key]
                 for key in self.all_keywords}
                for properties in clients.values()}
        # Emit the document exactly once.
        print(json.dumps(data, indent=4, separators=(",", ": ")))
882
887
class PrintTable(Output):
883
888
def __init__(self, verbose=False):
988
993
def propname(self):
    """Placeholder for the property name; always raises
    NotImplementedError, so subclasses have to provide their own
    "propname"."""
    raise NotImplementedError()
992
996
class Enable(PropertySetter):
    """PropertySetter for the "Enabled" property, value True."""
    propname = "Enabled"
    value_to_set = True
997
1000
class Disable(PropertySetter):
    """PropertySetter for the "Enabled" property, value False."""
    propname = "Enabled"
    value_to_set = False
1002
1004
class BumpTimeout(PropertySetter):
    """PropertySetter for the "LastCheckedOK" property, value "" (the
    empty string)."""
    # NOTE(review): how the server interprets the empty string is not
    # visible here -- confirm against the server documentation.
    propname = "LastCheckedOK"
    value_to_set = ""
1007
1008
class StartChecker(PropertySetter):
    """PropertySetter for the "CheckerRunning" property, value
    True."""
    propname = "CheckerRunning"
    value_to_set = True
1012
1012
class StopChecker(PropertySetter):
    """PropertySetter for the "CheckerRunning" property, value
    False."""
    propname = "CheckerRunning"
    value_to_set = False
1017
1016
class ApproveByDefault(PropertySetter):
    """PropertySetter for the "ApprovedByDefault" property, value
    True."""
    propname = "ApprovedByDefault"
    value_to_set = True
1022
1020
class DenyByDefault(PropertySetter):
    """PropertySetter for the "ApprovedByDefault" property, value
    False."""
    propname = "ApprovedByDefault"
    value_to_set = False
1027
1024
class PropertySetterValue(PropertySetter):
1028
1025
"""Abstract class for PropertySetter receiving a value as
1029
1026
constructor argument instead of a class attribute."""
1030
1028
def __init__(self, value):
1031
1029
self.value_to_set = value
1072
1067
"When setting, convert value from a datetime.timedelta"
1073
1068
self._vts = int(round(value.total_seconds() * 1000))
1076
1070
class SetTimeout(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds for the "Timeout" property."""
    propname = "Timeout"
1080
1073
class SetExtendedTimeout(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds for the "ExtendedTimeout"
    property."""
    propname = "ExtendedTimeout"
1084
1076
class SetInterval(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds for the "Interval" property."""
    propname = "Interval"
1088
1079
class SetApprovalDelay(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds for the "ApprovalDelay"
    property."""
    propname = "ApprovalDelay"
1092
1082
class SetApprovalDuration(PropertySetterValueMilliseconds):
    """PropertySetterValueMilliseconds for the "ApprovalDuration"
    property."""
    propname = "ApprovalDuration"
2439
2427
busname = "se.recompile.Mandos"
2440
2428
client_interface = "se.recompile.Mandos.Client"
2441
2429
command.Approve().run(self.bus.clients, self.bus)
2430
self.assertTrue(self.bus.clients)
2442
2431
for clientpath in self.bus.clients:
2443
2432
self.assertIn(("Approve", busname, clientpath,
2444
2433
client_interface, (True,)), self.bus.calls)
2447
2436
busname = "se.recompile.Mandos"
2448
2437
client_interface = "se.recompile.Mandos.Client"
2449
2438
command.Deny().run(self.bus.clients, self.bus)
2439
self.assertTrue(self.bus.clients)
2450
2440
for clientpath in self.bus.clients:
2451
2441
self.assertIn(("Approve", busname, clientpath,
2452
2442
client_interface, (False,)),
2453
2443
self.bus.calls)
2455
2445
def test_Remove(self):
2446
busname = "se.recompile.Mandos"
2448
server_interface = "se.recompile.Mandos"
2449
orig_clients = self.bus.clients.copy()
2456
2450
command.Remove().run(self.bus.clients, self.bus)
2457
for clientpath in self.bus.clients:
2458
self.assertIn(("RemoveClient", dbus_busname,
2459
dbus_server_path, dbus_server_interface,
2451
self.assertFalse(self.bus.clients)
2452
for clientpath in orig_clients:
2453
self.assertIn(("RemoveClient", busname,
2454
server_path, server_interface,
2460
2455
(clientpath,)), self.bus.calls)
2462
2457
expected_json = {
2665
2660
cmd_args = [() for x in range(len(self.values_to_get))]
2666
2661
values_to_get = self.values_to_get
2662
self.assertTrue(values_to_get)
2667
2663
for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2668
2664
for clientpath in self.bus.clients:
2669
2665
self.bus.clients[clientpath][self.propname] = (
2671
2667
self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
self.assertTrue(self.bus.clients)
2672
2669
for clientpath in self.bus.clients:
2673
2670
value = (self.bus.clients[clientpath]
2674
2671
[self.propname])
2733
2730
class TestSetSecretCmd(TestPropertySetterCmd):
    """Property setter test for command.SetSecret and the "Secret"
    property, using file-like objects as the values to set."""
    command = command.SetSecret
    propname = "Secret"

    def __init__(self, *args, **kwargs):
        # Build the streams per test instance instead of once at
        # class level: a BytesIO keeps its read position, so a
        # stream shared between instances would already be consumed
        # by whichever test read it first.
        self.values_to_set = [io.BytesIO(b""),
                              io.BytesIO(b"secret\0xyzzy\nbar")]
        # Expected property values: the complete contents of each
        # stream (getvalue() is independent of the read position).
        self.values_to_get = [f.getvalue() for f in
                              self.values_to_set]
        super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2741
2741
class TestSetTimeoutCmd(TestPropertySetterCmd):
2797
def parse_test_args():
    # type: () -> argparse.Namespace
    """Parse and strip the test-only options from the command line.

    Recognizes "--check" (a flag) and "--prefix" (takes a value).
    Every argument not recognized here is written back to
    sys.argv[1:], so later argument parsing never sees the test
    options.

    Returns the argparse.Namespace with the attributes "check" and
    "prefix".
    """
    # add_help=False: leave "--help" in the arguments passed on.
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--check", action="store_true")
    parser.add_argument("--prefix", )
    args, unknown_args = parser.parse_known_args()
    # Remove test options from sys.argv
    sys.argv[1:] = unknown_args
    return args


def should_only_run_tests():
    # type: () -> bool
    """Backward-compatible wrapper around parse_test_args(): return
    whether "--check" was given."""
    return parse_test_args().check
2807
2808
# Add all tests from doctest strings
2808
2809
def load_tests(loader, tests, none):
2813
2814
if __name__ == "__main__":
2815
options = parse_test_args()
2815
if should_only_run_tests():
2816
# Call using ./tdd-python-script --check [--verbose]
2818
extra_test_prefix = options.prefix
2819
if extra_test_prefix is not None:
2820
if not (unittest.main(argv=[""], exit=False)
2821
.result.wasSuccessful()):
2823
class ExtraTestLoader(unittest.TestLoader):
2824
testMethodPrefix = extra_test_prefix
2825
# Call using ./scriptname --check [--verbose]
2826
unittest.main(argv=[""], testLoader=ExtraTestLoader())
2828
unittest.main(argv=[""])
2821
2832
logging.shutdown()
2836
# (lambda (&optional extra)
2837
# (if (not (funcall run-tests-in-test-buffer default-directory
2839
# (funcall show-test-buffer-in-test-window)
2840
# (funcall remove-test-window)
2841
# (if extra (message "Extra tests run successfully!"))))
2842
# run-tests-in-test-buffer:
2843
# (lambda (dir &optional extra)
2844
# (with-current-buffer (get-buffer-create "*Test*")
2845
# (setq buffer-read-only nil
2846
# default-directory dir)
2848
# (compilation-mode))
2849
# (let ((process-result
2850
# (let ((inhibit-read-only t))
2851
# (process-file-shell-command
2852
# (funcall get-command-line extra) nil "*Test*"))))
2853
# (and (numberp process-result)
2854
# (= process-result 0))))
2856
# (lambda (&optional extra)
2857
# (let ((quoted-script
2858
# (shell-quote-argument (funcall get-script-name))))
2860
# (concat "%s --check" (if extra " --prefix=atest" ""))
2864
# (if (fboundp 'file-local-name)
2865
# (file-local-name (buffer-file-name))
2866
# (or (file-remote-p (buffer-file-name) 'localname)
2867
# (buffer-file-name))))
2868
# remove-test-window:
2870
# (let ((test-window (get-buffer-window "*Test*")))
2871
# (if test-window (delete-window test-window))))
2872
# show-test-buffer-in-test-window:
2874
# (when (not (get-buffer-window-list "*Test*"))
2875
# (setq next-error-last-buffer (get-buffer "*Test*"))
2876
# (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
# (display-buffer-overriding-action
2878
# `((display-buffer-in-side-window) (side . ,side)
2879
# (window-height . fit-window-to-buffer)
2880
# (window-width . fit-window-to-buffer))))
2881
# (display-buffer "*Test*"))))
2884
# (let* ((run-extra-tests (lambda () (interactive)
2885
# (funcall run-tests t)))
2886
# (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
# (outer-keymap `(keymap (3 . ,inner-keymap)))) ; C-c
2888
# (setq minor-mode-overriding-map-alist
2889
# (cons `(run-tests . ,outer-keymap)
2890
# minor-mode-overriding-map-alist)))
2891
# (add-hook 'after-save-hook run-tests 90 t))