243
256
return rfc3339_duration_to_delta(interval)
244
257
except ValueError as e:
245
258
log.warning("%s - Parsing as pre-1.6.1 interval instead",
247
260
return parse_pre_1_6_1_interval(interval)
250
263
def rfc3339_duration_to_delta(duration):
251
264
"""Parse an RFC 3339 "duration" and return a datetime.timedelta
253
>>> rfc3339_duration_to_delta("P7D")
254
datetime.timedelta(7)
255
>>> rfc3339_duration_to_delta("PT60S")
256
datetime.timedelta(0, 60)
257
>>> rfc3339_duration_to_delta("PT60M")
258
datetime.timedelta(0, 3600)
259
>>> rfc3339_duration_to_delta("P60M")
260
datetime.timedelta(1680)
261
>>> rfc3339_duration_to_delta("PT24H")
262
datetime.timedelta(1)
263
>>> rfc3339_duration_to_delta("P1W")
264
datetime.timedelta(7)
265
>>> rfc3339_duration_to_delta("PT5M30S")
266
datetime.timedelta(0, 330)
267
>>> rfc3339_duration_to_delta("P1DT3M20S")
268
datetime.timedelta(1, 200)
266
>>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
268
>>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
270
>>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
273
>>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
275
>>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
277
>>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
279
>>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
281
>>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
269
283
>>> # Can not be empty:
270
284
>>> rfc3339_duration_to_delta("")
271
285
Traceback (most recent call last):
381
395
"""Parse an interval string as documented by Mandos before 1.6.1,
382
396
and return a datetime.timedelta
384
>>> parse_pre_1_6_1_interval('7d')
385
datetime.timedelta(7)
386
>>> parse_pre_1_6_1_interval('60s')
387
datetime.timedelta(0, 60)
388
>>> parse_pre_1_6_1_interval('60m')
389
datetime.timedelta(0, 3600)
390
>>> parse_pre_1_6_1_interval('24h')
391
datetime.timedelta(1)
392
>>> parse_pre_1_6_1_interval('1w')
393
datetime.timedelta(7)
394
>>> parse_pre_1_6_1_interval('5m 30s')
395
datetime.timedelta(0, 330)
396
>>> parse_pre_1_6_1_interval('')
397
datetime.timedelta(0)
398
>>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7)
400
>>> parse_pre_1_6_1_interval("60s") == datetime.timedelta(0, 60)
402
>>> parse_pre_1_6_1_interval("60m") == datetime.timedelta(hours=1)
404
>>> parse_pre_1_6_1_interval("24h") == datetime.timedelta(days=1)
406
>>> parse_pre_1_6_1_interval("1w") == datetime.timedelta(days=7)
408
>>> parse_pre_1_6_1_interval("5m 30s") == datetime.timedelta(0, 330)
410
>>> parse_pre_1_6_1_interval("") == datetime.timedelta(0)
398
412
>>> # Ignore unknown characters, allow any order and repetitions
399
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
400
datetime.timedelta(2, 480, 18000)
413
>>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") == datetime.timedelta(2, 480, 18000)
678
696
return new_object
699
class dbussy_adapter:
700
class SystemBus(dbus.SystemBus):
703
def __init__(self, dbussy, ravel):
706
self.bus = ravel.system_bus()
708
@contextlib.contextmanager
709
def convert_exception(self, exception_class=dbus.Error):
712
except self.dbussy.DBusError as e:
713
# This does what "raise from" would do
714
exc = exception_class(*e.args)
718
def call_method(self, methodname, busname, objectpath,
720
proxy_object = self.get_object(busname, objectpath)
721
log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
722
interface, methodname,
723
", ".join(repr(a) for a in args))
724
iface = proxy_object.get_interface(interface)
725
method = getattr(iface, methodname)
726
with self.convert_exception(dbus.Error):
727
value = method(*args)
728
# DBussy returns values either as an empty list or as a
729
# list of one element with the return value
731
return self.type_filter(value[0])
733
def get_object(self, busname, objectpath):
734
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
736
with self.convert_exception(dbus.ConnectFailed):
737
return self.bus[busname][objectpath]
739
def type_filter(self, value):
740
"""Convert the most bothersome types to Python types"""
741
# A D-Bus Variant value is represented as the Python type
742
# Tuple[dbussy.DBUS.Signature, Any]
743
if isinstance(value, tuple):
745
and isinstance(value[0],
746
self.dbussy.DBUS.Signature)):
747
return self.type_filter(value[1])
748
elif isinstance(value, self.dbussy.DBUS.ObjectPath):
750
# Also recurse into dictionaries
751
elif isinstance(value, dict):
752
return {self.type_filter(key):
753
self.type_filter(subval)
754
for key, subval in value.items()}
757
def set_property(self, busname, objectpath, interface, key,
759
proxy_object = self.get_object(busname, objectpath)
760
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
761
objectpath, self.properties_iface, interface,
764
# DBussy wants a Byte Array to be a sequence of
765
# values, not a byte string
767
setattr(proxy_object.get_interface(interface), key, value)
769
class MandosBus(SystemBus, dbus.MandosBus):
772
class CachingBus(MandosBus):
773
"""A caching layer for dbussy_adapter.MandosBus"""
774
def __init__(self, *args, **kwargs):
775
self.object_cache = {}
776
super(dbussy_adapter.CachingBus, self).__init__(*args,
778
def get_object(self, busname, objectpath):
780
return self.object_cache[(busname, objectpath)]
783
dbussy_adapter.CachingBus,
784
self).get_object(busname, objectpath)
785
self.object_cache[(busname, objectpath)] = new_object
681
789
def commands_from_options(options):
683
791
commands = list(options.commands)
1770
1878
self.assertIs(obj1, obj1b)
1881
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
1885
class ObjectPath(str):
1887
class DBusError(Exception):
1890
def fake_ravel_func(self, func):
1894
class DBusInterfaceProxy:
1896
def methodname(*args):
1897
return [func(*args)]
1900
def get_interface(interface):
1901
if interface == "interface":
1902
return DBusInterfaceProxy()
1903
return {"busname": {"objectpath": DBusObject()}}
1906
def call_method(self, bus, methodname, busname, objectpath,
1908
with self.assertLogs(log, logging.DEBUG):
1909
return bus.call_method(methodname, busname, objectpath,
1912
def test_call_method_returns(self):
1913
expected_method_return = Unique()
1914
method_args = (Unique(), Unique())
1916
self.assertEqual(len(method_args), len(args))
1917
for marg, arg in zip(method_args, args):
1918
self.assertIs(marg, arg)
1919
return expected_method_return
1920
fake_ravel = self.fake_ravel_func(func)
1921
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1922
ret = self.call_method(bus, "methodname", "busname",
1923
"objectpath", "interface",
1925
self.assertIs(ret, expected_method_return)
1927
def test_call_method_filters_objectpath(self):
1929
return method_return
1930
fake_ravel = self.fake_ravel_func(func)
1931
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1932
method_return = (self.dummy_dbussy.DBUS
1933
.ObjectPath("objectpath"))
1934
ret = self.call_method(bus, "methodname", "busname",
1935
"objectpath", "interface")
1936
self.assertEqual("objectpath", ret)
1937
self.assertNotIsInstance(ret,
1938
self.dummy_dbussy.DBUS.ObjectPath)
1940
def test_call_method_filters_objectpaths_in_dict(self):
1941
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1943
return method_return
1944
fake_ravel = self.fake_ravel_func(func)
1945
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1947
ObjectPath("objectpath_key_1"):
1948
ObjectPath("objectpath_value_1"),
1949
ObjectPath("objectpath_key_2"):
1950
ObjectPath("objectpath_value_2"),
1952
ret = self.call_method(bus, "methodname", "busname",
1953
"objectpath", "interface")
1954
expected_method_return = {str(key): str(value)
1956
method_return.items()}
1957
for key, value in ret.items():
1958
self.assertNotIsInstance(key, ObjectPath)
1959
self.assertNotIsInstance(value, ObjectPath)
1960
self.assertEqual(expected_method_return, ret)
1961
self.assertIsInstance(ret, dict)
1963
def test_call_method_filters_objectpaths_in_dict_in_dict(self):
1964
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1966
return method_return
1967
fake_ravel = self.fake_ravel_func(func)
1968
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1970
ObjectPath("key1"): {
1971
ObjectPath("key11"): ObjectPath("value11"),
1972
ObjectPath("key12"): ObjectPath("value12"),
1974
ObjectPath("key2"): {
1975
ObjectPath("key21"): ObjectPath("value21"),
1976
ObjectPath("key22"): ObjectPath("value22"),
1979
ret = self.call_method(bus, "methodname", "busname",
1980
"objectpath", "interface")
1981
expected_method_return = {
1982
"key1": {"key11": "value11",
1983
"key12": "value12"},
1984
"key2": {"key21": "value21",
1985
"key22": "value22"},
1987
self.assertEqual(expected_method_return, ret)
1988
for key, value in ret.items():
1989
self.assertIsInstance(value, dict)
1990
self.assertEqual(expected_method_return[key], value)
1991
self.assertNotIsInstance(key, ObjectPath)
1992
for inner_key, inner_value in value.items():
1993
self.assertIsInstance(value, dict)
1995
expected_method_return[key][inner_key],
1997
self.assertNotIsInstance(key, ObjectPath)
1999
def test_call_method_filters_objectpaths_in_dict_three_deep(self):
2000
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
2002
return method_return
2003
fake_ravel = self.fake_ravel_func(func)
2004
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2006
ObjectPath("key1"): {
2007
ObjectPath("key2"): {
2008
ObjectPath("key3"): ObjectPath("value"),
2012
ret = self.call_method(bus, "methodname", "busname",
2013
"objectpath", "interface")
2014
expected_method_return = {"key1": {"key2": {"key3": "value"}}}
2015
self.assertEqual(expected_method_return, ret)
2016
self.assertIsInstance(ret, dict)
2017
self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
2018
self.assertIsInstance(ret["key1"], dict)
2019
self.assertNotIsInstance(next(iter(ret["key1"].keys())),
2021
self.assertIsInstance(ret["key1"]["key2"], dict)
2022
self.assertNotIsInstance(
2023
next(iter(ret["key1"]["key2"].keys())),
2025
self.assertEqual("value", ret["key1"]["key2"]["key3"])
2026
self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
2027
self.dummy_dbussy.DBUS.ObjectPath)
2029
def test_call_method_handles_exception(self):
2031
raise self.dummy_dbussy.DBusError()
2033
fake_ravel = self.fake_ravel_func(func)
2034
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2036
with self.assertRaises(dbus.Error) as e:
2037
self.call_method(bus, "methodname", "busname",
2038
"objectpath", "interface")
2040
self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
2042
def test_get_object_converts_to_correct_exception(self):
2043
class fake_ravel_raises_exception_on_connect:
2048
def __getitem__(key):
2049
if key == "objectpath":
2050
raise self.dummy_dbussy.DBusError()
2051
raise Exception(key)
2052
return {"busname": Bus()}
2054
raise self.dummy_dbussy.DBusError()
2055
bus = dbussy_adapter.SystemBus(
2057
fake_ravel_raises_exception_on_connect)
2058
with self.assertRaises(dbus.ConnectFailed):
2059
self.call_method(bus, "methodname", "busname",
2060
"objectpath", "interface")
1773
2063
class Test_commands_from_options(unittest.TestCase):
1775
2065
def setUp(self):
2157
2448
busname = "se.recompile.Mandos"
2158
2449
client_interface = "se.recompile.Mandos.Client"
2159
2450
command.Deny().run(self.bus.clients, self.bus)
2451
self.assertTrue(self.bus.clients)
2160
2452
for clientpath in self.bus.clients:
2161
2453
self.assertIn(("Approve", busname, clientpath,
2162
2454
client_interface, (False,)),
2163
2455
self.bus.calls)
2165
2457
def test_Remove(self):
2458
busname = "se.recompile.Mandos"
2460
server_interface = "se.recompile.Mandos"
2461
orig_clients = self.bus.clients.copy()
2166
2462
command.Remove().run(self.bus.clients, self.bus)
2167
for clientpath in self.bus.clients:
2168
self.assertIn(("RemoveClient", dbus_busname,
2169
dbus_server_path, dbus_server_interface,
2463
self.assertFalse(self.bus.clients)
2464
for clientpath in orig_clients:
2465
self.assertIn(("RemoveClient", busname,
2466
server_path, server_interface,
2170
2467
(clientpath,)), self.bus.calls)
2172
2469
expected_json = {