=== modified file 'mandos-ctl' --- mandos-ctl 2023-02-08 00:05:18 +0000 +++ mandos-ctl 2024-09-07 23:55:04 +0000 @@ -50,12 +50,15 @@ str = unicode input = raw_input + class gi: """Dummy gi module, for the tests""" class repository: class GLib: class Error(Exception): pass + + dbussy = None ravel = None dbus_python = None @@ -78,7 +81,7 @@ warnings.simplefilter("default") log = logging.getLogger(os.path.basename(sys.argv[0])) -logging.basicConfig(level="INFO", # Show info level messages +logging.basicConfig(level="INFO", # Show info level messages format="%(message)s") # Show basic log messages logging.captureWarnings(True) # Show warnings via the logging system @@ -392,7 +395,7 @@ def parse_pre_1_6_1_interval(interval): - """Parse an interval string as documented by Mandos before 1.6.1, + r"""Parse an interval string as documented by Mandos before 1.6.1, and return a datetime.timedelta >>> parse_pre_1_6_1_interval("7d") == datetime.timedelta(days=7) @@ -410,7 +413,8 @@ >>> parse_pre_1_6_1_interval("") == datetime.timedelta(0) True >>> # Ignore unknown characters, allow any order and repetitions - >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") == datetime.timedelta(2, 480, 18000) + >>> parse_pre_1_6_1_interval("2dxy7zz11y3m5m") \ + ... == datetime.timedelta(2, 480, 18000) True """ @@ -485,12 +489,14 @@ class SystemBus: object_manager_iface = "org.freedesktop.DBus.ObjectManager" + def get_managed_objects(self, busname, objectpath): return self.call_method("GetManagedObjects", busname, objectpath, self.object_manager_iface) properties_iface = "org.freedesktop.DBus.Properties" + def set_property(self, busname, objectpath, interface, key, value): self.call_method("Set", busname, objectpath, @@ -501,7 +507,6 @@ interface, *args): raise NotImplementedError() - class MandosBus(SystemBus): busname_domain = "se.recompile" busname = busname_domain + ".Mandos" @@ -600,6 +605,7 @@ class SilenceLogger: "Simple context manager to silence a particular logger" + def __init__(self, loggername): self.logger = logging.getLogger(loggername) @@ -615,13 +621,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.logger.removeFilter(self.nullfilter) - class CachingBus(SystemBus): """A caching layer for dbus_python_adapter.SystemBus""" + def __init__(self, *args, **kwargs): self.object_cache = {} super(dbus_python_adapter.CachingBus, self).__init__(*args, **kwargs) + def get_object(self, busname, objectpath): try: return self.object_cache[(busname, objectpath)] @@ -629,7 +636,7 @@ new_object = super( dbus_python_adapter.CachingBus, self).get_object(busname, objectpath) - self.object_cache[(busname, objectpath)] = new_object + self.object_cache[(busname, objectpath)] = new_object return new_object @@ -682,17 +689,19 @@ class CachingBus(SystemBus): """A caching layer for pydbus_adapter.SystemBus""" + def __init__(self, *args, **kwargs): self.object_cache = {} super(pydbus_adapter.CachingBus, self).__init__(*args, **kwargs) + def get(self, busname, objectpath): try: return self.object_cache[(busname, objectpath)] except KeyError: new_object = (super(pydbus_adapter.CachingBus, self) .get(busname, objectpath)) - self.object_cache[(busname, objectpath)] = new_object + self.object_cache[(busname, objectpath)] = new_object return new_object @@ -724,7 +733,7 @@ iface = proxy_object.get_interface(interface) method = getattr(iface, methodname) with self.convert_exception(dbus.Error): - value = method(*args) + value = method(*args) # DBussy returns values either as an empty list or as a # list of one element with the return value if value: @@ -771,10 +780,12 @@ class CachingBus(MandosBus): """A caching layer for dbussy_adapter.MandosBus""" + def __init__(self, *args, **kwargs): self.object_cache = {} super(dbussy_adapter.CachingBus, self).__init__(*args, **kwargs) + def get_object(self, busname, objectpath): try: return self.object_cache[(busname, objectpath)] @@ -782,7 +793,7 @@ new_object = super( dbussy_adapter.CachingBus, self).get_object(busname, objectpath) - self.object_cache[(busname, objectpath)] = new_object + self.object_cache[(busname, objectpath)] = new_object return new_object @@ -824,6 +835,7 @@ class Base: """Abstract base class for commands""" + def run(self, clients, bus=None): """Normal commands should implement run_on_one_client(), but commands which want to operate on all clients at the same time can @@ -833,7 +845,6 @@ for client, properties in clients.items(): self.run_on_one_client(client, properties) - class IsEnabled(Base): def run(self, clients, bus=None): properties = next(iter(clients.values())) @@ -841,23 +852,19 @@ sys.exit(0) sys.exit(1) - class Approve(Base): def run_on_one_client(self, client, properties): self.bus.call_client_method(client, "Approve", True) - class Deny(Base): def run_on_one_client(self, client, properties): self.bus.call_client_method(client, "Approve", False) - class Remove(Base): def run(self, clients, bus): for clientpath in frozenset(clients.keys()): bus.call_server_method("RemoveClient", clientpath) - class Output(Base): """Abstract class for commands outputting client details""" all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK", @@ -869,7 +876,6 @@ "Checker", "ExtendedTimeout", "Expires", "LastCheckerStatus") - class DumpJSON(Output): def run(self, clients, bus=None): data = {properties["Name"]: @@ -878,7 +884,6 @@ for properties in clients.values()} print(json.dumps(data, indent=4, separators=(",", ": "))) - class PrintTable(Output): def __init__(self, verbose=False): self.verbose = verbose @@ -924,6 +929,7 @@ if sys.version_info.major == 2: __unicode__ = __str__ + def __str__(self): return str(self).encode( locale.getpreferredencoding()) @@ -975,7 +981,6 @@ minutes=(td.seconds % 3600) // 60, seconds=td.seconds % 60)) - class PropertySetter(Base): "Abstract class for Actions for setting one client property" @@ -988,45 +993,38 @@ def propname(self): raise NotImplementedError() - class Enable(PropertySetter): propname = "Enabled" value_to_set = True - class Disable(PropertySetter): propname = "Enabled" value_to_set = False - class BumpTimeout(PropertySetter): propname = "LastCheckedOK" value_to_set = "" - class StartChecker(PropertySetter): propname = "CheckerRunning" value_to_set = True - class StopChecker(PropertySetter): propname = "CheckerRunning" value_to_set = False - class ApproveByDefault(PropertySetter): propname = "ApprovedByDefault" value_to_set = True - class DenyByDefault(PropertySetter): propname = "ApprovedByDefault" value_to_set = False - class PropertySetterValue(PropertySetter): """Abstract class for PropertySetter recieving a value as constructor argument instead of a class attribute.""" + def __init__(self, value): self.value_to_set = value @@ -1039,11 +1037,9 @@ class SetChecker(PropertySetterValue): propname = "Checker" - class SetHost(PropertySetterValue): propname = "Host" - class SetSecret(PropertySetterValue): propname = "Secret" @@ -1057,7 +1053,6 @@ self._vts = value.read() value.close() - class PropertySetterValueMilliseconds(PropertySetterValue): """Abstract class for PropertySetterValue taking a value argument as a datetime.timedelta() but should store it as @@ -1072,23 +1067,18 @@ "When setting, convert value from a datetime.timedelta" self._vts = int(round(value.total_seconds() * 1000)) - class SetTimeout(PropertySetterValueMilliseconds): propname = "Timeout" - class SetExtendedTimeout(PropertySetterValueMilliseconds): propname = "ExtendedTimeout" - class SetInterval(PropertySetterValueMilliseconds): propname = "Interval" - class SetApprovalDelay(PropertySetterValueMilliseconds): propname = "ApprovalDelay" - class SetApprovalDuration(PropertySetterValueMilliseconds): propname = "ApprovalDuration" @@ -1765,8 +1755,6 @@ self.assertIs(ret, expected_method_return) def test_call_method_handles_exception(self): - dbus_logger = logging.getLogger("dbus.proxies") - def func(): raise gi.repository.GLib.Error()