235
250
def rfc3339_duration_to_delta(duration):
236
251
"""Parse an RFC 3339 "duration" and return a datetime.timedelta
238
>>> rfc3339_duration_to_delta("P7D")
239
datetime.timedelta(7)
240
>>> rfc3339_duration_to_delta("PT60S")
241
datetime.timedelta(0, 60)
242
>>> rfc3339_duration_to_delta("PT60M")
243
datetime.timedelta(0, 3600)
244
>>> rfc3339_duration_to_delta("P60M")
245
datetime.timedelta(1680)
246
>>> rfc3339_duration_to_delta("PT24H")
247
datetime.timedelta(1)
248
>>> rfc3339_duration_to_delta("P1W")
249
datetime.timedelta(7)
250
>>> rfc3339_duration_to_delta("PT5M30S")
251
datetime.timedelta(0, 330)
252
>>> rfc3339_duration_to_delta("P1DT3M20S")
253
datetime.timedelta(1, 200)
253
>>> rfc3339_duration_to_delta("P7D") == datetime.timedelta(7)
255
>>> rfc3339_duration_to_delta("PT60S") == datetime.timedelta(0, 60)
257
>>> rfc3339_duration_to_delta("PT60M") == datetime.timedelta(hours=1)
260
>>> rfc3339_duration_to_delta("P60M") == datetime.timedelta(1680)
262
>>> rfc3339_duration_to_delta("PT24H") == datetime.timedelta(1)
264
>>> rfc3339_duration_to_delta("P1W") == datetime.timedelta(7)
266
>>> rfc3339_duration_to_delta("PT5M30S") == datetime.timedelta(0, 330)
268
>>> rfc3339_duration_to_delta("P1DT3M20S") == datetime.timedelta(1, 200)
254
270
>>> # Can not be empty:
255
271
>>> rfc3339_duration_to_delta("")
256
272
Traceback (most recent call last):
366
382
"""Parse an interval string as documented by Mandos before 1.6.1,
367
383
and return a datetime.timedelta
369
>>> parse_pre_1_6_1_interval('7d')
370
datetime.timedelta(7)
371
>>> parse_pre_1_6_1_interval('60s')
372
datetime.timedelta(0, 60)
373
>>> parse_pre_1_6_1_interval('60m')
374
datetime.timedelta(0, 3600)
375
>>> parse_pre_1_6_1_interval('24h')
376
datetime.timedelta(1)
377
>>> parse_pre_1_6_1_interval('1w')
378
datetime.timedelta(7)
379
>>> parse_pre_1_6_1_interval('5m 30s')
380
datetime.timedelta(0, 330)
381
>>> parse_pre_1_6_1_interval('')
382
datetime.timedelta(0)
385
>>> parse_pre_1_6_1_interval('7d') == datetime.timedelta(days=7)
387
>>> parse_pre_1_6_1_interval('60s') == datetime.timedelta(0, 60)
389
>>> parse_pre_1_6_1_interval('60m') == datetime.timedelta(hours=1)
391
>>> parse_pre_1_6_1_interval('24h') == datetime.timedelta(days=1)
393
>>> parse_pre_1_6_1_interval('1w') == datetime.timedelta(days=7)
395
>>> parse_pre_1_6_1_interval('5m 30s') == datetime.timedelta(0, 330)
397
>>> parse_pre_1_6_1_interval('') == datetime.timedelta(0)
383
399
>>> # Ignore unknown characters, allow any order and repetitions
384
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
385
datetime.timedelta(2, 480, 18000)
400
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') == datetime.timedelta(2, 480, 18000)
592
616
return new_object
619
class pydbus_adapter(object):
620
class SystemBus(dbus.MandosBus):
621
def __init__(self, module=pydbus):
623
self.bus = self.pydbus.SystemBus()
625
@contextlib.contextmanager
626
def convert_exception(self, exception_class=dbus.Error):
629
except gi.repository.GLib.Error as e:
630
# This does what "raise from" would do
631
exc = exception_class(*e.args)
635
def call_method(self, methodname, busname, objectpath,
637
proxy_object = self.get(busname, objectpath)
638
log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
639
interface, methodname,
640
", ".join(repr(a) for a in args))
641
method = getattr(proxy_object[interface], methodname)
642
with self.convert_exception():
645
def get(self, busname, objectpath):
646
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
648
with self.convert_exception(dbus.ConnectFailed):
649
if sys.version_info.major <= 2:
650
with warnings.catch_warnings():
651
warnings.filterwarnings(
652
"ignore", "", DeprecationWarning,
653
r"^xml\.etree\.ElementTree$")
654
return self.bus.get(busname, objectpath)
656
return self.bus.get(busname, objectpath)
658
def set_property(self, busname, objectpath, interface, key,
660
proxy_object = self.get(busname, objectpath)
661
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
662
objectpath, self.properties_iface, interface,
664
setattr(proxy_object[interface], key, value)
666
class CachingBus(SystemBus):
667
"""A caching layer for pydbus_adapter.SystemBus"""
668
def __init__(self, *args, **kwargs):
669
self.object_cache = {}
670
super(pydbus_adapter.CachingBus,
671
self).__init__(*args, **kwargs)
672
def get(self, busname, objectpath):
674
return self.object_cache[(busname, objectpath)]
676
new_object = (super(pydbus_adapter.CachingBus, self)
677
.get(busname, objectpath))
678
self.object_cache[(busname, objectpath)] = new_object
595
682
def commands_from_options(options):
597
684
commands = list(options.commands)
1508
1622
self.assertIs(obj1, obj1b)
1625
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
1627
def Stub_pydbus_func(self, func):
1628
class stub_pydbus(object):
1629
"""stub pydbus module"""
1630
class SystemBus(object):
1632
def get(busname, objectpath):
1633
DBusObject = collections.namedtuple(
1634
"DBusObject", ("methodname",))
1635
return {"interface":
1636
DBusObject(methodname=func)}
1639
def call_method(self, bus, methodname, busname, objectpath,
1641
with self.assertLogs(log, logging.DEBUG):
1642
return bus.call_method(methodname, busname, objectpath,
1645
def test_call_method_returns(self):
1646
expected_method_return = Unique()
1647
method_args = (Unique(), Unique())
1649
self.assertEqual(len(method_args), len(args))
1650
for marg, arg in zip(method_args, args):
1651
self.assertIs(marg, arg)
1652
return expected_method_return
1653
stub_pydbus = self.Stub_pydbus_func(func)
1654
bus = pydbus_adapter.SystemBus(stub_pydbus)
1655
ret = self.call_method(bus, "methodname", "busname",
1656
"objectpath", "interface",
1658
self.assertIs(ret, expected_method_return)
1660
def test_call_method_handles_exception(self):
1661
dbus_logger = logging.getLogger("dbus.proxies")
1664
raise gi.repository.GLib.Error()
1666
stub_pydbus = self.Stub_pydbus_func(func)
1667
bus = pydbus_adapter.SystemBus(stub_pydbus)
1669
with self.assertRaises(dbus.Error) as e:
1670
self.call_method(bus, "methodname", "busname",
1671
"objectpath", "interface")
1673
self.assertNotIsInstance(e, dbus.ConnectFailed)
1675
def test_get_converts_to_correct_exception(self):
1676
bus = pydbus_adapter.SystemBus(
1677
self.fake_pydbus_raises_exception_on_connect)
1678
with self.assertRaises(dbus.ConnectFailed):
1679
self.call_method(bus, "methodname", "busname",
1680
"objectpath", "interface")
1682
class fake_pydbus_raises_exception_on_connect(object):
1683
"""fake dbus-python module"""
1686
def get(busname, objectpath):
1687
raise gi.repository.GLib.Error()
1688
Bus = collections.namedtuple("Bus", ["get"])
1691
def test_set_property_uses_setattr(self):
1692
class Object(object):
1695
class pydbus_spy(object):
1696
class SystemBus(object):
1698
def get(busname, objectpath):
1699
return {"interface": obj}
1700
bus = pydbus_adapter.SystemBus(pydbus_spy)
1702
bus.set_property("busname", "objectpath", "interface", "key",
1704
self.assertIs(value, obj.key)
1706
def test_get_suppresses_xml_deprecation_warning(self):
1707
if sys.version_info.major >= 3:
1709
class stub_pydbus_get(object):
1710
class SystemBus(object):
1712
def get(busname, objectpath):
1713
warnings.warn_explicit(
1714
"deprecated", DeprecationWarning,
1715
"xml.etree.ElementTree", 0)
1716
bus = pydbus_adapter.SystemBus(stub_pydbus_get)
1717
with warnings.catch_warnings(record=True) as w:
1718
warnings.simplefilter("always")
1719
bus.get("busname", "objectpath")
1720
self.assertEqual(0, len(w))
1723
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
1724
class stub_pydbus(object):
1725
"""stub pydbus module"""
1726
class SystemBus(object):
1728
def get(busname, objectpath):
1732
self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)
1734
def test_returns_distinct_objectpaths(self):
1735
obj1 = self.bus.get("busname", "objectpath1")
1736
self.assertIsInstance(obj1, Unique)
1737
obj2 = self.bus.get("busname", "objectpath2")
1738
self.assertIsInstance(obj2, Unique)
1739
self.assertIsNot(obj1, obj2)
1741
def test_returns_distinct_busnames(self):
1742
obj1 = self.bus.get("busname1", "objectpath")
1743
self.assertIsInstance(obj1, Unique)
1744
obj2 = self.bus.get("busname2", "objectpath")
1745
self.assertIsInstance(obj2, Unique)
1746
self.assertIsNot(obj1, obj2)
1748
def test_returns_distinct_both(self):
1749
obj1 = self.bus.get("busname1", "objectpath")
1750
self.assertIsInstance(obj1, Unique)
1751
obj2 = self.bus.get("busname2", "objectpath")
1752
self.assertIsInstance(obj2, Unique)
1753
self.assertIsNot(obj1, obj2)
1755
def test_returns_same(self):
1756
obj1 = self.bus.get("busname", "objectpath")
1757
self.assertIsInstance(obj1, Unique)
1758
obj2 = self.bus.get("busname", "objectpath")
1759
self.assertIsInstance(obj2, Unique)
1760
self.assertIs(obj1, obj2)
1762
def test_returns_same_old(self):
1763
obj1 = self.bus.get("busname1", "objectpath1")
1764
self.assertIsInstance(obj1, Unique)
1765
obj2 = self.bus.get("busname2", "objectpath2")
1766
self.assertIsInstance(obj2, Unique)
1767
obj1b = self.bus.get("busname1", "objectpath1")
1768
self.assertIsInstance(obj1b, Unique)
1769
self.assertIsNot(obj1, obj2)
1770
self.assertIsNot(obj2, obj1b)
1771
self.assertIs(obj1, obj1b)
1511
1774
class Test_commands_from_options(unittest.TestCase):
1513
1776
def setUp(self):
1518
1781
self.assert_command_from_args(["--is-enabled", "client"],
1519
1782
command.IsEnabled)
1521
def assert_command_from_args(self, args, command_cls,
1784
def assert_command_from_args(self, args, command_cls, length=1,
1785
clients=None, **cmd_attrs):
1523
1786
"""Assert that parsing ARGS should result in an instance of
1524
1787
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1525
1788
options = self.parser.parse_args(args)
1526
1789
check_option_syntax(self.parser, options)
1527
1790
commands = commands_from_options(options)
1528
self.assertEqual(1, len(commands))
1529
command = commands[0]
1530
self.assertIsInstance(command, command_cls)
1791
self.assertEqual(length, len(commands))
1792
for command in commands:
1793
if isinstance(command, command_cls):
1796
self.assertIsInstance(command, command_cls)
1797
if clients is not None:
1798
self.assertEqual(clients, options.client)
1531
1799
for key, value in cmd_attrs.items():
1532
1800
self.assertEqual(value, getattr(command, key))
1802
def assert_commands_from_args(self, args, commands, clients=None):
1803
for cmd in commands:
1804
self.assert_command_from_args(args, cmd,
1805
length=len(commands),
1534
1808
def test_is_enabled_short(self):
1535
1809
self.assert_command_from_args(["-V", "client"],
1536
1810
command.IsEnabled)