122
152
help="Select all clients")
123
153
parser.add_argument("-v", "--verbose", action="store_true",
124
154
help="Print all fields")
125
parser.add_argument("-j", "--dump-json", action="store_true",
155
parser.add_argument("-j", "--dump-json", dest="commands",
156
action="append_const", default=[],
157
const=command.DumpJSON(),
126
158
help="Dump client data in JSON format")
127
159
enable_disable = parser.add_mutually_exclusive_group()
128
enable_disable.add_argument("-e", "--enable", action="store_true",
160
enable_disable.add_argument("-e", "--enable", dest="commands",
161
action="append_const", default=[],
162
const=command.Enable(),
129
163
help="Enable client")
130
enable_disable.add_argument("-d", "--disable",
164
enable_disable.add_argument("-d", "--disable", dest="commands",
165
action="append_const", default=[],
166
const=command.Disable(),
132
167
help="disable client")
133
parser.add_argument("-b", "--bump-timeout", action="store_true",
168
parser.add_argument("-b", "--bump-timeout", dest="commands",
169
action="append_const", default=[],
170
const=command.BumpTimeout(),
134
171
help="Bump timeout for client")
135
172
start_stop_checker = parser.add_mutually_exclusive_group()
136
173
start_stop_checker.add_argument("--start-checker",
175
action="append_const", default=[],
176
const=command.StartChecker(),
138
177
help="Start checker for client")
139
start_stop_checker.add_argument("--stop-checker",
178
start_stop_checker.add_argument("--stop-checker", dest="commands",
179
action="append_const", default=[],
180
const=command.StopChecker(),
141
181
help="Stop checker for client")
142
parser.add_argument("-V", "--is-enabled", action="store_true",
182
parser.add_argument("-V", "--is-enabled", dest="commands",
183
action="append_const", default=[],
184
const=command.IsEnabled(),
143
185
help="Check if client is enabled")
144
parser.add_argument("-r", "--remove", action="store_true",
186
parser.add_argument("-r", "--remove", dest="commands",
187
action="append_const", default=[],
188
const=command.Remove(),
145
189
help="Remove client")
146
parser.add_argument("-c", "--checker",
190
parser.add_argument("-c", "--checker", dest="commands",
191
action="append", default=[],
192
metavar="COMMAND", type=command.SetChecker,
147
193
help="Set checker command for client")
148
parser.add_argument("-t", "--timeout", type=string_to_delta,
149
help="Set timeout for client")
150
parser.add_argument("--extended-timeout", type=string_to_delta,
151
help="Set extended timeout for client")
152
parser.add_argument("-i", "--interval", type=string_to_delta,
153
help="Set checker interval for client")
195
"-t", "--timeout", dest="commands", action="append",
196
default=[], metavar="TIME",
197
type=command.SetTimeout.argparse(string_to_delta),
198
help="Set timeout for client")
200
"--extended-timeout", dest="commands", action="append",
201
default=[], metavar="TIME",
202
type=command.SetExtendedTimeout.argparse(string_to_delta),
203
help="Set extended timeout for client")
205
"-i", "--interval", dest="commands", action="append",
206
default=[], metavar="TIME",
207
type=command.SetInterval.argparse(string_to_delta),
208
help="Set checker interval for client")
154
209
approve_deny_default = parser.add_mutually_exclusive_group()
155
210
approve_deny_default.add_argument(
156
"--approve-by-default", action="store_true",
157
default=None, dest="approved_by_default",
211
"--approve-by-default", dest="commands",
212
action="append_const", default=[],
213
const=command.ApproveByDefault(),
158
214
help="Set client to be approved by default")
159
215
approve_deny_default.add_argument(
160
"--deny-by-default", action="store_false",
161
dest="approved_by_default",
216
"--deny-by-default", dest="commands",
217
action="append_const", default=[],
218
const=command.DenyByDefault(),
162
219
help="Set client to be denied by default")
163
parser.add_argument("--approval-delay", type=string_to_delta,
164
help="Set delay before client approve/deny")
165
parser.add_argument("--approval-duration", type=string_to_delta,
166
help="Set duration of one client approval")
167
parser.add_argument("-H", "--host", help="Set host for client")
168
parser.add_argument("-s", "--secret",
169
type=argparse.FileType(mode="rb"),
170
help="Set password blob (file) for client")
221
"--approval-delay", dest="commands", action="append",
222
default=[], metavar="TIME",
223
type=command.SetApprovalDelay.argparse(string_to_delta),
224
help="Set delay before client approve/deny")
226
"--approval-duration", dest="commands", action="append",
227
default=[], metavar="TIME",
228
type=command.SetApprovalDuration.argparse(string_to_delta),
229
help="Set duration of one client approval")
230
parser.add_argument("-H", "--host", dest="commands",
231
action="append", default=[], metavar="STRING",
232
type=command.SetHost,
233
help="Set host for client")
235
"-s", "--secret", dest="commands", action="append",
236
default=[], metavar="FILENAME",
237
type=command.SetSecret.argparse(argparse.FileType(mode="rb")),
238
help="Set password blob (file) for client")
171
239
approve_deny = parser.add_mutually_exclusive_group()
172
240
approve_deny.add_argument(
173
"-A", "--approve", action="store_true",
241
"-A", "--approve", dest="commands", action="append_const",
242
default=[], const=command.Approve(),
174
243
help="Approve any current client request")
175
approve_deny.add_argument("-D", "--deny", action="store_true",
244
approve_deny.add_argument("-D", "--deny", dest="commands",
245
action="append_const", default=[],
246
const=command.Deny(),
176
247
help="Deny any current client request")
177
248
parser.add_argument("--debug", action="store_true",
178
249
help="Debug mode (show D-Bus commands)")
369
442
"""Apply additional restrictions on options, not expressible in
372
def has_actions(options):
373
return any((options.enable,
375
options.bump_timeout,
376
options.start_checker,
377
options.stop_checker,
380
options.checker is not None,
381
options.timeout is not None,
382
options.extended_timeout is not None,
383
options.interval is not None,
384
options.approved_by_default is not None,
385
options.approval_delay is not None,
386
options.approval_duration is not None,
387
options.host is not None,
388
options.secret is not None,
445
def has_commands(options, commands=None):
447
commands = (command.Enable,
450
command.StartChecker,
456
command.SetExtendedTimeout,
458
command.ApproveByDefault,
459
command.DenyByDefault,
460
command.SetApprovalDelay,
461
command.SetApprovalDuration,
466
return any(isinstance(cmd, commands)
467
for cmd in options.commands)
392
if has_actions(options) and not (options.client or options.all):
469
if has_commands(options) and not (options.client or options.all):
393
470
parser.error("Options require clients names or --all.")
394
if options.verbose and has_actions(options):
471
if options.verbose and has_commands(options):
395
472
parser.error("--verbose can only be used alone.")
396
if options.dump_json and (options.verbose
397
or has_actions(options)):
473
if (has_commands(options, (command.DumpJSON,))
474
and (options.verbose or len(options.commands) > 1)):
398
475
parser.error("--dump-json can only be used alone.")
399
if options.all and not has_actions(options):
476
if options.all and not has_commands(options):
400
477
parser.error("--all requires an action.")
401
if options.is_enabled and len(options.client) > 1:
478
if (has_commands(options, (command.IsEnabled,))
479
and len(options.client) > 1):
402
480
parser.error("--is-enabled requires exactly one client")
404
options.remove = False
405
if has_actions(options) and not options.deny:
406
parser.error("--remove can only be combined with --deny")
407
options.remove = True
413
class SystemBus(object):
481
if (len(options.commands) > 1
482
and has_commands(options, (command.Remove,))
483
and not has_commands(options, (command.Deny,))):
484
parser.error("--remove can only be combined with --deny")
415
491
object_manager_iface = "org.freedesktop.DBus.ObjectManager"
416
493
def get_managed_objects(self, busname, objectpath):
417
494
return self.call_method("GetManagedObjects", busname,
419
496
self.object_manager_iface)
421
498
properties_iface = "org.freedesktop.DBus.Properties"
422
500
def set_property(self, busname, objectpath, interface, key,
424
502
self.call_method("Set", busname, objectpath,
425
503
self.properties_iface, interface, key,
506
def call_method(self, methodname, busname, objectpath,
508
raise NotImplementedError()
429
510
class MandosBus(SystemBus):
430
511
busname_domain = "se.recompile"
545
636
new_object = super(
546
637
dbus_python_adapter.CachingBus,
547
638
self).get_object(busname, objectpath)
548
self.object_cache[(busname, objectpath)] = new_object
639
self.object_cache[(busname, objectpath)] = new_object
643
class pydbus_adapter:
644
class SystemBus(dbus.MandosBus):
645
def __init__(self, module=pydbus):
647
self.bus = self.pydbus.SystemBus()
649
@contextlib.contextmanager
650
def convert_exception(self, exception_class=dbus.Error):
653
except gi.repository.GLib.Error as e:
654
# This does what "raise from" would do
655
exc = exception_class(*e.args)
659
def call_method(self, methodname, busname, objectpath,
661
proxy_object = self.get(busname, objectpath)
662
log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
663
interface, methodname,
664
", ".join(repr(a) for a in args))
665
method = getattr(proxy_object[interface], methodname)
666
with self.convert_exception():
669
def get(self, busname, objectpath):
670
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
672
with self.convert_exception(dbus.ConnectFailed):
673
if sys.version_info.major <= 2:
674
with warnings.catch_warnings():
675
warnings.filterwarnings(
676
"ignore", "", DeprecationWarning,
677
r"^xml\.etree\.ElementTree$")
678
return self.bus.get(busname, objectpath)
680
return self.bus.get(busname, objectpath)
682
def set_property(self, busname, objectpath, interface, key,
684
proxy_object = self.get(busname, objectpath)
685
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
686
objectpath, self.properties_iface, interface,
688
setattr(proxy_object[interface], key, value)
690
class CachingBus(SystemBus):
691
"""A caching layer for pydbus_adapter.SystemBus"""
693
def __init__(self, *args, **kwargs):
    # Proxy objects already fetched, keyed on (busname, objectpath).
    # The cache is created before the parent initializer is run.
    self.object_cache = dict()
    parent = super(pydbus_adapter.CachingBus, self)
    parent.__init__(*args, **kwargs)
698
def get(self, busname, objectpath):
700
return self.object_cache[(busname, objectpath)]
702
new_object = (super(pydbus_adapter.CachingBus, self)
703
.get(busname, objectpath))
704
self.object_cache[(busname, objectpath)] = new_object
708
class dbussy_adapter:
709
class SystemBus(dbus.SystemBus):
712
def __init__(self, dbussy, ravel):
715
self.bus = ravel.system_bus()
717
@contextlib.contextmanager
718
def convert_exception(self, exception_class=dbus.Error):
721
except self.dbussy.DBusError as e:
722
# This does what "raise from" would do
723
exc = exception_class(*e.args)
727
def call_method(self, methodname, busname, objectpath,
729
proxy_object = self.get_object(busname, objectpath)
730
log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath,
731
interface, methodname,
732
", ".join(repr(a) for a in args))
733
iface = proxy_object.get_interface(interface)
734
method = getattr(iface, methodname)
735
with self.convert_exception(dbus.Error):
736
value = method(*args)
737
# DBussy returns values either as an empty list or as a
738
# list of one element with the return value
740
return self.type_filter(value[0])
742
def get_object(self, busname, objectpath):
743
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
745
with self.convert_exception(dbus.ConnectFailed):
746
return self.bus[busname][objectpath]
748
def type_filter(self, value):
749
"""Convert the most bothersome types to Python types"""
750
# A D-Bus Variant value is represented as the Python type
751
# Tuple[dbussy.DBUS.Signature, Any]
752
if isinstance(value, tuple):
754
and isinstance(value[0],
755
self.dbussy.DBUS.Signature)):
756
return self.type_filter(value[1])
757
elif isinstance(value, self.dbussy.DBUS.ObjectPath):
759
# Also recurse into dictionaries
760
elif isinstance(value, dict):
761
return {self.type_filter(key):
762
self.type_filter(subval)
763
for key, subval in value.items()}
766
def set_property(self, busname, objectpath, interface, key,
768
proxy_object = self.get_object(busname, objectpath)
769
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
770
objectpath, self.properties_iface, interface,
773
# DBussy wants a Byte Array to be a sequence of
774
# values, not a byte string
776
setattr(proxy_object.get_interface(interface), key, value)
778
class MandosBus(SystemBus, dbus.MandosBus):
781
class CachingBus(MandosBus):
782
"""A caching layer for dbussy_adapter.MandosBus"""
784
def __init__(self, *args, **kwargs):
785
self.object_cache = {}
786
super(dbussy_adapter.CachingBus, self).__init__(*args,
789
def get_object(self, busname, objectpath):
791
return self.object_cache[(busname, objectpath)]
794
dbussy_adapter.CachingBus,
795
self).get_object(busname, objectpath)
796
self.object_cache[(busname, objectpath)] = new_object
549
797
return new_object
552
800
def commands_from_options(options):
556
if options.is_enabled:
557
commands.append(command.IsEnabled())
560
commands.append(command.Approve())
563
commands.append(command.Deny())
566
commands.append(command.Remove())
568
if options.dump_json:
569
commands.append(command.DumpJSON())
572
commands.append(command.Enable())
575
commands.append(command.Disable())
577
if options.bump_timeout:
578
commands.append(command.BumpTimeout())
580
if options.start_checker:
581
commands.append(command.StartChecker())
583
if options.stop_checker:
584
commands.append(command.StopChecker())
586
if options.approved_by_default is not None:
587
if options.approved_by_default:
588
commands.append(command.ApproveByDefault())
802
commands = list(options.commands)
804
def find_cmd(cmd, commands):
806
for i, c in enumerate(commands):
807
if isinstance(c, cmd):
811
# If command.Remove is present, move any instances of command.Deny
812
# to occur ahead of command.Remove.
813
index_of_remove = find_cmd(command.Remove, commands)
814
before_remove = commands[:index_of_remove]
815
after_remove = commands[index_of_remove:]
817
for cmd in after_remove:
818
if isinstance(cmd, command.Deny):
819
before_remove.append(cmd)
590
commands.append(command.DenyByDefault())
592
if options.checker is not None:
593
commands.append(command.SetChecker(options.checker))
595
if options.host is not None:
596
commands.append(command.SetHost(options.host))
598
if options.secret is not None:
599
commands.append(command.SetSecret(options.secret))
601
if options.timeout is not None:
602
commands.append(command.SetTimeout(options.timeout))
604
if options.extended_timeout:
606
command.SetExtendedTimeout(options.extended_timeout))
608
if options.interval is not None:
609
commands.append(command.SetInterval(options.interval))
611
if options.approval_delay is not None:
613
command.SetApprovalDelay(options.approval_delay))
615
if options.approval_duration is not None:
617
command.SetApprovalDuration(options.approval_duration))
821
cleaned_after.append(cmd)
822
if cleaned_after != after_remove:
823
commands = before_remove + cleaned_after
619
825
# If no command option has been given, show table of clients,
620
826
# optionally verbosely
1024
1222
def test_actions_all_conflicts_with_verbose(self):
1025
1223
for action, value in self.actions.items():
1026
options = self.parser.parse_args()
1027
setattr(options, action, value)
1029
options.verbose = True
1224
args = self.actionargs(action, value, "--all",
1030
1226
with self.assertParseError():
1031
self.check_option_syntax(options)
1227
self.parse_args(args)
1033
1229
def test_actions_with_client_conflicts_with_verbose(self):
1034
1230
for action, value in self.actions.items():
1035
options = self.parser.parse_args()
1036
setattr(options, action, value)
1037
options.verbose = True
1038
options.client = ["client"]
1231
args = self.actionargs(action, value, "--verbose",
1039
1233
with self.assertParseError():
1040
self.check_option_syntax(options)
1234
self.parse_args(args)
1042
1236
def test_dump_json_conflicts_with_verbose(self):
1043
options = self.parser.parse_args()
1044
options.dump_json = True
1045
options.verbose = True
1237
args = ["--dump-json", "--verbose"]
1046
1238
with self.assertParseError():
1047
self.check_option_syntax(options)
1239
self.parse_args(args)
1049
1241
def test_dump_json_conflicts_with_action(self):
1050
1242
for action, value in self.actions.items():
1051
options = self.parser.parse_args()
1052
setattr(options, action, value)
1053
options.dump_json = True
1243
args = self.actionargs(action, value, "--dump-json")
1054
1244
with self.assertParseError():
1055
self.check_option_syntax(options)
1245
self.parse_args(args)
1057
1247
def test_all_can_not_be_alone(self):
1058
options = self.parser.parse_args()
1060
1249
with self.assertParseError():
1061
self.check_option_syntax(options)
1250
self.parse_args(args)
1063
1252
def test_all_is_ok_with_any_action(self):
1064
1253
for action, value in self.actions.items():
1065
options = self.parser.parse_args()
1066
setattr(options, action, value)
1068
self.check_option_syntax(options)
1254
args = self.actionargs(action, value, "--all")
1255
self.parse_args(args)
1070
1257
def test_any_action_is_ok_with_one_client(self):
1071
1258
for action, value in self.actions.items():
1072
options = self.parser.parse_args()
1073
setattr(options, action, value)
1074
options.client = ["client"]
1075
self.check_option_syntax(options)
1259
args = self.actionargs(action, value, "client")
1260
self.parse_args(args)
1077
1262
def test_one_client_with_all_actions_except_is_enabled(self):
1078
options = self.parser.parse_args()
1079
1263
for action, value in self.actions.items():
1080
if action == "is_enabled":
1264
if action == "--is-enabled":
1082
setattr(options, action, value)
1083
options.client = ["client"]
1084
self.check_option_syntax(options)
1266
args = self.actionargs(action, value, "client")
1267
self.parse_args(args)
1086
1269
def test_two_clients_with_all_actions_except_is_enabled(self):
1087
options = self.parser.parse_args()
1088
1270
for action, value in self.actions.items():
1089
if action == "is_enabled":
1271
if action == "--is-enabled":
1091
setattr(options, action, value)
1092
options.client = ["client1", "client2"]
1093
self.check_option_syntax(options)
1273
args = self.actionargs(action, value, "client1",
1275
self.parse_args(args)
1095
1277
def test_two_clients_are_ok_with_actions_except_is_enabled(self):
1096
1278
for action, value in self.actions.items():
1097
if action == "is_enabled":
1279
if action == "--is-enabled":
1099
options = self.parser.parse_args()
1100
setattr(options, action, value)
1101
options.client = ["client1", "client2"]
1102
self.check_option_syntax(options)
1281
args = self.actionargs(action, value, "client1",
1283
self.parse_args(args)
1104
1285
def test_is_enabled_fails_without_client(self):
1105
options = self.parser.parse_args()
1106
options.is_enabled = True
1286
args = ["--is-enabled"]
1107
1287
with self.assertParseError():
1108
self.check_option_syntax(options)
1288
self.parse_args(args)
1110
1290
def test_is_enabled_fails_with_two_clients(self):
1111
options = self.parser.parse_args()
1112
options.is_enabled = True
1113
options.client = ["client1", "client2"]
1291
args = ["--is-enabled", "client1", "client2"]
1114
1292
with self.assertParseError():
1115
self.check_option_syntax(options)
1293
self.parse_args(args)
1117
1295
def test_remove_can_only_be_combined_with_action_deny(self):
1118
1296
for action, value in self.actions.items():
1119
if action in {"remove", "deny"}:
1297
if action in {"--remove", "--deny"}:
1121
options = self.parser.parse_args()
1122
setattr(options, action, value)
1124
options.remove = True
1299
args = self.actionargs(action, value, "--all",
1125
1301
with self.assertParseError():
1126
self.check_option_syntax(options)
1302
self.parse_args(args)
1129
1305
class Test_dbus_exceptions(unittest.TestCase):
1516
1719
self.assertIs(obj1, obj1b)
1722
class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs):
1724
def Stub_pydbus_func(self, func):
1726
"""stub pydbus module"""
1729
def get(busname, objectpath):
1730
DBusObject = collections.namedtuple(
1731
"DBusObject", ("methodname",))
1732
return {"interface":
1733
DBusObject(methodname=func)}
1736
def call_method(self, bus, methodname, busname, objectpath,
1738
with self.assertLogs(log, logging.DEBUG):
1739
return bus.call_method(methodname, busname, objectpath,
1742
def test_call_method_returns(self):
1743
expected_method_return = Unique()
1744
method_args = (Unique(), Unique())
1746
self.assertEqual(len(method_args), len(args))
1747
for marg, arg in zip(method_args, args):
1748
self.assertIs(marg, arg)
1749
return expected_method_return
1750
stub_pydbus = self.Stub_pydbus_func(func)
1751
bus = pydbus_adapter.SystemBus(stub_pydbus)
1752
ret = self.call_method(bus, "methodname", "busname",
1753
"objectpath", "interface",
1755
self.assertIs(ret, expected_method_return)
1757
def test_call_method_handles_exception(self):
1759
raise gi.repository.GLib.Error()
1761
stub_pydbus = self.Stub_pydbus_func(func)
1762
bus = pydbus_adapter.SystemBus(stub_pydbus)
1764
with self.assertRaises(dbus.Error) as e:
1765
self.call_method(bus, "methodname", "busname",
1766
"objectpath", "interface")
1768
self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
1770
def test_get_converts_to_correct_exception(self):
    # Build the bus on top of the fake module whose get() raises
    # GLib.Error, and expect the adapter to surface that failure as
    # dbus.ConnectFailed when a method call is attempted.
    failing_module = self.fake_pydbus_raises_exception_on_connect
    bus = pydbus_adapter.SystemBus(failing_module)
    call_args = ("methodname", "busname", "objectpath", "interface")
    with self.assertRaises(dbus.ConnectFailed):
        self.call_method(bus, *call_args)
1777
class fake_pydbus_raises_exception_on_connect:
1778
"""fake dbus-python module"""
1781
def get(busname, objectpath):
1782
raise gi.repository.GLib.Error()
1783
Bus = collections.namedtuple("Bus", ["get"])
1786
def test_set_property_uses_setattr(self):
1793
def get(busname, objectpath):
1794
return {"interface": obj}
1795
bus = pydbus_adapter.SystemBus(pydbus_spy)
1797
bus.set_property("busname", "objectpath", "interface", "key",
1799
self.assertIs(value, obj.key)
1801
def test_get_suppresses_xml_deprecation_warning(self):
1802
if sys.version_info.major >= 3:
1804
class stub_pydbus_get:
1807
def get(busname, objectpath):
1808
warnings.warn_explicit(
1809
"deprecated", DeprecationWarning,
1810
"xml.etree.ElementTree", 0)
1811
bus = pydbus_adapter.SystemBus(stub_pydbus_get)
1812
with warnings.catch_warnings(record=True) as w:
1813
warnings.simplefilter("always")
1814
bus.get("busname", "objectpath")
1815
self.assertEqual(0, len(w))
1818
class Test_pydbus_adapter_CachingBus(unittest.TestCase):
1820
"""stub pydbus module"""
1823
def get(busname, objectpath):
1827
self.bus = pydbus_adapter.CachingBus(self.stub_pydbus)
1829
def test_returns_distinct_objectpaths(self):
    # Same bus name, two different object paths: the two lookups
    # must not hand back the same object.
    first = self.bus.get("busname", "objectpath1")
    self.assertIsInstance(first, Unique)
    second = self.bus.get("busname", "objectpath2")
    self.assertIsInstance(second, Unique)
    self.assertIsNot(first, second)
1836
def test_returns_distinct_busnames(self):
    # Same object path, two different bus names: the two lookups
    # must not hand back the same object.
    first = self.bus.get("busname1", "objectpath")
    self.assertIsInstance(first, Unique)
    second = self.bus.get("busname2", "objectpath")
    self.assertIsInstance(second, Unique)
    self.assertIsNot(first, second)
1843
def test_returns_distinct_both(self):
    # Both the bus name AND the object path differ between the two
    # lookups.  (Previously both lookups used the same object path,
    # which made this test an exact duplicate of
    # test_returns_distinct_busnames and left this case untested.)
    obj1 = self.bus.get("busname1", "objectpath1")
    self.assertIsInstance(obj1, Unique)
    obj2 = self.bus.get("busname2", "objectpath2")
    self.assertIsInstance(obj2, Unique)
    self.assertIsNot(obj1, obj2)
1850
def test_returns_same(self):
    # Asking twice for the identical (busname, objectpath) pair must
    # give back the very same object both times.
    first = self.bus.get("busname", "objectpath")
    self.assertIsInstance(first, Unique)
    repeated = self.bus.get("busname", "objectpath")
    self.assertIsInstance(repeated, Unique)
    self.assertIs(first, repeated)
1857
def test_returns_same_old(self):
    # Look up one pair, then a different pair, then the first pair
    # again: the first object must still be returned after the
    # intervening lookup of something else.
    first = self.bus.get("busname1", "objectpath1")
    self.assertIsInstance(first, Unique)
    other = self.bus.get("busname2", "objectpath2")
    self.assertIsInstance(other, Unique)
    first_again = self.bus.get("busname1", "objectpath1")
    self.assertIsInstance(first_again, Unique)
    self.assertIsNot(first, other)
    self.assertIsNot(other, first_again)
    self.assertIs(first, first_again)
1869
class Test_dbussy_adapter_SystemBus(TestCaseWithAssertLogs):
1873
class ObjectPath(str):
1875
class DBusError(Exception):
1878
def fake_ravel_func(self, func):
1882
class DBusInterfaceProxy:
1884
def methodname(*args):
1885
return [func(*args)]
1888
def get_interface(interface):
1889
if interface == "interface":
1890
return DBusInterfaceProxy()
1891
return {"busname": {"objectpath": DBusObject()}}
1894
def call_method(self, bus, methodname, busname, objectpath,
1896
with self.assertLogs(log, logging.DEBUG):
1897
return bus.call_method(methodname, busname, objectpath,
1900
def test_call_method_returns(self):
1901
expected_method_return = Unique()
1902
method_args = (Unique(), Unique())
1904
self.assertEqual(len(method_args), len(args))
1905
for marg, arg in zip(method_args, args):
1906
self.assertIs(marg, arg)
1907
return expected_method_return
1908
fake_ravel = self.fake_ravel_func(func)
1909
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1910
ret = self.call_method(bus, "methodname", "busname",
1911
"objectpath", "interface",
1913
self.assertIs(ret, expected_method_return)
1915
def test_call_method_filters_objectpath(self):
1917
return method_return
1918
fake_ravel = self.fake_ravel_func(func)
1919
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1920
method_return = (self.dummy_dbussy.DBUS
1921
.ObjectPath("objectpath"))
1922
ret = self.call_method(bus, "methodname", "busname",
1923
"objectpath", "interface")
1924
self.assertEqual("objectpath", ret)
1925
self.assertNotIsInstance(ret,
1926
self.dummy_dbussy.DBUS.ObjectPath)
1928
def test_call_method_filters_objectpaths_in_dict(self):
1929
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1931
return method_return
1932
fake_ravel = self.fake_ravel_func(func)
1933
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1935
ObjectPath("objectpath_key_1"):
1936
ObjectPath("objectpath_value_1"),
1937
ObjectPath("objectpath_key_2"):
1938
ObjectPath("objectpath_value_2"),
1940
ret = self.call_method(bus, "methodname", "busname",
1941
"objectpath", "interface")
1942
expected_method_return = {str(key): str(value)
1944
method_return.items()}
1945
for key, value in ret.items():
1946
self.assertNotIsInstance(key, ObjectPath)
1947
self.assertNotIsInstance(value, ObjectPath)
1948
self.assertEqual(expected_method_return, ret)
1949
self.assertIsInstance(ret, dict)
1951
def test_call_method_filters_objectpaths_in_dict_in_dict(self):
1952
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1954
return method_return
1955
fake_ravel = self.fake_ravel_func(func)
1956
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1958
ObjectPath("key1"): {
1959
ObjectPath("key11"): ObjectPath("value11"),
1960
ObjectPath("key12"): ObjectPath("value12"),
1962
ObjectPath("key2"): {
1963
ObjectPath("key21"): ObjectPath("value21"),
1964
ObjectPath("key22"): ObjectPath("value22"),
1967
ret = self.call_method(bus, "methodname", "busname",
1968
"objectpath", "interface")
1969
expected_method_return = {
1970
"key1": {"key11": "value11",
1971
"key12": "value12"},
1972
"key2": {"key21": "value21",
1973
"key22": "value22"},
1975
self.assertEqual(expected_method_return, ret)
1976
for key, value in ret.items():
1977
self.assertIsInstance(value, dict)
1978
self.assertEqual(expected_method_return[key], value)
1979
self.assertNotIsInstance(key, ObjectPath)
1980
for inner_key, inner_value in value.items():
1981
self.assertIsInstance(value, dict)
1983
expected_method_return[key][inner_key],
1985
self.assertNotIsInstance(key, ObjectPath)
1987
def test_call_method_filters_objectpaths_in_dict_three_deep(self):
1988
ObjectPath = self.dummy_dbussy.DBUS.ObjectPath
1990
return method_return
1991
fake_ravel = self.fake_ravel_func(func)
1992
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
1994
ObjectPath("key1"): {
1995
ObjectPath("key2"): {
1996
ObjectPath("key3"): ObjectPath("value"),
2000
ret = self.call_method(bus, "methodname", "busname",
2001
"objectpath", "interface")
2002
expected_method_return = {"key1": {"key2": {"key3": "value"}}}
2003
self.assertEqual(expected_method_return, ret)
2004
self.assertIsInstance(ret, dict)
2005
self.assertNotIsInstance(next(iter(ret.keys())), ObjectPath)
2006
self.assertIsInstance(ret["key1"], dict)
2007
self.assertNotIsInstance(next(iter(ret["key1"].keys())),
2009
self.assertIsInstance(ret["key1"]["key2"], dict)
2010
self.assertNotIsInstance(
2011
next(iter(ret["key1"]["key2"].keys())),
2013
self.assertEqual("value", ret["key1"]["key2"]["key3"])
2014
self.assertNotIsInstance(ret["key1"]["key2"]["key3"],
2015
self.dummy_dbussy.DBUS.ObjectPath)
2017
def test_call_method_handles_exception(self):
2019
raise self.dummy_dbussy.DBusError()
2021
fake_ravel = self.fake_ravel_func(func)
2022
bus = dbussy_adapter.SystemBus(self.dummy_dbussy, fake_ravel)
2024
with self.assertRaises(dbus.Error) as e:
2025
self.call_method(bus, "methodname", "busname",
2026
"objectpath", "interface")
2028
self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
2030
def test_get_object_converts_to_correct_exception(self):
2031
class fake_ravel_raises_exception_on_connect:
2036
def __getitem__(key):
2037
if key == "objectpath":
2038
raise self.dummy_dbussy.DBusError()
2039
raise Exception(key)
2040
return {"busname": Bus()}
2042
raise self.dummy_dbussy.DBusError()
2043
bus = dbussy_adapter.SystemBus(
2045
fake_ravel_raises_exception_on_connect)
2046
with self.assertRaises(dbus.ConnectFailed):
2047
self.call_method(bus, "methodname", "busname",
2048
"objectpath", "interface")
1519
2051
class Test_commands_from_options(unittest.TestCase):
1521
2053
def setUp(self):
2070
2651
def runTest(self):
2071
2652
if not hasattr(self, "command"):
2072
2653
return # Abstract TestCase class
2073
values_to_get = getattr(self, "values_to_get",
2075
for value_to_set, value_to_get in zip(self.values_to_set,
2077
for clientpath in self.bus.clients:
2078
self.bus.clients[clientpath][self.propname] = Unique()
2079
self.run_command(value_to_set, self.bus.clients)
2080
for clientpath in self.bus.clients:
2081
value = self.bus.clients[clientpath][self.propname]
2655
if hasattr(self, "values_to_set"):
2656
cmd_args = [(value,) for value in self.values_to_set]
2657
values_to_get = getattr(self, "values_to_get",
2660
cmd_args = [() for x in range(len(self.values_to_get))]
2661
values_to_get = self.values_to_get
2662
self.assertTrue(values_to_get)
2663
for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2664
for clientpath in self.bus.clients:
2665
self.bus.clients[clientpath][self.propname] = (
2667
self.command(*cmd_arg).run(self.bus.clients, self.bus)
2668
self.assertTrue(self.bus.clients)
2669
for clientpath in self.bus.clients:
2670
value = (self.bus.clients[clientpath]
2082
2672
self.assertNotIsInstance(value, Unique)
2083
2673
self.assertEqual(value_to_get, value)
2085
def run_command(self, value, clients):
2086
self.command().run(clients, self.bus)
2089
2676
class TestEnableCmd(TestPropertySetterCmd):
2090
2677
command = command.Enable
2091
2678
propname = "Enabled"
2092
values_to_set = [True]
2679
values_to_get = [True]
2095
2682
class TestDisableCmd(TestPropertySetterCmd):
2096
2683
command = command.Disable
2097
2684
propname = "Enabled"
2098
values_to_set = [False]
2685
values_to_get = [False]
2101
2688
class TestBumpTimeoutCmd(TestPropertySetterCmd):
2102
2689
command = command.BumpTimeout
2103
2690
propname = "LastCheckedOK"
2104
values_to_set = [""]
2691
values_to_get = [""]
2107
2694
class TestStartCheckerCmd(TestPropertySetterCmd):
2108
2695
command = command.StartChecker
2109
2696
propname = "CheckerRunning"
2110
values_to_set = [True]
2697
values_to_get = [True]
2113
2700
class TestStopCheckerCmd(TestPropertySetterCmd):
2114
2701
command = command.StopChecker
2115
2702
propname = "CheckerRunning"
2116
values_to_set = [False]
2703
values_to_get = [False]
2119
2706
class TestApproveByDefaultCmd(TestPropertySetterCmd):
2120
2707
command = command.ApproveByDefault
2121
2708
propname = "ApprovedByDefault"
2122
values_to_set = [True]
2709
values_to_get = [True]
2125
2712
class TestDenyByDefaultCmd(TestPropertySetterCmd):
2126
2713
command = command.DenyByDefault
2127
2714
propname = "ApprovedByDefault"
2128
values_to_set = [False]
2131
class TestPropertySetterValueCmd(TestPropertySetterCmd):
2132
"""Abstract class for tests of PropertySetterValueCmd classes"""
2134
def run_command(self, value, clients):
2135
self.command(value).run(clients, self.bus)
2138
class TestSetCheckerCmd(TestPropertySetterValueCmd):
2715
values_to_get = [False]
2718
class TestSetCheckerCmd(TestPropertySetterCmd):
2139
2719
command = command.SetChecker
2140
2720
propname = "Checker"
2141
2721
values_to_set = ["", ":", "fping -q -- %s"]
2144
class TestSetHostCmd(TestPropertySetterValueCmd):
2724
class TestSetHostCmd(TestPropertySetterCmd):
2145
2725
command = command.SetHost
2146
2726
propname = "Host"
2147
2727
values_to_set = ["192.0.2.3", "client.example.org"]
2150
class TestSetSecretCmd(TestPropertySetterValueCmd):
2730
class TestSetSecretCmd(TestPropertySetterCmd):
2151
2731
command = command.SetSecret
2152
2732
propname = "Secret"
2153
values_to_set = [io.BytesIO(b""),
2154
io.BytesIO(b"secret\0xyzzy\nbar")]
2155
values_to_get = [f.getvalue() for f in values_to_set]
2158
class TestSetTimeoutCmd(TestPropertySetterValueCmd):
2733
def __init__(self, *args, **kwargs):
2734
self.values_to_set = [io.BytesIO(b""),
2735
io.BytesIO(b"secret\0xyzzy\nbar")]
2736
self.values_to_get = [f.getvalue() for f in
2738
super(TestSetSecretCmd, self).__init__(*args, **kwargs)
2741
class TestSetTimeoutCmd(TestPropertySetterCmd):
2159
2742
command = command.SetTimeout
2160
2743
propname = "Timeout"
2161
2744
values_to_set = [datetime.timedelta(),
2230
2814
if __name__ == "__main__":
2815
options = parse_test_args()
2232
if should_only_run_tests():
2233
# Call using ./tdd-python-script --check [--verbose]
2818
extra_test_prefix = options.prefix
2819
if extra_test_prefix is not None:
2820
if not (unittest.main(argv=[""], exit=False)
2821
.result.wasSuccessful()):
2823
class ExtraTestLoader(unittest.TestLoader):
2824
testMethodPrefix = extra_test_prefix
2825
# Call using ./scriptname --check [--verbose]
2826
unittest.main(argv=[""], testLoader=ExtraTestLoader())
2828
unittest.main(argv=[""])
2238
2832
logging.shutdown()
2836
# (lambda (&optional extra)
2837
# (if (not (funcall run-tests-in-test-buffer default-directory
2839
# (funcall show-test-buffer-in-test-window)
2840
# (funcall remove-test-window)
2841
# (if extra (message "Extra tests run successfully!"))))
2842
# run-tests-in-test-buffer:
2843
# (lambda (dir &optional extra)
2844
# (with-current-buffer (get-buffer-create "*Test*")
2845
# (setq buffer-read-only nil
2846
# default-directory dir)
2848
# (compilation-mode))
2849
# (let ((process-result
2850
# (let ((inhibit-read-only t))
2851
# (process-file-shell-command
2852
# (funcall get-command-line extra) nil "*Test*"))))
2853
# (and (numberp process-result)
2854
# (= process-result 0))))
2856
# (lambda (&optional extra)
2857
# (let ((quoted-script
2858
# (shell-quote-argument (funcall get-script-name))))
2860
# (concat "%s --check" (if extra " --prefix=atest" ""))
2864
# (if (fboundp 'file-local-name)
2865
# (file-local-name (buffer-file-name))
2866
# (or (file-remote-p (buffer-file-name) 'localname)
2867
# (buffer-file-name))))
2868
# remove-test-window:
2870
# (let ((test-window (get-buffer-window "*Test*")))
2871
# (if test-window (delete-window test-window))))
2872
# show-test-buffer-in-test-window:
2874
# (when (not (get-buffer-window-list "*Test*"))
2875
# (setq next-error-last-buffer (get-buffer "*Test*"))
2876
# (let* ((side (if (>= (window-width) 146) 'right 'bottom))
2877
# (display-buffer-overriding-action
2878
# `((display-buffer-in-side-window) (side . ,side)
2879
# (window-height . fit-window-to-buffer)
2880
# (window-width . fit-window-to-buffer))))
2881
# (display-buffer "*Test*"))))
2884
# (let* ((run-extra-tests (lambda () (interactive)
2885
# (funcall run-tests t)))
2886
# (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
2887
# (outer-keymap `(keymap (3 . ,inner-keymap)))) ; C-c
2888
# (setq minor-mode-overriding-map-alist
2889
# (cons `(run-tests . ,outer-keymap)
2890
# minor-mode-overriding-map-alist)))
2891
# (add-hook 'after-save-hook run-tests 90 t))