253
236
def rfc3339_duration_to_delta(duration):
254
237
"""Parse an RFC 3339 "duration" and return a datetime.timedelta
256
>>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
258
>>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
260
>>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
263
>>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
265
>>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
267
>>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
269
>>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
271
>>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
239
>>> rfc3339_duration_to_delta("P7D")
240
datetime.timedelta(7)
241
>>> rfc3339_duration_to_delta("PT60S")
242
datetime.timedelta(0, 60)
243
>>> rfc3339_duration_to_delta("PT60M")
244
datetime.timedelta(0, 3600)
245
>>> rfc3339_duration_to_delta("P60M")
246
datetime.timedelta(1680)
247
>>> rfc3339_duration_to_delta("PT24H")
248
datetime.timedelta(1)
249
>>> rfc3339_duration_to_delta("P1W")
250
datetime.timedelta(7)
251
>>> rfc3339_duration_to_delta("PT5M30S")
252
datetime.timedelta(0, 330)
253
>>> rfc3339_duration_to_delta("P1DT3M20S")
254
datetime.timedelta(1, 200)
273
255
>>> # Can not be empty:
274
256
>>> rfc3339_duration_to_delta("")
275
257
Traceback (most recent call last):
385
367
"""Parse an interval string as documented by Mandos before 1.6.1,
386
368
and return a datetime.timedelta
388
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
390
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
392
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
394
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
396
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
398
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
400
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
370
>>> parse_pre_1_6_1_interval('7d')
371
datetime.timedelta(7)
372
>>> parse_pre_1_6_1_interval('60s')
373
datetime.timedelta(0, 60)
374
>>> parse_pre_1_6_1_interval('60m')
375
datetime.timedelta(0, 3600)
376
>>> parse_pre_1_6_1_interval('24h')
377
datetime.timedelta(1)
378
>>> parse_pre_1_6_1_interval('1w')
379
datetime.timedelta(7)
380
>>> parse_pre_1_6_1_interval('5m 30s')
381
datetime.timedelta(0, 330)
382
>>> parse_pre_1_6_1_interval('')
383
datetime.timedelta(0)
402
384
>>> # Ignore unknown characters, allow any order and repetitions
403
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
385
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
386
datetime.timedelta(2, 480, 18000)
619
593
return new_object
622
class pydbus_adapter:
623
class SystemBus(dbus.MandosBus):
624
def __init__(self, module=pydbus):
626
self.bus = self.pydbus.SystemBus()
628
@contextlib.contextmanager
629
def convert_exception(self, exception_class=dbus.Error):
632
except gi.repository.GLib.Error as e:
633
# This does what "raise from" would do
634
exc = exception_class(*e.args)
638
def call_method(self, methodname, busname, objectpath,
640
proxy_object = self.get(busname, objectpath)
641
log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
642
interface, methodname,
643
", ".join(repr(a) for a in args))
644
method = getattr(proxy_object[interface], methodname)
645
with self.convert_exception():
648
def get(self, busname, objectpath):
649
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
651
with self.convert_exception(dbus.ConnectFailed):
652
if sys.version_info.major <= 2:
653
with warnings.catch_warnings():
654
warnings.filterwarnings(
655
"ignore", "", DeprecationWarning,
656
r"^xml\.etree\.ElementTree$")
657
return self.bus.get(busname, objectpath)
659
return self.bus.get(busname, objectpath)
661
def set_property(self, busname, objectpath, interface, key,
663
proxy_object = self.get(busname, objectpath)
664
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
665
objectpath, self.properties_iface, interface,
667
setattr(proxy_object[interface], key, value)
669
class CachingBus(SystemBus):
670
"""A caching layer for pydbus_adapter.SystemBus"""
671
def __init__(self, *args, **kwargs):
672
self.object_cache = {}
673
super(pydbus_adapter.CachingBus,
674
self).__init__(*args, **kwargs)
675
def get(self, busname, objectpath):
677
return self.object_cache[(busname, objectpath)]
679
new_object = (super(pydbus_adapter.CachingBus, self)
680
.get(busname, objectpath))
681
self.object_cache[(busname, objectpath)] = new_object
685
596
def commands_from_options(options):
687
598
commands = list(options.commands)
1314
1225
class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs):
1316
1227
def MockDBusPython_func(self, func):
1317
class mock_dbus_python:
1228
class mock_dbus_python(object):
1318
1229
"""mock dbus-python module"""
1230
class exceptions(object):
1320
1231
"""Pseudo-namespace"""
1321
1232
class DBusException(Exception):
1234
class SystemBus(object):
1325
1236
def get_object(busname, objectpath):
1326
1237
DBusObject = collections.namedtuple(
1327
"DBusObject", ("methodname", "Set"))
1238
"DBusObject", ("methodname",))
1328
1239
def method(*args, **kwargs):
1329
1240
self.assertEqual({"dbus_interface":
1332
1243
return func(*args)
1333
def set_property(interface, key, value,
1334
dbus_interface=None):
1336
"org.freedesktop.DBus.Properties",
1338
self.assertEqual("Secret", key)
1339
return func(interface, key, value,
1340
dbus_interface=dbus_interface)
1341
return DBusObject(methodname=method,
1244
return DBusObject(methodname=method)
1245
class Boolean(object):
1344
1246
def __init__(self, value):
1345
1247
self.value = bool(value)
1346
1248
def __bool__(self):
1625
1509
self.assertIs(obj1, obj1b)
1628
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
1630
def Stub_pydbus_func(self, func):
1632
"""stub pydbus module"""
1635
def get(busname, objectpath):
1636
DBusObject = collections.namedtuple(
1637
"DBusObject", ("methodname",))
1638
return {"interface":
1639
DBusObject(methodname=func)}
1642
def call_method(self, bus, methodname, busname, objectpath,
1644
with self.assertLogs(log, logging.DEBUG):
1645
return bus.call_method(methodname, busname, objectpath,
1648
def test_call_method_returns(self):
1649
expected_method_return = Unique()
1650
method_args = (Unique(), Unique())
1652
self.assertEqual(len(method_args), len(args))
1653
for marg, arg in zip(method_args, args):
1654
self.assertIs(marg, arg)
1655
return expected_method_return
1656
stub_pydbus = self.Stub_pydbus_func(func)
1657
bus = pydbus_adapter.SystemBus(stub_pydbus)
1658
ret = self.call_method(bus, "methodname", "busname",
1659
"objectpath", "interface",
1661
self.assertIs(ret, expected_method_return)
1663
def test_call_method_handles_exception(self):
1664
dbus_logger = logging.getLogger("dbus.proxies")
1667
raise gi.repository.GLib.Error()
1669
stub_pydbus = self.Stub_pydbus_func(func)
1670
bus = pydbus_adapter.SystemBus(stub_pydbus)
1672
with self.assertRaises(dbus.Error) as e:
1673
self.call_method(bus, "methodname", "busname",
1674
"objectpath", "interface")
1676
self.assertNotIsInstance(e, dbus.ConnectFailed)
1678
def test_get_converts_to_correct_exception(self):
1679
bus = pydbus_adapter.SystemBus(
1680
self.fake_pydbus_raises_exception_on_connect)
1681
with self.assertRaises(dbus.ConnectFailed):
1682
self.call_method(bus, "methodname", "busname",
1683
"objectpath", "interface")
1685
class fake_pydbus_raises_exception_on_connect:
1686
"""fake dbus-python module"""
1689
def get(busname, objectpath):
1690
raise gi.repository.GLib.Error()
1691
Bus = collections.namedtuple("Bus", ["get"])
1694
def test_set_property_uses_setattr(self):
1701
def get(busname, objectpath):
1702
return {"interface": obj}
1703
bus = pydbus_adapter.SystemBus(pydbus_spy)
1705
bus.set_property("busname", "objectpath", "interface", "key",
1707
self.assertIs(value, obj.key)
1709
def test_get_suppresses_xml_deprecation_warning(self):
1710
if sys.version_info.major >= 3:
1712
class stub_pydbus_get:
1715
def get(busname, objectpath):
1716
warnings.warn_explicit(
1717
"deprecated", DeprecationWarning,
1718
"xml.etree.ElementTree", 0)
1719
bus = pydbus_adapter.SystemBus(stub_pydbus_get)
1720
with warnings.catch_warnings(record=True) as w:
1721
warnings.simplefilter("always")
1722
bus.get("busname", "objectpath")
1723
self.assertEqual(0, len(w))
1726
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
1728
"""stub pydbus module"""
1731
def get(busname, objectpath):
1735
self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)
1737
def test_returns_distinct_objectpaths(self):
1738
obj1 = self.bus.get("busname", "objectpath1")
1739
self.assertIsInstance(obj1, Unique)
1740
obj2 = self.bus.get("busname", "objectpath2")
1741
self.assertIsInstance(obj2, Unique)
1742
self.assertIsNot(obj1, obj2)
1744
def test_returns_distinct_busnames(self):
1745
obj1 = self.bus.get("busname1", "objectpath")
1746
self.assertIsInstance(obj1, Unique)
1747
obj2 = self.bus.get("busname2", "objectpath")
1748
self.assertIsInstance(obj2, Unique)
1749
self.assertIsNot(obj1, obj2)
1751
def test_returns_distinct_both(self):
1752
obj1 = self.bus.get("busname1", "objectpath")
1753
self.assertIsInstance(obj1, Unique)
1754
obj2 = self.bus.get("busname2", "objectpath")
1755
self.assertIsInstance(obj2, Unique)
1756
self.assertIsNot(obj1, obj2)
1758
def test_returns_same(self):
1759
obj1 = self.bus.get("busname", "objectpath")
1760
self.assertIsInstance(obj1, Unique)
1761
obj2 = self.bus.get("busname", "objectpath")
1762
self.assertIsInstance(obj2, Unique)
1763
self.assertIs(obj1, obj2)
1765
def test_returns_same_old(self):
1766
obj1 = self.bus.get("busname1", "objectpath1")
1767
self.assertIsInstance(obj1, Unique)
1768
obj2 = self.bus.get("busname2", "objectpath2")
1769
self.assertIsInstance(obj2, Unique)
1770
obj1b = self.bus.get("busname1", "objectpath1")
1771
self.assertIsInstance(obj1b, Unique)
1772
self.assertIsNot(obj1, obj2)
1773
self.assertIsNot(obj2, obj1b)
1774
self.assertIs(obj1, obj1b)
1777
1512
class Test_commands_from_options(unittest.TestCase):
1779
1514
def setUp(self):
1784
1519
self.assert_command_from_args(["--is-enabled", "client"],
1785
1520
command.IsEnabled)
1787
def assert_command_from_args(self, args, command_cls, length=1,
1788
clients=None, **cmd_attrs):
1522
def assert_command_from_args(self, args, command_cls,
1789
1524
"""Assert that parsing ARGS should result in an instance of
1790
1525
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1791
1526
options = self.parser.parse_args(args)
1792
1527
check_option_syntax(self.parser, options)
1793
1528
commands = commands_from_options(options)
1794
self.assertEqual(length, len(commands))
1795
for command in commands:
1796
if isinstance(command, command_cls):
1799
self.assertIsInstance(command, command_cls)
1800
if clients is not None:
1801
self.assertEqual(clients, options.client)
1529
self.assertEqual(1, len(commands))
1530
command = commands[0]
1531
self.assertIsInstance(command, command_cls)
1802
1532
for key, value in cmd_attrs.items():
1803
1533
self.assertEqual(value, getattr(command, key))
1805
def assert_commands_from_args(self, args, commands, clients=None):
1806
for cmd in commands:
1807
self.assert_command_from_args(args, cmd,
1808
length=len(commands),
1811
1535
def test_is_enabled_short(self):
1812
1536
self.assert_command_from_args(["-V", "client"],
1813
1537
command.IsEnabled)
2007
def test_manual_page_example_1(self):
2008
self.assert_command_from_args("",
2013
def test_manual_page_example_2(self):
2014
self.assert_command_from_args(
2015
"--verbose foo1.example.org foo2.example.org".split(),
2016
command.PrintTable, clients=["foo1.example.org",
2017
"foo2.example.org"],
2020
def test_manual_page_example_3(self):
2021
self.assert_command_from_args("--enable --all".split(),
2025
def test_manual_page_example_4(self):
2026
self.assert_commands_from_args(
2027
("--timeout=PT5M --interval=PT1M foo1.example.org"
2028
" foo2.example.org").split(),
2029
[command.SetTimeout, command.SetInterval],
2030
clients=["foo1.example.org", "foo2.example.org"])
2032
def test_manual_page_example_5(self):
2033
self.assert_command_from_args("--approve --all".split(),
2038
1731
class TestCommand(unittest.TestCase):
2039
1732
"""Abstract class for tests of command classes"""