1
#!/usr/bin/python3 -bbI
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
4
4
# Mandos Monitor - Control and monitor the Mandos server
76
72
logging.captureWarnings(True) # Show warnings via the logging system
78
74
if sys.version_info.major == 2:
80
77
io.StringIO = StringIO.StringIO
82
79
locale.setlocale(locale.LC_ALL, "")
253
250
def rfc3339_duration_to_delta(duration):
254
251
"""Parse an RFC 3339 "duration" and return a datetime.timedelta
256
>>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
258
>>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
260
>>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
263
>>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
265
>>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
267
>>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
269
>>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
271
>>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
253
>>> rfc3339_duration_to_delta("P7D")
254
datetime.timedelta(7)
255
>>> rfc3339_duration_to_delta("PT60S")
256
datetime.timedelta(0, 60)
257
>>> rfc3339_duration_to_delta("PT60M")
258
datetime.timedelta(0, 3600)
259
>>> rfc3339_duration_to_delta("P60M")
260
datetime.timedelta(1680)
261
>>> rfc3339_duration_to_delta("PT24H")
262
datetime.timedelta(1)
263
>>> rfc3339_duration_to_delta("P1W")
264
datetime.timedelta(7)
265
>>> rfc3339_duration_to_delta("PT5M30S")
266
datetime.timedelta(0, 330)
267
>>> rfc3339_duration_to_delta("P1DT3M20S")
268
datetime.timedelta(1, 200)
273
269
>>> # Can not be empty:
274
270
>>> rfc3339_duration_to_delta("")
275
271
Traceback (most recent call last):
385
381
"""Parse an interval string as documented by Mandos before 1.6.1,
386
382
and return a datetime.timedelta
388
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
390
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
392
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
394
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
396
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
398
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
400
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
384
>>> parse_pre_1_6_1_interval('7d')
385
datetime.timedelta(7)
386
>>> parse_pre_1_6_1_interval('60s')
387
datetime.timedelta(0, 60)
388
>>> parse_pre_1_6_1_interval('60m')
389
datetime.timedelta(0, 3600)
390
>>> parse_pre_1_6_1_interval('24h')
391
datetime.timedelta(1)
392
>>> parse_pre_1_6_1_interval('1w')
393
datetime.timedelta(7)
394
>>> parse_pre_1_6_1_interval('5m 30s')
395
datetime.timedelta(0, 330)
396
>>> parse_pre_1_6_1_interval('')
397
datetime.timedelta(0)
402
398
>>> # Ignore unknown characters, allow any order and repetitions
403
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
399
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
400
datetime.timedelta(2, 480, 18000)
575
571
for key, subval in value.items()}
578
def set_client_property(self, objectpath, key, value):
580
if not isinstance(value, bytes):
581
value = value.encode("utf-8")
582
value = self.dbus_python.ByteArray(value)
583
return self.set_property(self.busname, objectpath,
584
self.client_interface, key,
575
class SilenceLogger(object):
588
576
"Simple context manager to silence a particular logger"
589
577
def __init__(self, loggername):
590
578
self.logger = logging.getLogger(loggername)
706
class command(object):
719
707
"""A namespace for command classes"""
722
710
"""Abstract base class for commands"""
723
711
def run(self, clients, bus=None):
724
712
"""Normal commands should implement run_on_one_client(),
1314
1302
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1316
1304
def MockDBusPython_func(self, func):
1317
class mock_dbus_python:
1305
class mock_dbus_python(object):
1318
1306
"""mock dbus-python module"""
1307
class exceptions(object):
1320
1308
"""Pseudo-namespace"""
1321
1309
class DBusException(Exception):
1311
class SystemBus(object):
1325
1313
def get_object(busname, objectpath):
1326
1314
DBusObject = collections.namedtuple(
1327
"DBusObject", ("methodname", "Set"))
1315
"DBusObject", ("methodname",))
1328
1316
def method(*args, **kwargs):
1329
1317
self.assertEqual({"dbus_interface":
1332
1320
return func(*args)
1333
def set_property(interface, key, value,
1334
dbus_interface=None):
1336
"org.freedesktop.DBus.Properties",
1338
self.assertEqual("Secret", key)
1339
return func(interface, key, value,
1340
dbus_interface=dbus_interface)
1341
return DBusObject(methodname=method,
1321
return DBusObject(methodname=method)
1322
class Boolean(object):
1344
1323
def __init__(self, value):
1345
1324
self.value = bool(value)
1346
1325
def __bool__(self):
1535
1512
# Make sure the dbus logger was suppressed
1536
1513
self.assertEqual(0, counting_handler.count)
1538
def test_Set_Secret_sends_bytearray(self):
1540
def func(*args, **kwargs):
1541
ret[0] = (args, kwargs)
1542
mock_dbus_python = self.MockDBusPython_func(func)
1543
bus = dbus_python_adapter.SystemBus(mock_dbus_python)
1544
bus.set_client_property("objectpath", "Secret", "value")
1545
expected_call = (("se.recompile.Mandos.Client", "Secret",
1546
mock_dbus_python.ByteArray(b"value")),
1548
"org.freedesktop.DBus.Properties"})
1549
self.assertEqual(expected_call, ret[0])
1550
if sys.version_info.major == 2:
1551
self.assertIsInstance(ret[0][0][-1],
1552
mock_dbus_python.ByteArray)
1554
1515
def test_get_object_converts_to_correct_exception(self):
1555
1516
bus = dbus_python_adapter.SystemBus(
1556
1517
self.fake_dbus_python_raises_exception_on_connect)
1558
1519
self.call_method(bus, "methodname", "busname",
1559
1520
"objectpath", "interface")
1561
class fake_dbus_python_raises_exception_on_connect:
1522
class fake_dbus_python_raises_exception_on_connect(object):
1562
1523
"""fake dbus-python module"""
1524
class exceptions(object):
1564
1525
"""Pseudo-namespace"""
1565
1526
class DBusException(Exception):
1576
1537
class Test_dbus_python_adapter_CachingBus(unittest.TestCase):
1577
class mock_dbus_python:
1538
class mock_dbus_python(object):
1578
1539
"""mock dbus-python modules"""
1540
class SystemBus(object):
1581
1542
def get_object(busname, objectpath):
1582
1543
return Unique()
1628
1589
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
1630
1591
def Stub_pydbus_func(self, func):
1592
class stub_pydbus(object):
1632
1593
"""stub pydbus module"""
1594
class SystemBus(object):
1635
1596
def get(busname, objectpath):
1636
1597
DBusObject = collections.namedtuple(
1682
1643
self.call_method(bus, "methodname", "busname",
1683
1644
"objectpath", "interface")
1685
class fake_pydbus_raises_exception_on_connect:
1646
class fake_pydbus_raises_exception_on_connect(object):
1686
1647
"""fake dbus-python module"""
1688
1649
def SystemBus(cls):
1692
1653
return Bus(get=get)
1694
1655
def test_set_property_uses_setattr(self):
1656
class Object(object):
1659
class pydbus_spy(object):
1660
class SystemBus(object):
1701
1662
def get(busname, objectpath):
1702
1663
return {"interface": obj}
1709
1670
def test_get_suppresses_xml_deprecation_warning(self):
1710
1671
if sys.version_info.major >= 3:
1712
class stub_pydbus_get:
1673
class stub_pydbus_get(object):
1674
class SystemBus(object):
1715
1676
def get(busname, objectpath):
1716
1677
warnings.warn_explicit(
1784
1745
self.assert_command_from_args(["--is-enabled", "client"],
1785
1746
command.IsEnabled)
1787
def assert_command_from_args(self, args, command_cls, length=1,
1788
clients=None, **cmd_attrs):
1748
def assert_command_from_args(self, args, command_cls,
1789
1750
"""Assert that parsing ARGS should result in an instance of
1790
1751
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1791
1752
options = self.parser.parse_args(args)
1792
1753
check_option_syntax(self.parser, options)
1793
1754
commands = commands_from_options(options)
1794
self.assertEqual(length, len(commands))
1795
for command in commands:
1796
if isinstance(command, command_cls):
1799
self.assertIsInstance(command, command_cls)
1800
if clients is not None:
1801
self.assertEqual(clients, options.client)
1755
self.assertEqual(1, len(commands))
1756
command = commands[0]
1757
self.assertIsInstance(command, command_cls)
1802
1758
for key, value in cmd_attrs.items():
1803
1759
self.assertEqual(value, getattr(command, key))
1805
def assert_commands_from_args(self, args, commands, clients=None):
1806
for cmd in commands:
1807
self.assert_command_from_args(args, cmd,
1808
length=len(commands),
1811
1761
def test_is_enabled_short(self):
1812
1762
self.assert_command_from_args(["-V", "client"],
1813
1763
command.IsEnabled)
2007
def test_manual_page_example_1(self):
2008
self.assert_command_from_args("",
2013
def test_manual_page_example_2(self):
2014
self.assert_command_from_args(
2015
"--verbose foo1.example.org foo2.example.org".split(),
2016
command.PrintTable, clients=["foo1.example.org",
2017
"foo2.example.org"],
2020
def test_manual_page_example_3(self):
2021
self.assert_command_from_args("--enable --all".split(),
2025
def test_manual_page_example_4(self):
2026
self.assert_commands_from_args(
2027
("--timeout=PT5M --interval=PT1M foo1.example.org"
2028
" foo2.example.org").split(),
2029
[command.SetTimeout, command.SetInterval],
2030
clients=["foo1.example.org", "foo2.example.org"])
2032
def test_manual_page_example_5(self):
2033
self.assert_command_from_args("--approve --all".split(),
2038
1957
class TestCommand(unittest.TestCase):
2039
1958
"""Abstract class for tests of command classes"""