=== modified file 'Makefile' --- Makefile 2019-02-11 06:31:42 +0000 +++ Makefile 2019-04-09 19:33:36 +0000 @@ -425,6 +425,8 @@ $(INITRAMFSTOOLS)/hooks/mandos install --mode=u=rw,go=r initramfs-tools-conf \ $(INITRAMFSTOOLS)/conf.d/mandos-conf + install --mode=u=rw,go=r initramfs-tools-conf-hook \ + $(INITRAMFSTOOLS)/conf-hooks.d/zz-mandos install initramfs-tools-script \ $(INITRAMFSTOOLS)/scripts/init-premount/mandos install initramfs-tools-script-stop \ === modified file 'TODO' --- TODO 2019-02-13 08:47:02 +0000 +++ TODO 2019-04-09 19:41:53 +0000 @@ -86,7 +86,6 @@ *** TODO [#C] In Python 3.3, use shlex.quote() instead of re.escape() * mandos-ctl -*** Handle "no D-Bus server" and/or "no Mandos server found" better ** TODO Remove old string_to_delta format :2: * TODO mandos-dispatch === modified file 'debian/mandos-client.dirs' --- debian/mandos-client.dirs 2018-08-19 14:32:00 +0000 +++ debian/mandos-client.dirs 2019-04-09 19:33:36 +0000 @@ -2,5 +2,6 @@ usr/sbin usr/share/initramfs-tools/hooks usr/share/initramfs-tools/conf.d +usr/share/initramfs-tools/conf-hooks.d usr/share/initramfs-tools/scripts/init-premount usr/share/initramfs-tools/scripts/local-premount === added file 'initramfs-tools-conf-hook' --- initramfs-tools-conf-hook 1970-01-01 00:00:00 +0000 +++ initramfs-tools-conf-hook 2019-04-09 19:33:36 +0000 @@ -0,0 +1,14 @@ +# -*- shell-script -*- + +# The UMASK is set by the file "initramfs-tools-conf" (which is copied +# to /usr/share/initramfs-tools/conf.d/mandos-conf on installation) +# since there, as described therein, is the proper place to do that. +# However, it is possible for other packages to override the UMASK in +# any file in /usr/share/initramfs-tools/conf-hooks.d. Therefore, +# this file ("initramfs-tools-conf-hook") will be installed as +# "zz-mandos" in that directory to make sure UMASK is set correctly. + +# For more information on the effects of setting UMASK, see the +# aforementioned /usr/share/initramfs-tools/conf.d/mandos-conf file. + +UMASK=0027 === modified file 'intro.xml' --- intro.xml 2019-02-10 04:20:26 +0000 +++ intro.xml 2019-03-30 17:02:33 +0000 @@ -1,7 +1,7 @@ + %common; ]> @@ -144,7 +144,7 @@ long, and will no longer give out the encrypted key. The timing here is the only real weak point, and the method, frequency and timeout of the server’s checking can be adjusted to any desired - level of paranoia + level of paranoia. (The encrypted keys on the Mandos server is on its normal file === modified file 'mandos' --- mandos 2019-02-11 06:31:42 +0000 +++ mandos 2019-03-23 14:59:54 +0000 @@ -275,9 +275,8 @@ # Pretend that we have an Avahi module -class Avahi(object): - """This isn't so much a class as it is a module-like namespace. - It is instantiated once, and simulates having an Avahi module.""" +class avahi(object): + """This isn't so much a class as it is a module-like namespace.""" IF_UNSPEC = -1 # avahi-common/address.h PROTO_UNSPEC = -1 # avahi-common/address.h PROTO_INET = 0 # avahi-common/address.h @@ -287,7 +286,8 @@ DBUS_INTERFACE_SERVER = DBUS_NAME + ".Server" DBUS_PATH_SERVER = "/" - def string_array_to_txt_array(self, t): + @staticmethod + def string_array_to_txt_array(t): return dbus.Array((dbus.ByteArray(s.encode("utf-8")) for s in t), signature="ay") ENTRY_GROUP_ESTABLISHED = 2 # avahi-common/defs.h @@ -298,7 +298,6 @@ SERVER_RUNNING = 2 # avahi-common/defs.h SERVER_COLLISION = 3 # avahi-common/defs.h SERVER_FAILURE = 4 # avahi-common/defs.h -avahi = Avahi() class AvahiError(Exception): @@ -504,24 +503,14 @@ # Pretend that we have a GnuTLS module -class GnuTLS(object): - """This isn't so much a class as it is a module-like namespace. - It is instantiated once, and simulates having a GnuTLS module.""" +class gnutls(object): + """This isn't so much a class as it is a module-like namespace.""" library = ctypes.util.find_library("gnutls") if library is None: library = ctypes.util.find_library("gnutls-deb0") _library = ctypes.cdll.LoadLibrary(library) del library - _need_version = b"3.3.0" - _tls_rawpk_version = b"3.6.6" - - def __init__(self): - # Need to use "self" here, since this method is called before - # the assignment to the "gnutls" global variable happens. - if self.check_version(self._need_version) is None: - raise self.Error("Needs GnuTLS {} or later" - .format(self._need_version)) # Unless otherwise indicated, the constants and types below are # all from the gnutls/gnutls.h C header file. @@ -569,18 +558,14 @@ # Exceptions class Error(Exception): - # We need to use the class name "GnuTLS" here, since this - # exception might be raised from within GnuTLS.__init__, - # which is called before the assignment to the "gnutls" - # global variable has happened. def __init__(self, message=None, code=None, args=()): # Default usage is by a message string, but if a return # code is passed, convert it to a string with # gnutls.strerror() self.code = code if message is None and code is not None: - message = GnuTLS.strerror(code) - return super(GnuTLS.Error, self).__init__( + message = gnutls.strerror(code) + return super(gnutls.Error, self).__init__( message, *args) class CertificateSecurityError(Error): @@ -744,6 +729,12 @@ check_version.argtypes = [ctypes.c_char_p] check_version.restype = ctypes.c_char_p + _need_version = b"3.3.0" + if check_version(_need_version) is None: + raise self.Error("Needs GnuTLS {} or later" + .format(_need_version)) + + _tls_rawpk_version = b"3.6.6" has_rawpk = bool(check_version(_tls_rawpk_version)) if has_rawpk: @@ -810,8 +801,6 @@ # Remove non-public functions del _error_code, _retry_on_error -# Create the global "gnutls" object, simulating a module -gnutls = GnuTLS() def call_pipe(connection, # : multiprocessing.Connection === modified file 'mandos-ctl' --- mandos-ctl 2019-02-11 06:31:42 +0000 +++ mandos-ctl 2019-04-09 13:50:57 +0000 @@ -1,5 +1,5 @@ #!/usr/bin/python -# -*- mode: python; coding: utf-8 -*- +# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*- # # Mandos Monitor - Control and monitor the Mandos server # @@ -40,58 +40,211 @@ import os import collections import json - -import dbus +import unittest +import logging +import io +import tempfile +import contextlib + +try: + import pydbus + import gi + dbus_python = None +except ImportError: + import dbus as dbus_python + pydbus = None + class gi(object): + """Dummy gi module, for the tests""" + class repository(object): + class GLib(object): + class Error(Exception): + pass + +# Show warnings by default +if not sys.warnoptions: + import warnings + warnings.simplefilter("default") + +log = logging.getLogger(sys.argv[0]) +logging.basicConfig(level="INFO", # Show info level messages + format="%(message)s") # Show basic log messages + +logging.captureWarnings(True) # Show warnings via the logging system if sys.version_info.major == 2: str = unicode + import StringIO + io.StringIO = StringIO.StringIO locale.setlocale(locale.LC_ALL, "") -tablewords = { - "Name": "Name", - "Enabled": "Enabled", - "Timeout": "Timeout", - "LastCheckedOK": "Last Successful Check", - "LastApprovalRequest": "Last Approval Request", - "Created": "Created", - "Interval": "Interval", - "Host": "Host", - "Fingerprint": "Fingerprint", - "KeyID": "Key ID", - "CheckerRunning": "Check Is Running", - "LastEnabled": "Last Enabled", - "ApprovalPending": "Approval Is Pending", - "ApprovedByDefault": "Approved By Default", - "ApprovalDelay": "Approval Delay", - "ApprovalDuration": "Approval Duration", - "Checker": "Checker", - "ExtendedTimeout": "Extended Timeout", - "Expires": "Expires", - "LastCheckerStatus": "Last Checker Status", -} -defaultkeywords = ("Name", "Enabled", "Timeout", "LastCheckedOK") -domain = "se.recompile" -busname = domain + ".Mandos" -server_path = "/" -server_interface = domain + ".Mandos" -client_interface = domain + ".Mandos.Client" version = "1.8.3" -try: - dbus.OBJECT_MANAGER_IFACE -except AttributeError: - dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager" - - -def milliseconds_to_string(ms): - td = datetime.timedelta(0, 0, 0, ms) - return ("{days}{hours:02}:{minutes:02}:{seconds:02}" - .format(days="{}T".format(td.days) if td.days else "", - hours=td.seconds // 3600, - minutes=(td.seconds % 3600) // 60, - seconds=td.seconds % 60)) +def main(): + parser = argparse.ArgumentParser() + add_command_line_options(parser) + + options = parser.parse_args() + check_option_syntax(parser, options) + + clientnames = options.client + + if options.debug: + log.setLevel(logging.DEBUG) + + if pydbus is not None: + bus = pydbus_adapter.CachingBus(pydbus) + else: + bus = dbus_python_adapter.CachingBus(dbus_python) + + try: + all_clients = bus.get_clients_and_properties() + except dbus.ConnectFailed as e: + log.critical("Could not connect to Mandos server: %s", e) + sys.exit(1) + except dbus.Error as e: + log.critical( + "Failed to access Mandos server through D-Bus:\n%s", e) + sys.exit(1) + + # Compile dict of (clientpath: properties) to process + if not clientnames: + clients = all_clients + else: + clients = {} + for name in clientnames: + for objpath, properties in all_clients.items(): + if properties["Name"] == name: + clients[objpath] = properties + break + else: + log.critical("Client not found on server: %r", name) + sys.exit(1) + + commands = commands_from_options(options) + + for command in commands: + command.run(clients, bus) + + +def add_command_line_options(parser): + parser.add_argument("--version", action="version", + version="%(prog)s {}".format(version), + help="show version number and exit") + parser.add_argument("-a", "--all", action="store_true", + help="Select all clients") + parser.add_argument("-v", "--verbose", action="store_true", + help="Print all fields") + parser.add_argument("-j", "--dump-json", dest="commands", + action="append_const", default=[], + const=command.DumpJSON(), + help="Dump client data in JSON format") + enable_disable = parser.add_mutually_exclusive_group() + enable_disable.add_argument("-e", "--enable", dest="commands", + action="append_const", default=[], + const=command.Enable(), + help="Enable client") + enable_disable.add_argument("-d", "--disable", dest="commands", + action="append_const", default=[], + const=command.Disable(), + help="disable client") + parser.add_argument("-b", "--bump-timeout", dest="commands", + action="append_const", default=[], + const=command.BumpTimeout(), + help="Bump timeout for client") + start_stop_checker = parser.add_mutually_exclusive_group() + start_stop_checker.add_argument("--start-checker", + dest="commands", + action="append_const", default=[], + const=command.StartChecker(), + help="Start checker for client") + start_stop_checker.add_argument("--stop-checker", dest="commands", + action="append_const", default=[], + const=command.StopChecker(), + help="Stop checker for client") + parser.add_argument("-V", "--is-enabled", dest="commands", + action="append_const", default=[], + const=command.IsEnabled(), + help="Check if client is enabled") + parser.add_argument("-r", "--remove", dest="commands", + action="append_const", default=[], + const=command.Remove(), + help="Remove client") + parser.add_argument("-c", "--checker", dest="commands", + action="append", default=[], + metavar="COMMAND", type=command.SetChecker, + help="Set checker command for client") + parser.add_argument( + "-t", "--timeout", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetTimeout.argparse(string_to_delta), + help="Set timeout for client") + parser.add_argument( + "--extended-timeout", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetExtendedTimeout.argparse(string_to_delta), + help="Set extended timeout for client") + parser.add_argument( + "-i", "--interval", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetInterval.argparse(string_to_delta), + help="Set checker interval for client") + approve_deny_default = parser.add_mutually_exclusive_group() + approve_deny_default.add_argument( + "--approve-by-default", dest="commands", + action="append_const", default=[], + const=command.ApproveByDefault(), + help="Set client to be approved by default") + approve_deny_default.add_argument( + "--deny-by-default", dest="commands", + action="append_const", default=[], + const=command.DenyByDefault(), + help="Set client to be denied by default") + parser.add_argument( + "--approval-delay", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetApprovalDelay.argparse(string_to_delta), + help="Set delay before client approve/deny") + parser.add_argument( + "--approval-duration", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetApprovalDuration.argparse(string_to_delta), + help="Set duration of one client approval") + parser.add_argument("-H", "--host", dest="commands", + action="append", default=[], metavar="STRING", + type=command.SetHost, + help="Set host for client") + parser.add_argument( + "-s", "--secret", dest="commands", action="append", + default=[], metavar="FILENAME", + type=command.SetSecret.argparse(argparse.FileType(mode="rb")), + help="Set password blob (file) for client") + approve_deny = parser.add_mutually_exclusive_group() + approve_deny.add_argument( + "-A", "--approve", dest="commands", action="append_const", + default=[], const=command.Approve(), + help="Approve any current client request") + approve_deny.add_argument("-D", "--deny", dest="commands", + action="append_const", default=[], + const=command.Deny(), + help="Deny any current client request") + parser.add_argument("--debug", action="store_true", + help="Debug mode (show D-Bus commands)") + parser.add_argument("--check", action="store_true", + help="Run self-test") + parser.add_argument("client", nargs="*", help="Client name") + + +def string_to_delta(interval): + """Parse a string and return a datetime.timedelta""" + + try: + return rfc3339_duration_to_delta(interval) + except ValueError as e: + log.warning("%s - Parsing as pre-1.6.1 interval instead", + ' '.join(e.args)) + return parse_pre_1_6_1_interval(interval) def rfc3339_duration_to_delta(duration): @@ -103,6 +256,8 @@ datetime.timedelta(0, 60) >>> rfc3339_duration_to_delta("PT60M") datetime.timedelta(0, 3600) + >>> rfc3339_duration_to_delta("P60M") + datetime.timedelta(1680) >>> rfc3339_duration_to_delta("PT24H") datetime.timedelta(1) >>> rfc3339_duration_to_delta("P1W") @@ -111,6 +266,35 @@ datetime.timedelta(0, 330) >>> rfc3339_duration_to_delta("P1DT3M20S") datetime.timedelta(1, 200) + >>> # Can not be empty: + >>> rfc3339_duration_to_delta("") + Traceback (most recent call last): + ... + ValueError: Invalid RFC 3339 duration: "" + >>> # Must start with "P": + >>> rfc3339_duration_to_delta("1D") + Traceback (most recent call last): + ... + ValueError: Invalid RFC 3339 duration: "1D" + >>> # Must use correct order + >>> rfc3339_duration_to_delta("PT1S2M") + Traceback (most recent call last): + ... + ValueError: Invalid RFC 3339 duration: "PT1S2M" + >>> # Time needs time marker + >>> rfc3339_duration_to_delta("P1H2S") + Traceback (most recent call last): + ... + ValueError: Invalid RFC 3339 duration: "P1H2S" + >>> # Weeks can not be combined with anything else + >>> rfc3339_duration_to_delta("P1D2W") + Traceback (most recent call last): + ... + ValueError: Invalid RFC 3339 duration: "P1D2W" + >>> rfc3339_duration_to_delta("P2W2H") + Traceback (most recent call last): + ... + ValueError: Invalid RFC 3339 duration: "P2W2H" """ # Parsing an RFC 3339 duration with regular expressions is not @@ -187,34 +371,36 @@ break else: # No currently valid tokens were found - raise ValueError("Invalid RFC 3339 duration: {!r}" + raise ValueError("Invalid RFC 3339 duration: \"{}\"" .format(duration)) # End token found return value -def string_to_delta(interval): - """Parse a string and return a datetime.timedelta +def parse_pre_1_6_1_interval(interval): + """Parse an interval string as documented by Mandos before 1.6.1, + and return a datetime.timedelta - >>> string_to_delta('7d') + >>> parse_pre_1_6_1_interval('7d') datetime.timedelta(7) - >>> string_to_delta('60s') + >>> parse_pre_1_6_1_interval('60s') datetime.timedelta(0, 60) - >>> string_to_delta('60m') + >>> parse_pre_1_6_1_interval('60m') datetime.timedelta(0, 3600) - >>> string_to_delta('24h') + >>> parse_pre_1_6_1_interval('24h') datetime.timedelta(1) - >>> string_to_delta('1w') + >>> parse_pre_1_6_1_interval('1w') datetime.timedelta(7) - >>> string_to_delta('5m 30s') + >>> parse_pre_1_6_1_interval('5m 30s') datetime.timedelta(0, 330) + >>> parse_pre_1_6_1_interval('') + datetime.timedelta(0) + >>> # Ignore unknown characters, allow any order and repetitions + >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m') + datetime.timedelta(2, 480, 18000) + """ - try: - return rfc3339_duration_to_delta(interval) - except ValueError: - pass - value = datetime.timedelta(0) regexp = re.compile(r"(\d+)([dsmhw]?)") @@ -234,177 +420,339 @@ return value -def print_clients(clients, keywords): - def valuetostring(value, keyword): - if type(value) is dbus.Boolean: - return "Yes" if value else "No" - if keyword in ("Timeout", "Interval", "ApprovalDelay", - "ApprovalDuration", "ExtendedTimeout"): - return milliseconds_to_string(value) - return str(value) - - # Create format string to print table rows - format_string = " ".join("{{{key}:{width}}}".format( - width=max(len(tablewords[key]), - max(len(valuetostring(client[key], key)) - for client in clients)), - key=key) - for key in keywords) - # Print header line - print(format_string.format(**tablewords)) - for client in clients: - print(format_string - .format(**{key: valuetostring(client[key], key) - for key in keywords})) - - -def has_actions(options): - return any((options.enable, - options.disable, - options.bump_timeout, - options.start_checker, - options.stop_checker, - options.is_enabled, - options.remove, - options.checker is not None, - options.timeout is not None, - options.extended_timeout is not None, - options.interval is not None, - options.approved_by_default is not None, - options.approval_delay is not None, - options.approval_duration is not None, - options.host is not None, - options.secret is not None, - options.approve, - options.deny)) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--version", action="version", - version="%(prog)s {}".format(version), - help="show version number and exit") - parser.add_argument("-a", "--all", action="store_true", - help="Select all clients") - parser.add_argument("-v", "--verbose", action="store_true", - help="Print all fields") - parser.add_argument("-j", "--dump-json", action="store_true", - help="Dump client data in JSON format") - parser.add_argument("-e", "--enable", action="store_true", - help="Enable client") - parser.add_argument("-d", "--disable", action="store_true", - help="disable client") - parser.add_argument("-b", "--bump-timeout", action="store_true", - help="Bump timeout for client") - parser.add_argument("--start-checker", action="store_true", - help="Start checker for client") - parser.add_argument("--stop-checker", action="store_true", - help="Stop checker for client") - parser.add_argument("-V", "--is-enabled", action="store_true", - help="Check if client is enabled") - parser.add_argument("-r", "--remove", action="store_true", - help="Remove client") - parser.add_argument("-c", "--checker", - help="Set checker command for client") - parser.add_argument("-t", "--timeout", - help="Set timeout for client") - parser.add_argument("--extended-timeout", - help="Set extended timeout for client") - parser.add_argument("-i", "--interval", - help="Set checker interval for client") - parser.add_argument("--approve-by-default", action="store_true", - default=None, dest="approved_by_default", - help="Set client to be approved by default") - parser.add_argument("--deny-by-default", action="store_false", - dest="approved_by_default", - help="Set client to be denied by default") - parser.add_argument("--approval-delay", - help="Set delay before client approve/deny") - parser.add_argument("--approval-duration", - help="Set duration of one client approval") - parser.add_argument("-H", "--host", help="Set host for client") - parser.add_argument("-s", "--secret", - type=argparse.FileType(mode="rb"), - help="Set password blob (file) for client") - parser.add_argument("-A", "--approve", action="store_true", - help="Approve any current client request") - parser.add_argument("-D", "--deny", action="store_true", - help="Deny any current client request") - parser.add_argument("--check", action="store_true", - help="Run self-test") - parser.add_argument("client", nargs="*", help="Client name") - options = parser.parse_args() - - if has_actions(options) and not (options.client or options.all): +def check_option_syntax(parser, options): + """Apply additional restrictions on options, not expressible in +argparse""" + + def has_commands(options, commands=None): + if commands is None: + commands = (command.Enable, + command.Disable, + command.BumpTimeout, + command.StartChecker, + command.StopChecker, + command.IsEnabled, + command.Remove, + command.SetChecker, + command.SetTimeout, + command.SetExtendedTimeout, + command.SetInterval, + command.ApproveByDefault, + command.DenyByDefault, + command.SetApprovalDelay, + command.SetApprovalDuration, + command.SetHost, + command.SetSecret, + command.Approve, + command.Deny) + return any(isinstance(cmd, commands) + for cmd in options.commands) + + if has_commands(options) and not (options.client or options.all): parser.error("Options require clients names or --all.") - if options.verbose and has_actions(options): + if options.verbose and has_commands(options): parser.error("--verbose can only be used alone.") - if options.dump_json and (options.verbose - or has_actions(options)): + if (has_commands(options, (command.DumpJSON,)) + and (options.verbose or len(options.commands) > 1)): parser.error("--dump-json can only be used alone.") - if options.all and not has_actions(options): + if options.all and not has_commands(options): parser.error("--all requires an action.") - - if options.check: - import doctest - fail_count, test_count = doctest.testmod() - sys.exit(os.EX_OK if fail_count == 0 else 1) - - try: - bus = dbus.SystemBus() - mandos_dbus_objc = bus.get_object(busname, server_path) - except dbus.exceptions.DBusException: - print("Could not connect to Mandos server", file=sys.stderr) - sys.exit(1) - - mandos_serv = dbus.Interface(mandos_dbus_objc, - dbus_interface=server_interface) - mandos_serv_object_manager = dbus.Interface( - mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE) - - # block stderr since dbus library prints to stderr - null = os.open(os.path.devnull, os.O_RDWR) - stderrcopy = os.dup(sys.stderr.fileno()) - os.dup2(null, sys.stderr.fileno()) - os.close(null) - try: - try: - mandos_clients = {path: ifs_and_props[client_interface] - for path, ifs_and_props in - mandos_serv_object_manager - .GetManagedObjects().items() - if client_interface in ifs_and_props} - finally: - # restore stderr - os.dup2(stderrcopy, sys.stderr.fileno()) - os.close(stderrcopy) - except dbus.exceptions.DBusException as e: - print("Access denied: " - "Accessing mandos server through D-Bus: {}".format(e), - file=sys.stderr) - sys.exit(1) - - # Compile dict of (clients: properties) to process - clients = {} - - if options.all or not options.client: - clients = {bus.get_object(busname, path): properties - for path, properties in mandos_clients.items()} - else: - for name in options.client: - for path, client in mandos_clients.items(): - if client["Name"] == name: - client_objc = bus.get_object(busname, path) - clients[client_objc] = client - break - else: - print("Client not found on server: {!r}" - .format(name), file=sys.stderr) - sys.exit(1) - - if not has_actions(options) and clients: - if options.verbose or options.dump_json: - keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK", + if (has_commands(options, (command.IsEnabled,)) + and len(options.client) > 1): + parser.error("--is-enabled requires exactly one client") + if (len(options.commands) > 1 + and has_commands(options, (command.Remove,)) + and not has_commands(options, (command.Deny,))): + parser.error("--remove can only be combined with --deny") + + +class dbus(object): + + class SystemBus(object): + + object_manager_iface = "org.freedesktop.DBus.ObjectManager" + def get_managed_objects(self, busname, objectpath): + return self.call_method("GetManagedObjects", busname, + objectpath, + self.object_manager_iface) + + properties_iface = "org.freedesktop.DBus.Properties" + def set_property(self, busname, objectpath, interface, key, + value): + self.call_method("Set", busname, objectpath, + self.properties_iface, interface, key, + value) + + + class MandosBus(SystemBus): + busname_domain = "se.recompile" + busname = busname_domain + ".Mandos" + server_path = "/" + server_interface = busname_domain + ".Mandos" + client_interface = busname_domain + ".Mandos.Client" + del busname_domain + + def get_clients_and_properties(self): + managed_objects = self.get_managed_objects( + self.busname, self.server_path) + return {objpath: properties[self.client_interface] + for objpath, properties in managed_objects.items() + if self.client_interface in properties} + + def set_client_property(self, objectpath, key, value): + return self.set_property(self.busname, objectpath, + self.client_interface, key, + value) + + def call_client_method(self, objectpath, method, *args): + return self.call_method(method, self.busname, objectpath, + self.client_interface, *args) + + def call_server_method(self, method, *args): + return self.call_method(method, self.busname, + self.server_path, + self.server_interface, *args) + + class Error(Exception): + pass + + class ConnectFailed(Error): + pass + + +class dbus_python_adapter(object): + + class SystemBus(dbus.MandosBus): + """Use dbus-python""" + + def __init__(self, module=dbus_python): + self.dbus_python = module + self.bus = self.dbus_python.SystemBus() + + @contextlib.contextmanager + def convert_exception(self, exception_class=dbus.Error): + try: + yield + except self.dbus_python.exceptions.DBusException as e: + # This does what "raise from" would do + exc = exception_class(*e.args) + exc.__cause__ = e + raise exc + + def call_method(self, methodname, busname, objectpath, + interface, *args): + proxy_object = self.get_object(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath, + interface, methodname, + ", ".join(repr(a) for a in args)) + method = getattr(proxy_object, methodname) + with self.convert_exception(): + with dbus_python_adapter.SilenceLogger( + "dbus.proxies"): + value = method(*args, dbus_interface=interface) + return self.type_filter(value) + + def get_object(self, busname, objectpath): + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + busname, objectpath) + with self.convert_exception(dbus.ConnectFailed): + return self.bus.get_object(busname, objectpath) + + def type_filter(self, value): + """Convert the most bothersome types to Python types""" + if isinstance(value, self.dbus_python.Boolean): + return bool(value) + if isinstance(value, self.dbus_python.ObjectPath): + return str(value) + # Also recurse into dictionaries + if isinstance(value, self.dbus_python.Dictionary): + return {self.type_filter(key): + self.type_filter(subval) + for key, subval in value.items()} + return value + + def set_client_property(self, objectpath, key, value): + if key == "Secret": + if not isinstance(value, bytes): + value = value.encode("utf-8") + value = self.dbus_python.ByteArray(value) + return self.set_property(self.busname, objectpath, + self.client_interface, key, + value) + + class SilenceLogger(object): + "Simple context manager to silence a particular logger" + def __init__(self, loggername): + self.logger = logging.getLogger(loggername) + + def __enter__(self): + self.logger.addFilter(self.nullfilter) + + class NullFilter(logging.Filter): + def filter(self, record): + return False + + nullfilter = NullFilter() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.logger.removeFilter(self.nullfilter) + + + class CachingBus(SystemBus): + """A caching layer for dbus_python_adapter.SystemBus""" + def __init__(self, *args, **kwargs): + self.object_cache = {} + super(dbus_python_adapter.CachingBus, + self).__init__(*args, **kwargs) + def get_object(self, busname, objectpath): + try: + return self.object_cache[(busname, objectpath)] + except KeyError: + new_object = super( + dbus_python_adapter.CachingBus, + self).get_object(busname, objectpath) + self.object_cache[(busname, objectpath)] = new_object + return new_object + + +class pydbus_adapter(object): + class SystemBus(dbus.MandosBus): + def __init__(self, module=pydbus): + self.pydbus = module + self.bus = self.pydbus.SystemBus() + + @contextlib.contextmanager + def convert_exception(self, exception_class=dbus.Error): + try: + yield + except gi.repository.GLib.Error as e: + # This does what "raise from" would do + exc = exception_class(*e.args) + exc.__cause__ = e + raise exc + + def call_method(self, methodname, busname, objectpath, + interface, *args): + proxy_object = self.get(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.%s(%s)", busname, objectpath, + interface, methodname, + ", ".join(repr(a) for a in args)) + method = getattr(proxy_object[interface], methodname) + with self.convert_exception(): + return method(*args) + + def get(self, busname, objectpath): + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + busname, objectpath) + with self.convert_exception(dbus.ConnectFailed): + if sys.version_info.major <= 2: + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", "", DeprecationWarning, + r"^xml\.etree\.ElementTree$") + return self.bus.get(busname, objectpath) + else: + return self.bus.get(busname, objectpath) + + def set_property(self, busname, objectpath, interface, key, + value): + proxy_object = self.get(busname, objectpath) + log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname, + objectpath, self.properties_iface, interface, + key, value) + setattr(proxy_object[interface], key, value) + + class CachingBus(SystemBus): + """A caching layer for pydbus_adapter.SystemBus""" + def __init__(self, *args, **kwargs): + self.object_cache = {} + super(pydbus_adapter.CachingBus, + self).__init__(*args, **kwargs) + def get(self, busname, objectpath): + try: + return self.object_cache[(busname, objectpath)] + except KeyError: + new_object = (super(pydbus_adapter.CachingBus, self) + .get(busname, objectpath)) + self.object_cache[(busname, objectpath)] = new_object + return new_object + + +def commands_from_options(options): + + commands = list(options.commands) + + def find_cmd(cmd, commands): + i = 0 + for i, c in enumerate(commands): + if isinstance(c, cmd): + return i + return i+1 + + # If command.Remove is present, move any instances of command.Deny + # to occur ahead of command.Remove. + index_of_remove = find_cmd(command.Remove, commands) + before_remove = commands[:index_of_remove] + after_remove = commands[index_of_remove:] + cleaned_after = [] + for cmd in after_remove: + if isinstance(cmd, command.Deny): + before_remove.append(cmd) + else: + cleaned_after.append(cmd) + if cleaned_after != after_remove: + commands = before_remove + cleaned_after + + # If no command option has been given, show table of clients, + # optionally verbosely + if not commands: + commands.append(command.PrintTable(verbose=options.verbose)) + + return commands + + +class command(object): + """A namespace for command classes""" + + class Base(object): + """Abstract base class for commands""" + def run(self, clients, bus=None): + """Normal commands should implement run_on_one_client(), +but commands which want to operate on all clients at the same time can +override this run() method instead. +""" + self.bus = bus + for client, properties in clients.items(): + self.run_on_one_client(client, properties) + + + class IsEnabled(Base): + def run(self, clients, bus=None): + properties = next(iter(clients.values())) + if properties["Enabled"]: + sys.exit(0) + sys.exit(1) + + + class Approve(Base): + def run_on_one_client(self, client, properties): + self.bus.call_client_method(client, "Approve", True) + + + class Deny(Base): + def run_on_one_client(self, client, properties): + self.bus.call_client_method(client, "Approve", False) + + + class Remove(Base): + def run(self, clients, bus): + for clientpath in frozenset(clients.keys()): + bus.call_server_method("RemoveClient", clientpath) + + + class Output(Base): + """Abstract class for commands outputting client details""" + all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK", "Created", "Interval", "Host", "KeyID", "Fingerprint", "CheckerRunning", "LastEnabled", "ApprovalPending", @@ -412,87 +760,1730 @@ "ApprovalDelay", "ApprovalDuration", "Checker", "ExtendedTimeout", "Expires", "LastCheckerStatus") - else: - keywords = defaultkeywords - - if options.dump_json: - json.dump({client["Name"]: {key: - bool(client[key]) - if isinstance(client[key], - dbus.Boolean) - else client[key] - for key in keywords} - for client in clients.values()}, - fp=sys.stdout, indent=4, - separators=(',', ': ')) - print() - else: - print_clients(clients.values(), keywords) - else: - # Process each client in the list by all selected options - for client in clients: - - def set_client_prop(prop, value): - """Set a Client D-Bus property""" - client.Set(client_interface, prop, value, - dbus_interface=dbus.PROPERTIES_IFACE) - - def set_client_prop_ms(prop, value): - """Set a Client D-Bus property, converted - from a string to milliseconds.""" - set_client_prop(prop, - string_to_delta(value).total_seconds() - * 1000) - - if options.remove: - mandos_serv.RemoveClient(client.__dbus_object_path__) - if options.enable: - set_client_prop("Enabled", dbus.Boolean(True)) - if options.disable: - set_client_prop("Enabled", dbus.Boolean(False)) - if options.bump_timeout: - set_client_prop("LastCheckedOK", "") - if options.start_checker: - set_client_prop("CheckerRunning", dbus.Boolean(True)) - if options.stop_checker: - set_client_prop("CheckerRunning", dbus.Boolean(False)) - if options.is_enabled: - if client.Get(client_interface, "Enabled", - dbus_interface=dbus.PROPERTIES_IFACE): - sys.exit(0) - else: - sys.exit(1) - if options.checker is not None: - set_client_prop("Checker", options.checker) - if options.host is not None: - set_client_prop("Host", options.host) - if options.interval is not None: - set_client_prop_ms("Interval", options.interval) - if options.approval_delay is not None: - set_client_prop_ms("ApprovalDelay", - options.approval_delay) - if options.approval_duration is not None: - set_client_prop_ms("ApprovalDuration", - options.approval_duration) - if options.timeout is not None: - set_client_prop_ms("Timeout", options.timeout) - if options.extended_timeout is not None: - set_client_prop_ms("ExtendedTimeout", - options.extended_timeout) - if options.secret is not None: - set_client_prop("Secret", - dbus.ByteArray(options.secret.read())) - if options.approved_by_default is not None: - set_client_prop("ApprovedByDefault", - dbus.Boolean(options - .approved_by_default)) - if options.approve: - client.Approve(dbus.Boolean(True), - dbus_interface=client_interface) - elif options.deny: - client.Approve(dbus.Boolean(False), - dbus_interface=client_interface) - + + + class DumpJSON(Output): + def run(self, clients, bus=None): + data = {properties["Name"]: + {key: properties[key] + for key in self.all_keywords} + for properties in clients.values()} + print(json.dumps(data, indent=4, separators=(',', ': '))) + + + class PrintTable(Output): + def __init__(self, verbose=False): + self.verbose = verbose + + def run(self, clients, bus=None): + default_keywords = ("Name", "Enabled", "Timeout", + "LastCheckedOK") + keywords = default_keywords + if self.verbose: + keywords = self.all_keywords + print(self.TableOfClients(clients.values(), keywords)) + + class TableOfClients(object): + tableheaders = { + "Name": "Name", + "Enabled": "Enabled", + "Timeout": "Timeout", + "LastCheckedOK": "Last Successful Check", + "LastApprovalRequest": "Last Approval Request", + "Created": "Created", + "Interval": "Interval", + "Host": "Host", + "Fingerprint": "Fingerprint", + "KeyID": "Key ID", + "CheckerRunning": "Check Is Running", + "LastEnabled": "Last Enabled", + "ApprovalPending": "Approval Is Pending", + "ApprovedByDefault": "Approved By Default", + "ApprovalDelay": "Approval Delay", + "ApprovalDuration": "Approval Duration", + "Checker": "Checker", + "ExtendedTimeout": "Extended Timeout", + "Expires": "Expires", + "LastCheckerStatus": "Last Checker Status", + } + + def __init__(self, clients, keywords): + self.clients = clients + self.keywords = keywords + + def __str__(self): + return "\n".join(self.rows()) + + if sys.version_info.major == 2: + __unicode__ = __str__ + def __str__(self): + return str(self).encode( + locale.getpreferredencoding()) + + def rows(self): + format_string = self.row_formatting_string() + rows = [self.header_line(format_string)] + rows.extend(self.client_line(client, format_string) + for client in self.clients) + return rows + + def row_formatting_string(self): + "Format string used to format table rows" + return " ".join("{{{key}:{width}}}".format( + width=max(len(self.tableheaders[key]), + *(len(self.string_from_client(client, + key)) + for client in self.clients)), + key=key) + for key in self.keywords) + + def string_from_client(self, client, key): + return self.valuetostring(client[key], key) + + @classmethod + def valuetostring(cls, value, keyword): + if isinstance(value, bool): + return "Yes" if value else "No" + if keyword in ("Timeout", "Interval", "ApprovalDelay", + "ApprovalDuration", "ExtendedTimeout"): + return cls.milliseconds_to_string(value) + return str(value) + + def header_line(self, format_string): + return format_string.format(**self.tableheaders) + + def client_line(self, client, format_string): + return format_string.format( + **{key: self.string_from_client(client, key) + for key in self.keywords}) + + @staticmethod + def milliseconds_to_string(ms): + td = datetime.timedelta(0, 0, 0, ms) + return ("{days}{hours:02}:{minutes:02}:{seconds:02}" + .format(days="{}T".format(td.days) + if td.days else "", + hours=td.seconds // 3600, + minutes=(td.seconds % 3600) // 60, + seconds=td.seconds % 60)) + + + class PropertySetter(Base): + "Abstract class for Actions for setting one client property" + + def run_on_one_client(self, client, properties=None): + """Set the Client's D-Bus property""" + self.bus.set_client_property(client, self.propname, + self.value_to_set) + + @property + def propname(self): + raise NotImplementedError() + + + class Enable(PropertySetter): + propname = "Enabled" + value_to_set = True + + + class Disable(PropertySetter): + propname = "Enabled" + value_to_set = False + + + class BumpTimeout(PropertySetter): + propname = "LastCheckedOK" + value_to_set = "" + + + class StartChecker(PropertySetter): + propname = "CheckerRunning" + value_to_set = True + + + class StopChecker(PropertySetter): + propname = "CheckerRunning" + value_to_set = False + + + class ApproveByDefault(PropertySetter): + propname = "ApprovedByDefault" + value_to_set = True + + + class DenyByDefault(PropertySetter): + propname = "ApprovedByDefault" + value_to_set = False + + + class PropertySetterValue(PropertySetter): + """Abstract class for PropertySetter recieving a value as +constructor argument instead of a class attribute.""" + def __init__(self, value): + self.value_to_set = value + + @classmethod + def argparse(cls, argtype): + def cmdtype(arg): + return cls(argtype(arg)) + return cmdtype + + class SetChecker(PropertySetterValue): + propname = "Checker" + + + class SetHost(PropertySetterValue): + propname = "Host" + + + class SetSecret(PropertySetterValue): + propname = "Secret" + + @property + def value_to_set(self): + return self._vts + + @value_to_set.setter + def value_to_set(self, value): + """When setting, read data from supplied file object""" + self._vts = value.read() + value.close() + + + class PropertySetterValueMilliseconds(PropertySetterValue): + """Abstract class for PropertySetterValue taking a value +argument as a datetime.timedelta() but should store it as +milliseconds.""" + + @property + def value_to_set(self): + return self._vts + + @value_to_set.setter + def value_to_set(self, value): + "When setting, convert value from a datetime.timedelta" + self._vts = int(round(value.total_seconds() * 1000)) + + + class SetTimeout(PropertySetterValueMilliseconds): + propname = "Timeout" + + + class SetExtendedTimeout(PropertySetterValueMilliseconds): + propname = "ExtendedTimeout" + + + class SetInterval(PropertySetterValueMilliseconds): + propname = "Interval" + + + class SetApprovalDelay(PropertySetterValueMilliseconds): + propname = "ApprovalDelay" + + + class SetApprovalDuration(PropertySetterValueMilliseconds): + propname = "ApprovalDuration" + + + +class TestCaseWithAssertLogs(unittest.TestCase): + """unittest.TestCase.assertLogs only exists in Python 3.4""" + + if not hasattr(unittest.TestCase, "assertLogs"): + @contextlib.contextmanager + def assertLogs(self, logger, level=logging.INFO): + capturing_handler = self.CapturingLevelHandler(level) + old_level = logger.level + old_propagate = logger.propagate + logger.addHandler(capturing_handler) + logger.setLevel(level) + logger.propagate = False + try: + yield capturing_handler.watcher + finally: + logger.propagate = old_propagate + logger.removeHandler(capturing_handler) + logger.setLevel(old_level) + self.assertGreater(len(capturing_handler.watcher.records), + 0) + + class CapturingLevelHandler(logging.Handler): + def __init__(self, level, *args, **kwargs): + logging.Handler.__init__(self, *args, **kwargs) + self.watcher = self.LoggingWatcher([], []) + def emit(self, record): + self.watcher.records.append(record) + self.watcher.output.append(self.format(record)) + + LoggingWatcher = collections.namedtuple("LoggingWatcher", + ("records", + "output")) + + +class Unique(object): + """Class for objects which exist only to be unique objects, since +unittest.mock.sentinel only exists in Python 3.3""" + + +class Test_string_to_delta(TestCaseWithAssertLogs): + # Just test basic RFC 3339 functionality here, the doc string for + # rfc3339_duration_to_delta() already has more comprehensive + # tests, which are run by doctest. + + def test_rfc3339_zero_seconds(self): + self.assertEqual(datetime.timedelta(), + string_to_delta("PT0S")) + + def test_rfc3339_zero_days(self): + self.assertEqual(datetime.timedelta(), string_to_delta("P0D")) + + def test_rfc3339_one_second(self): + self.assertEqual(datetime.timedelta(0, 1), + string_to_delta("PT1S")) + + def test_rfc3339_two_hours(self): + self.assertEqual(datetime.timedelta(0, 7200), + string_to_delta("PT2H")) + + def test_falls_back_to_pre_1_6_1_with_warning(self): + with self.assertLogs(log, logging.WARNING): + value = string_to_delta("2h") + self.assertEqual(datetime.timedelta(0, 7200), value) + + +class Test_check_option_syntax(unittest.TestCase): + def setUp(self): + self.parser = argparse.ArgumentParser() + add_command_line_options(self.parser) + + def test_actions_requires_client_or_all(self): + for action, value in self.actions.items(): + args = self.actionargs(action, value) + with self.assertParseError(): + self.parse_args(args) + + # This mostly corresponds to the definition from has_commands() in + # check_option_syntax() + actions = { + "--enable": None, + "--disable": None, + "--bump-timeout": None, + "--start-checker": None, + "--stop-checker": None, + "--is-enabled": None, + "--remove": None, + "--checker": "x", + "--timeout": "PT0S", + "--extended-timeout": "PT0S", + "--interval": "PT0S", + "--approve-by-default": None, + "--deny-by-default": None, + "--approval-delay": "PT0S", + "--approval-duration": "PT0S", + "--host": "hostname", + "--secret": "/dev/null", + "--approve": None, + "--deny": None, + } + + @staticmethod + def actionargs(action, value, *args): + if value is not None: + return [action, value] + list(args) + else: + return [action] + list(args) + + @contextlib.contextmanager + def assertParseError(self): + with self.assertRaises(SystemExit) as e: + with self.redirect_stderr_to_devnull(): + yield + # Exit code from argparse is guaranteed to be "2". Reference: + # https://docs.python.org/3/library + # /argparse.html#exiting-methods + self.assertEqual(2, e.exception.code) + + def parse_args(self, args): + options = self.parser.parse_args(args) + check_option_syntax(self.parser, options) + + @staticmethod + @contextlib.contextmanager + def redirect_stderr_to_devnull(): + old_stderr = sys.stderr + with contextlib.closing(open(os.devnull, "w")) as null: + sys.stderr = null + try: + yield + finally: + sys.stderr = old_stderr + + def check_option_syntax(self, options): + check_option_syntax(self.parser, options) + + def test_actions_all_conflicts_with_verbose(self): + for action, value in self.actions.items(): + args = self.actionargs(action, value, "--all", + "--verbose") + with self.assertParseError(): + self.parse_args(args) + + def test_actions_with_client_conflicts_with_verbose(self): + for action, value in self.actions.items(): + args = self.actionargs(action, value, "--verbose", + "client") + with self.assertParseError(): + self.parse_args(args) + + def test_dump_json_conflicts_with_verbose(self): + args = ["--dump-json", "--verbose"] + with self.assertParseError(): + self.parse_args(args) + + def test_dump_json_conflicts_with_action(self): + for action, value in self.actions.items(): + args = self.actionargs(action, value, "--dump-json") + with self.assertParseError(): + self.parse_args(args) + + def test_all_can_not_be_alone(self): + args = ["--all"] + with self.assertParseError(): + self.parse_args(args) + + def test_all_is_ok_with_any_action(self): + for action, value in self.actions.items(): + args = self.actionargs(action, value, "--all") + self.parse_args(args) + + def test_any_action_is_ok_with_one_client(self): + for action, value in self.actions.items(): + args = self.actionargs(action, value, "client") + self.parse_args(args) + + def test_one_client_with_all_actions_except_is_enabled(self): + for action, value in self.actions.items(): + if action == "--is-enabled": + continue + args = self.actionargs(action, value, "client") + self.parse_args(args) + + def test_two_clients_with_all_actions_except_is_enabled(self): + for action, value in self.actions.items(): + if action == "--is-enabled": + continue + args = self.actionargs(action, value, "client1", + "client2") + self.parse_args(args) + + def test_two_clients_are_ok_with_actions_except_is_enabled(self): + for action, value in self.actions.items(): + if action == "--is-enabled": + continue + args = self.actionargs(action, value, "client1", + "client2") + self.parse_args(args) + + def test_is_enabled_fails_without_client(self): + args = ["--is-enabled"] + with self.assertParseError(): + self.parse_args(args) + + def test_is_enabled_fails_with_two_clients(self): + args = ["--is-enabled", "client1", "client2"] + with self.assertParseError(): + self.parse_args(args) + + def test_remove_can_only_be_combined_with_action_deny(self): + for action, value in self.actions.items(): + if action in {"--remove", "--deny"}: + continue + args = self.actionargs(action, value, "--all", + "--remove") + with self.assertParseError(): + self.parse_args(args) + + +class Test_dbus_exceptions(unittest.TestCase): + + def test_dbus_ConnectFailed_is_Error(self): + with self.assertRaises(dbus.Error): + raise dbus.ConnectFailed() + + +class Test_dbus_MandosBus(unittest.TestCase): + + class MockMandosBus(dbus.MandosBus): + def __init__(self): + self._name = "se.recompile.Mandos" + self._server_path = "/" + self._server_interface = "se.recompile.Mandos" + self._client_interface = "se.recompile.Mandos.Client" + self.calls = [] + self.call_method_return = Unique() + + def call_method(self, methodname, busname, objectpath, + interface, *args): + self.calls.append((methodname, busname, objectpath, + interface, args)) + return self.call_method_return + + def setUp(self): + self.bus = self.MockMandosBus() + + def test_set_client_property(self): + self.bus.set_client_property("objectpath", "key", "value") + expected_call = ("Set", self.bus._name, "objectpath", + "org.freedesktop.DBus.Properties", + (self.bus._client_interface, "key", "value")) + self.assertIn(expected_call, self.bus.calls) + + def test_call_client_method(self): + ret = self.bus.call_client_method("objectpath", "methodname") + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, "objectpath", + self.bus._client_interface, ()) + self.assertIn(expected_call, self.bus.calls) + + def test_call_client_method_with_args(self): + args = (Unique(), Unique()) + ret = self.bus.call_client_method("objectpath", "methodname", + *args) + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, "objectpath", + self.bus._client_interface, + (args[0], args[1])) + self.assertIn(expected_call, self.bus.calls) + + def test_get_clients_and_properties(self): + managed_objects = { + "objectpath": { + self.bus._client_interface: { + "key": "value", + "bool": True, + }, + "irrelevant_interface": { + "key": "othervalue", + "bool": False, + }, + }, + "other_objectpath": { + "other_irrelevant_interface": { + "key": "value 3", + "bool": None, + }, + }, + } + expected_clients_and_properties = { + "objectpath": { + "key": "value", + "bool": True, + } + } + self.bus.call_method_return = managed_objects + ret = self.bus.get_clients_and_properties() + self.assertDictEqual(expected_clients_and_properties, ret) + expected_call = ("GetManagedObjects", self.bus._name, + self.bus._server_path, + "org.freedesktop.DBus.ObjectManager", ()) + self.assertIn(expected_call, self.bus.calls) + + def test_call_server_method(self): + ret = self.bus.call_server_method("methodname") + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, + self.bus._server_path, + self.bus._server_interface, ()) + self.assertIn(expected_call, self.bus.calls) + + def test_call_server_method_with_args(self): + args = (Unique(), Unique()) + ret = self.bus.call_server_method("methodname", *args) + self.assertIs(self.bus.call_method_return, ret) + expected_call = ("methodname", self.bus._name, + self.bus._server_path, + self.bus._server_interface, + (args[0], args[1])) + self.assertIn(expected_call, self.bus.calls) + + +class Test_dbus_python_adapter_SystemBus(TestCaseWithAssertLogs): + + def MockDBusPython_func(self, func): + class mock_dbus_python(object): + """mock dbus-python module""" + class exceptions(object): + """Pseudo-namespace""" + class DBusException(Exception): + pass + class SystemBus(object): + @staticmethod + def get_object(busname, objectpath): + DBusObject = collections.namedtuple( + "DBusObject", ("methodname", "Set")) + def method(*args, **kwargs): + self.assertEqual({"dbus_interface": + "interface"}, + kwargs) + return func(*args) + def set_property(interface, key, value, + dbus_interface=None): + self.assertEqual( + "org.freedesktop.DBus.Properties", + dbus_interface) + self.assertEqual("Secret", key) + return func(interface, key, value, + dbus_interface=dbus_interface) + return DBusObject(methodname=method, + Set=set_property) + class Boolean(object): + def __init__(self, value): + self.value = bool(value) + def __bool__(self): + return self.value + if sys.version_info.major == 2: + __nonzero__ = __bool__ + class ObjectPath(str): + pass + class Dictionary(dict): + pass + class ByteArray(bytes): + pass + return mock_dbus_python + + def call_method(self, bus, methodname, busname, objectpath, + interface, *args): + with self.assertLogs(log, logging.DEBUG): + return bus.call_method(methodname, busname, objectpath, + interface, *args) + + def test_call_method_returns(self): + expected_method_return = Unique() + method_args = (Unique(), Unique()) + def func(*args): + self.assertEqual(len(method_args), len(args)) + for marg, arg in zip(method_args, args): + self.assertIs(marg, arg) + return expected_method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface", + *method_args) + self.assertIs(ret, expected_method_return) + + def test_call_method_filters_bool_true(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Boolean(True) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + self.assertTrue(ret) + self.assertNotIsInstance(ret, mock_dbus_python.Boolean) + + def test_call_method_filters_bool_false(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Boolean(False) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + self.assertFalse(ret) + self.assertNotIsInstance(ret, mock_dbus_python.Boolean) + + def test_call_method_filters_objectpath(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.ObjectPath("objectpath") + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + self.assertEqual("objectpath", ret) + self.assertIsNot("objectpath", ret) + self.assertNotIsInstance(ret, mock_dbus_python.ObjectPath) + + def test_call_method_filters_booleans_in_dict(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {mock_dbus_python.Boolean(True): + mock_dbus_python.Boolean(False), + mock_dbus_python.Boolean(False): + mock_dbus_python.Boolean(True)}) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {True: False, + False: True} + self.assertEqual(expected_method_return, ret) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + + def test_call_method_filters_objectpaths_in_dict(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {mock_dbus_python.ObjectPath("objectpath_key_1"): + mock_dbus_python.ObjectPath("objectpath_value_1"), + mock_dbus_python.ObjectPath("objectpath_key_2"): + mock_dbus_python.ObjectPath("objectpath_value_2")}) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {str(key): str(value) + for key, value in + method_return.items()} + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + + def test_call_method_filters_dict_in_dict(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {"key1": mock_dbus_python.Dictionary({"key11": "value11", + "key12": "value12"}), + "key2": mock_dbus_python.Dictionary({"key21": "value21", + "key22": "value22"})}) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = { + "key1": {"key11": "value11", + "key12": "value12"}, + "key2": {"key21": "value21", + "key22": "value22"}, + } + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + for key, value in ret.items(): + self.assertIsInstance(value, dict) + self.assertEqual(expected_method_return[key], value) + self.assertNotIsInstance(value, + mock_dbus_python.Dictionary) + + def test_call_method_filters_dict_three_deep(self): + def func(): + return method_return + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + method_return = mock_dbus_python.Dictionary( + {"key1": + mock_dbus_python.Dictionary( + {"key2": + mock_dbus_python.Dictionary( + {"key3": + mock_dbus_python.Boolean(True), + }), + }), + }) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + expected_method_return = {"key1": {"key2": {"key3": True}}} + self.assertEqual(expected_method_return, ret) + self.assertIsInstance(ret, dict) + self.assertNotIsInstance(ret, mock_dbus_python.Dictionary) + self.assertIsInstance(ret["key1"], dict) + self.assertNotIsInstance(ret["key1"], + mock_dbus_python.Dictionary) + self.assertIsInstance(ret["key1"]["key2"], dict) + self.assertNotIsInstance(ret["key1"]["key2"], + mock_dbus_python.Dictionary) + self.assertTrue(ret["key1"]["key2"]["key3"]) + self.assertNotIsInstance(ret["key1"]["key2"]["key3"], + mock_dbus_python.Boolean) + + def test_call_method_handles_exception(self): + dbus_logger = logging.getLogger("dbus.proxies") + + def func(): + dbus_logger.error("Test") + raise mock_dbus_python.exceptions.DBusException() + + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + + class CountingHandler(logging.Handler): + count = 0 + def emit(self, record): + self.count += 1 + + counting_handler = CountingHandler() + + dbus_logger.addHandler(counting_handler) + + try: + with self.assertRaises(dbus.Error) as e: + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + finally: + dbus_logger.removeFilter(counting_handler) + + self.assertNotIsInstance(e, dbus.ConnectFailed) + + # Make sure the dbus logger was suppressed + self.assertEqual(0, counting_handler.count) + + def test_Set_Secret_sends_bytearray(self): + ret = [None] + def func(*args, **kwargs): + ret[0] = (args, kwargs) + mock_dbus_python = self.MockDBusPython_func(func) + bus = dbus_python_adapter.SystemBus(mock_dbus_python) + bus.set_client_property("objectpath", "Secret", "value") + expected_call = (("se.recompile.Mandos.Client", "Secret", + mock_dbus_python.ByteArray(b"value")), + {"dbus_interface": + "org.freedesktop.DBus.Properties"}) + self.assertEqual(expected_call, ret[0]) + if sys.version_info.major == 2: + self.assertIsInstance(ret[0][0][-1], + mock_dbus_python.ByteArray) + + def test_get_object_converts_to_correct_exception(self): + bus = dbus_python_adapter.SystemBus( + self.fake_dbus_python_raises_exception_on_connect) + with self.assertRaises(dbus.ConnectFailed): + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + class fake_dbus_python_raises_exception_on_connect(object): + """fake dbus-python module""" + class exceptions(object): + """Pseudo-namespace""" + class DBusException(Exception): + pass + + @classmethod + def SystemBus(cls): + def get_object(busname, objectpath): + raise cls.exceptions.DBusException() + Bus = collections.namedtuple("Bus", ["get_object"]) + return Bus(get_object=get_object) + + +class Test_dbus_python_adapter_CachingBus(unittest.TestCase): + class mock_dbus_python(object): + """mock dbus-python modules""" + class SystemBus(object): + @staticmethod + def get_object(busname, objectpath): + return Unique() + + def setUp(self): + self.bus = dbus_python_adapter.CachingBus( + self.mock_dbus_python) + + def test_returns_distinct_objectpaths(self): + obj1 = self.bus.get_object("busname", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname", "objectpath2") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_busnames(self): + obj1 = self.bus.get_object("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_both(self): + obj1 = self.bus.get_object("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_same(self): + obj1 = self.bus.get_object("busname", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIs(obj1, obj2) + + def test_returns_same_old(self): + obj1 = self.bus.get_object("busname1", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get_object("busname2", "objectpath2") + self.assertIsInstance(obj2, Unique) + obj1b = self.bus.get_object("busname1", "objectpath1") + self.assertIsInstance(obj1b, Unique) + self.assertIsNot(obj1, obj2) + self.assertIsNot(obj2, obj1b) + self.assertIs(obj1, obj1b) + + +class Test_pydbus_adapter_SystemBus(TestCaseWithAssertLogs): + + def Stub_pydbus_func(self, func): + class stub_pydbus(object): + """stub pydbus module""" + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + DBusObject = collections.namedtuple( + "DBusObject", ("methodname",)) + return {"interface": + DBusObject(methodname=func)} + return stub_pydbus + + def call_method(self, bus, methodname, busname, objectpath, + interface, *args): + with self.assertLogs(log, logging.DEBUG): + return bus.call_method(methodname, busname, objectpath, + interface, *args) + + def test_call_method_returns(self): + expected_method_return = Unique() + method_args = (Unique(), Unique()) + def func(*args): + self.assertEqual(len(method_args), len(args)) + for marg, arg in zip(method_args, args): + self.assertIs(marg, arg) + return expected_method_return + stub_pydbus = self.Stub_pydbus_func(func) + bus = pydbus_adapter.SystemBus(stub_pydbus) + ret = self.call_method(bus, "methodname", "busname", + "objectpath", "interface", + *method_args) + self.assertIs(ret, expected_method_return) + + def test_call_method_handles_exception(self): + dbus_logger = logging.getLogger("dbus.proxies") + + def func(): + raise gi.repository.GLib.Error() + + stub_pydbus = self.Stub_pydbus_func(func) + bus = pydbus_adapter.SystemBus(stub_pydbus) + + with self.assertRaises(dbus.Error) as e: + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + self.assertNotIsInstance(e, dbus.ConnectFailed) + + def test_get_converts_to_correct_exception(self): + bus = pydbus_adapter.SystemBus( + self.fake_pydbus_raises_exception_on_connect) + with self.assertRaises(dbus.ConnectFailed): + self.call_method(bus, "methodname", "busname", + "objectpath", "interface") + + class fake_pydbus_raises_exception_on_connect(object): + """fake dbus-python module""" + @classmethod + def SystemBus(cls): + def get(busname, objectpath): + raise gi.repository.GLib.Error() + Bus = collections.namedtuple("Bus", ["get"]) + return Bus(get=get) + + def test_set_property_uses_setattr(self): + class Object(object): + pass + obj = Object() + class pydbus_spy(object): + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + return {"interface": obj} + bus = pydbus_adapter.SystemBus(pydbus_spy) + value = Unique() + bus.set_property("busname", "objectpath", "interface", "key", + value) + self.assertIs(value, obj.key) + + def test_get_suppresses_xml_deprecation_warning(self): + if sys.version_info.major >= 3: + return + class stub_pydbus_get(object): + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + warnings.warn_explicit( + "deprecated", DeprecationWarning, + "xml.etree.ElementTree", 0) + bus = pydbus_adapter.SystemBus(stub_pydbus_get) + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + bus.get("busname", "objectpath") + self.assertEqual(0, len(w)) + + +class Test_pydbus_adapter_CachingBus(unittest.TestCase): + class stub_pydbus(object): + """stub pydbus module""" + class SystemBus(object): + @staticmethod + def get(busname, objectpath): + return Unique() + + def setUp(self): + self.bus = pydbus_adapter.CachingBus(self.stub_pydbus) + + def test_returns_distinct_objectpaths(self): + obj1 = self.bus.get("busname", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname", "objectpath2") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_busnames(self): + obj1 = self.bus.get("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_distinct_both(self): + obj1 = self.bus.get("busname1", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname2", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIsNot(obj1, obj2) + + def test_returns_same(self): + obj1 = self.bus.get("busname", "objectpath") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname", "objectpath") + self.assertIsInstance(obj2, Unique) + self.assertIs(obj1, obj2) + + def test_returns_same_old(self): + obj1 = self.bus.get("busname1", "objectpath1") + self.assertIsInstance(obj1, Unique) + obj2 = self.bus.get("busname2", "objectpath2") + self.assertIsInstance(obj2, Unique) + obj1b = self.bus.get("busname1", "objectpath1") + self.assertIsInstance(obj1b, Unique) + self.assertIsNot(obj1, obj2) + self.assertIsNot(obj2, obj1b) + self.assertIs(obj1, obj1b) + + +class Test_commands_from_options(unittest.TestCase): + + def setUp(self): + self.parser = argparse.ArgumentParser() + add_command_line_options(self.parser) + + def test_is_enabled(self): + self.assert_command_from_args(["--is-enabled", "client"], + command.IsEnabled) + + def assert_command_from_args(self, args, command_cls, + **cmd_attrs): + """Assert that parsing ARGS should result in an instance of +COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS).""" + options = self.parser.parse_args(args) + check_option_syntax(self.parser, options) + commands = commands_from_options(options) + self.assertEqual(1, len(commands)) + command = commands[0] + self.assertIsInstance(command, command_cls) + for key, value in cmd_attrs.items(): + self.assertEqual(value, getattr(command, key)) + + def test_is_enabled_short(self): + self.assert_command_from_args(["-V", "client"], + command.IsEnabled) + + def test_approve(self): + self.assert_command_from_args(["--approve", "client"], + command.Approve) + + def test_approve_short(self): + self.assert_command_from_args(["-A", "client"], + command.Approve) + + def test_deny(self): + self.assert_command_from_args(["--deny", "client"], + command.Deny) + + def test_deny_short(self): + self.assert_command_from_args(["-D", "client"], command.Deny) + + def test_remove(self): + self.assert_command_from_args(["--remove", "client"], + command.Remove) + + def test_deny_before_remove(self): + options = self.parser.parse_args(["--deny", "--remove", + "client"]) + check_option_syntax(self.parser, options) + commands = commands_from_options(options) + self.assertEqual(2, len(commands)) + self.assertIsInstance(commands[0], command.Deny) + self.assertIsInstance(commands[1], command.Remove) + + def test_deny_before_remove_reversed(self): + options = self.parser.parse_args(["--remove", "--deny", + "--all"]) + check_option_syntax(self.parser, options) + commands = commands_from_options(options) + self.assertEqual(2, len(commands)) + self.assertIsInstance(commands[0], command.Deny) + self.assertIsInstance(commands[1], command.Remove) + + def test_remove_short(self): + self.assert_command_from_args(["-r", "client"], + command.Remove) + + def test_dump_json(self): + self.assert_command_from_args(["--dump-json"], + command.DumpJSON) + + def test_enable(self): + self.assert_command_from_args(["--enable", "client"], + command.Enable) + + def test_enable_short(self): + self.assert_command_from_args(["-e", "client"], + command.Enable) + + def test_disable(self): + self.assert_command_from_args(["--disable", "client"], + command.Disable) + + def test_disable_short(self): + self.assert_command_from_args(["-d", "client"], + command.Disable) + + def test_bump_timeout(self): + self.assert_command_from_args(["--bump-timeout", "client"], + command.BumpTimeout) + + def test_bump_timeout_short(self): + self.assert_command_from_args(["-b", "client"], + command.BumpTimeout) + + def test_start_checker(self): + self.assert_command_from_args(["--start-checker", "client"], + command.StartChecker) + + def test_stop_checker(self): + self.assert_command_from_args(["--stop-checker", "client"], + command.StopChecker) + + def test_approve_by_default(self): + self.assert_command_from_args(["--approve-by-default", + "client"], + command.ApproveByDefault) + + def test_deny_by_default(self): + self.assert_command_from_args(["--deny-by-default", "client"], + command.DenyByDefault) + + def test_checker(self): + self.assert_command_from_args(["--checker", ":", "client"], + command.SetChecker, + value_to_set=":") + + def test_checker_empty(self): + self.assert_command_from_args(["--checker", "", "client"], + command.SetChecker, + value_to_set="") + + def test_checker_short(self): + self.assert_command_from_args(["-c", ":", "client"], + command.SetChecker, + value_to_set=":") + + def test_host(self): + self.assert_command_from_args( + ["--host", "client.example.org", "client"], + command.SetHost, value_to_set="client.example.org") + + def test_host_short(self): + self.assert_command_from_args( + ["-H", "client.example.org", "client"], command.SetHost, + value_to_set="client.example.org") + + def test_secret_devnull(self): + self.assert_command_from_args(["--secret", os.path.devnull, + "client"], command.SetSecret, + value_to_set=b"") + + def test_secret_tempfile(self): + with tempfile.NamedTemporaryFile(mode="r+b") as f: + value = b"secret\0xyzzy\nbar" + f.write(value) + f.seek(0) + self.assert_command_from_args(["--secret", f.name, + "client"], + command.SetSecret, + value_to_set=value) + + def test_secret_devnull_short(self): + self.assert_command_from_args(["-s", os.path.devnull, + "client"], command.SetSecret, + value_to_set=b"") + + def test_secret_tempfile_short(self): + with tempfile.NamedTemporaryFile(mode="r+b") as f: + value = b"secret\0xyzzy\nbar" + f.write(value) + f.seek(0) + self.assert_command_from_args(["-s", f.name, "client"], + command.SetSecret, + value_to_set=value) + + def test_timeout(self): + self.assert_command_from_args(["--timeout", "PT5M", "client"], + command.SetTimeout, + value_to_set=300000) + + def test_timeout_short(self): + self.assert_command_from_args(["-t", "PT5M", "client"], + command.SetTimeout, + value_to_set=300000) + + def test_extended_timeout(self): + self.assert_command_from_args(["--extended-timeout", "PT15M", + "client"], + command.SetExtendedTimeout, + value_to_set=900000) + + def test_interval(self): + self.assert_command_from_args(["--interval", "PT2M", + "client"], command.SetInterval, + value_to_set=120000) + + def test_interval_short(self): + self.assert_command_from_args(["-i", "PT2M", "client"], + command.SetInterval, + value_to_set=120000) + + def test_approval_delay(self): + self.assert_command_from_args(["--approval-delay", "PT30S", + "client"], + command.SetApprovalDelay, + value_to_set=30000) + + def test_approval_duration(self): + self.assert_command_from_args(["--approval-duration", "PT1S", + "client"], + command.SetApprovalDuration, + value_to_set=1000) + + def test_print_table(self): + self.assert_command_from_args([], command.PrintTable, + verbose=False) + + def test_print_table_verbose(self): + self.assert_command_from_args(["--verbose"], + command.PrintTable, + verbose=True) + + def test_print_table_verbose_short(self): + self.assert_command_from_args(["-v"], command.PrintTable, + verbose=True) + + +class TestCommand(unittest.TestCase): + """Abstract class for tests of command classes""" + + class FakeMandosBus(dbus.MandosBus): + def __init__(self, testcase): + self.client_properties = { + "Name": "foo", + "KeyID": ("92ed150794387c03ce684574b1139a65" + "94a34f895daaaf09fd8ea90a27cddb12"), + "Secret": b"secret", + "Host": "foo.example.org", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-03T00:00:00", + "Created": "2019-01-02T00:00:00", + "Interval": 120000, + "Fingerprint": ("778827225BA7DE539C5A" + "7CFA59CFF7CDBD9A5920"), + "CheckerRunning": False, + "LastEnabled": "2019-01-03T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": True, + "LastApprovalRequest": "", + "ApprovalDelay": 0, + "ApprovalDuration": 1000, + "Checker": "fping -q -- %(host)s", + "ExtendedTimeout": 900000, + "Expires": "2019-02-04T00:00:00", + "LastCheckerStatus": 0, + } + self.other_client_properties = { + "Name": "barbar", + "KeyID": ("0558568eedd67d622f5c83b35a115f79" + "6ab612cff5ad227247e46c2b020f441c"), + "Secret": b"secretbar", + "Host": "192.0.2.3", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-04T00:00:00", + "Created": "2019-01-03T00:00:00", + "Interval": 120000, + "Fingerprint": ("3E393AEAEFB84C7E89E2" + "F547B3A107558FCA3A27"), + "CheckerRunning": True, + "LastEnabled": "2019-01-04T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": False, + "LastApprovalRequest": "2019-01-03T00:00:00", + "ApprovalDelay": 30000, + "ApprovalDuration": 93785000, + "Checker": ":", + "ExtendedTimeout": 900000, + "Expires": "2019-02-05T00:00:00", + "LastCheckerStatus": -2, + } + self.clients = collections.OrderedDict( + [ + ("client_objectpath", self.client_properties), + ("other_client_objectpath", + self.other_client_properties), + ]) + self.one_client = {"client_objectpath": + self.client_properties} + self.testcase = testcase + self.calls = [] + + def call_method(self, methodname, busname, objectpath, + interface, *args): + self.testcase.assertEqual("se.recompile.Mandos", busname) + self.calls.append((methodname, busname, objectpath, + interface, args)) + if interface == "org.freedesktop.DBus.Properties": + if methodname == "Set": + self.testcase.assertEqual(3, len(args)) + interface, key, value = args + self.testcase.assertEqual( + "se.recompile.Mandos.Client", interface) + self.clients[objectpath][key] = value + return + elif interface == "se.recompile.Mandos": + self.testcase.assertEqual("RemoveClient", methodname) + self.testcase.assertEqual(1, len(args)) + clientpath = args[0] + del self.clients[clientpath] + return + elif interface == "se.recompile.Mandos.Client": + if methodname == "Approve": + self.testcase.assertEqual(1, len(args)) + return + raise ValueError() + + def setUp(self): + self.bus = self.FakeMandosBus(self) + + +class TestBaseCommands(TestCommand): + + def test_IsEnabled_exits_successfully(self): + with self.assertRaises(SystemExit) as e: + command.IsEnabled().run(self.bus.one_client) + if e.exception.code is not None: + self.assertEqual(0, e.exception.code) + else: + self.assertIsNone(e.exception.code) + + def test_IsEnabled_exits_with_failure(self): + self.bus.client_properties["Enabled"] = False + with self.assertRaises(SystemExit) as e: + command.IsEnabled().run(self.bus.one_client) + if isinstance(e.exception.code, int): + self.assertNotEqual(0, e.exception.code) + else: + self.assertIsNotNone(e.exception.code) + + def test_Approve(self): + busname = "se.recompile.Mandos" + client_interface = "se.recompile.Mandos.Client" + command.Approve().run(self.bus.clients, self.bus) + for clientpath in self.bus.clients: + self.assertIn(("Approve", busname, clientpath, + client_interface, (True,)), self.bus.calls) + + def test_Deny(self): + busname = "se.recompile.Mandos" + client_interface = "se.recompile.Mandos.Client" + command.Deny().run(self.bus.clients, self.bus) + for clientpath in self.bus.clients: + self.assertIn(("Approve", busname, clientpath, + client_interface, (False,)), + self.bus.calls) + + def test_Remove(self): + command.Remove().run(self.bus.clients, self.bus) + for clientpath in self.bus.clients: + self.assertIn(("RemoveClient", dbus_busname, + dbus_server_path, dbus_server_interface, + (clientpath,)), self.bus.calls) + + expected_json = { + "foo": { + "Name": "foo", + "KeyID": ("92ed150794387c03ce684574b1139a65" + "94a34f895daaaf09fd8ea90a27cddb12"), + "Host": "foo.example.org", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-03T00:00:00", + "Created": "2019-01-02T00:00:00", + "Interval": 120000, + "Fingerprint": ("778827225BA7DE539C5A" + "7CFA59CFF7CDBD9A5920"), + "CheckerRunning": False, + "LastEnabled": "2019-01-03T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": True, + "LastApprovalRequest": "", + "ApprovalDelay": 0, + "ApprovalDuration": 1000, + "Checker": "fping -q -- %(host)s", + "ExtendedTimeout": 900000, + "Expires": "2019-02-04T00:00:00", + "LastCheckerStatus": 0, + }, + "barbar": { + "Name": "barbar", + "KeyID": ("0558568eedd67d622f5c83b35a115f79" + "6ab612cff5ad227247e46c2b020f441c"), + "Host": "192.0.2.3", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-04T00:00:00", + "Created": "2019-01-03T00:00:00", + "Interval": 120000, + "Fingerprint": ("3E393AEAEFB84C7E89E2" + "F547B3A107558FCA3A27"), + "CheckerRunning": True, + "LastEnabled": "2019-01-04T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": False, + "LastApprovalRequest": "2019-01-03T00:00:00", + "ApprovalDelay": 30000, + "ApprovalDuration": 93785000, + "Checker": ":", + "ExtendedTimeout": 900000, + "Expires": "2019-02-05T00:00:00", + "LastCheckerStatus": -2, + }, + } + + def test_DumpJSON_normal(self): + with self.capture_stdout_to_buffer() as buffer: + command.DumpJSON().run(self.bus.clients) + json_data = json.loads(buffer.getvalue()) + self.assertDictEqual(self.expected_json, json_data) + + @staticmethod + @contextlib.contextmanager + def capture_stdout_to_buffer(): + capture_buffer = io.StringIO() + old_stdout = sys.stdout + sys.stdout = capture_buffer + try: + yield capture_buffer + finally: + sys.stdout = old_stdout + + def test_DumpJSON_one_client(self): + with self.capture_stdout_to_buffer() as buffer: + command.DumpJSON().run(self.bus.one_client) + json_data = json.loads(buffer.getvalue()) + expected_json = {"foo": self.expected_json["foo"]} + self.assertDictEqual(expected_json, json_data) + + def test_PrintTable_normal(self): + with self.capture_stdout_to_buffer() as buffer: + command.PrintTable().run(self.bus.clients) + expected_output = "\n".join(( + "Name Enabled Timeout Last Successful Check", + "foo Yes 00:05:00 2019-02-03T00:00:00 ", + "barbar Yes 00:05:00 2019-02-04T00:00:00 ", + )) + "\n" + self.assertEqual(expected_output, buffer.getvalue()) + + def test_PrintTable_verbose(self): + with self.capture_stdout_to_buffer() as buffer: + command.PrintTable(verbose=True).run(self.bus.clients) + columns = ( + ( + "Name ", + "foo ", + "barbar ", + ),( + "Enabled ", + "Yes ", + "Yes ", + ),( + "Timeout ", + "00:05:00 ", + "00:05:00 ", + ),( + "Last Successful Check ", + "2019-02-03T00:00:00 ", + "2019-02-04T00:00:00 ", + ),( + "Created ", + "2019-01-02T00:00:00 ", + "2019-01-03T00:00:00 ", + ),( + "Interval ", + "00:02:00 ", + "00:02:00 ", + ),( + "Host ", + "foo.example.org ", + "192.0.2.3 ", + ),( + ("Key ID " + " "), + ("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8" + "ea90a27cddb12 "), + ("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e" + "46c2b020f441c "), + ),( + "Fingerprint ", + "778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ", + "3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ", + ),( + "Check Is Running ", + "No ", + "Yes ", + ),( + "Last Enabled ", + "2019-01-03T00:00:00 ", + "2019-01-04T00:00:00 ", + ),( + "Approval Is Pending ", + "No ", + "No ", + ),( + "Approved By Default ", + "Yes ", + "No ", + ),( + "Last Approval Request ", + " ", + "2019-01-03T00:00:00 ", + ),( + "Approval Delay ", + "00:00:00 ", + "00:00:30 ", + ),( + "Approval Duration ", + "00:00:01 ", + "1T02:03:05 ", + ),( + "Checker ", + "fping -q -- %(host)s ", + ": ", + ),( + "Extended Timeout ", + "00:15:00 ", + "00:15:00 ", + ),( + "Expires ", + "2019-02-04T00:00:00 ", + "2019-02-05T00:00:00 ", + ),( + "Last Checker Status", + "0 ", + "-2 ", + ) + ) + num_lines = max(len(rows) for rows in columns) + expected_output = ("\n".join("".join(rows[line] + for rows in columns) + for line in range(num_lines)) + + "\n") + self.assertEqual(expected_output, buffer.getvalue()) + + def test_PrintTable_one_client(self): + with self.capture_stdout_to_buffer() as buffer: + command.PrintTable().run(self.bus.one_client) + expected_output = "\n".join(( + "Name Enabled Timeout Last Successful Check", + "foo Yes 00:05:00 2019-02-03T00:00:00 ", + )) + "\n" + self.assertEqual(expected_output, buffer.getvalue()) + + +class TestPropertySetterCmd(TestCommand): + """Abstract class for tests of command.PropertySetter classes""" + + def runTest(self): + if not hasattr(self, "command"): + return # Abstract TestCase class + + if hasattr(self, "values_to_set"): + cmd_args = [(value,) for value in self.values_to_set] + values_to_get = getattr(self, "values_to_get", + self.values_to_set) + else: + cmd_args = [() for x in range(len(self.values_to_get))] + values_to_get = self.values_to_get + for value_to_get, cmd_arg in zip(values_to_get, cmd_args): + for clientpath in self.bus.clients: + self.bus.clients[clientpath][self.propname] = ( + Unique()) + self.command(*cmd_arg).run(self.bus.clients, self.bus) + for clientpath in self.bus.clients: + value = (self.bus.clients[clientpath] + [self.propname]) + self.assertNotIsInstance(value, Unique) + self.assertEqual(value_to_get, value) + + +class TestEnableCmd(TestPropertySetterCmd): + command = command.Enable + propname = "Enabled" + values_to_get = [True] + + +class TestDisableCmd(TestPropertySetterCmd): + command = command.Disable + propname = "Enabled" + values_to_get = [False] + + +class TestBumpTimeoutCmd(TestPropertySetterCmd): + command = command.BumpTimeout + propname = "LastCheckedOK" + values_to_get = [""] + + +class TestStartCheckerCmd(TestPropertySetterCmd): + command = command.StartChecker + propname = "CheckerRunning" + values_to_get = [True] + + +class TestStopCheckerCmd(TestPropertySetterCmd): + command = command.StopChecker + propname = "CheckerRunning" + values_to_get = [False] + + +class TestApproveByDefaultCmd(TestPropertySetterCmd): + command = command.ApproveByDefault + propname = "ApprovedByDefault" + values_to_get = [True] + + +class TestDenyByDefaultCmd(TestPropertySetterCmd): + command = command.DenyByDefault + propname = "ApprovedByDefault" + values_to_get = [False] + + +class TestSetCheckerCmd(TestPropertySetterCmd): + command = command.SetChecker + propname = "Checker" + values_to_set = ["", ":", "fping -q -- %s"] + + +class TestSetHostCmd(TestPropertySetterCmd): + command = command.SetHost + propname = "Host" + values_to_set = ["192.0.2.3", "client.example.org"] + + +class TestSetSecretCmd(TestPropertySetterCmd): + command = command.SetSecret + propname = "Secret" + values_to_set = [io.BytesIO(b""), + io.BytesIO(b"secret\0xyzzy\nbar")] + values_to_get = [f.getvalue() for f in values_to_set] + + +class TestSetTimeoutCmd(TestPropertySetterCmd): + command = command.SetTimeout + propname = "Timeout" + values_to_set = [datetime.timedelta(), + datetime.timedelta(minutes=5), + datetime.timedelta(seconds=1), + datetime.timedelta(weeks=1), + datetime.timedelta(weeks=52)] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] + + +class TestSetExtendedTimeoutCmd(TestPropertySetterCmd): + command = command.SetExtendedTimeout + propname = "ExtendedTimeout" + values_to_set = [datetime.timedelta(), + datetime.timedelta(minutes=5), + datetime.timedelta(seconds=1), + datetime.timedelta(weeks=1), + datetime.timedelta(weeks=52)] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] + + +class TestSetIntervalCmd(TestPropertySetterCmd): + command = command.SetInterval + propname = "Interval" + values_to_set = [datetime.timedelta(), + datetime.timedelta(minutes=5), + datetime.timedelta(seconds=1), + datetime.timedelta(weeks=1), + datetime.timedelta(weeks=52)] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] + + +class TestSetApprovalDelayCmd(TestPropertySetterCmd): + command = command.SetApprovalDelay + propname = "ApprovalDelay" + values_to_set = [datetime.timedelta(), + datetime.timedelta(minutes=5), + datetime.timedelta(seconds=1), + datetime.timedelta(weeks=1), + datetime.timedelta(weeks=52)] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] + + +class TestSetApprovalDurationCmd(TestPropertySetterCmd): + command = command.SetApprovalDuration + propname = "ApprovalDuration" + values_to_set = [datetime.timedelta(), + datetime.timedelta(minutes=5), + datetime.timedelta(seconds=1), + datetime.timedelta(weeks=1), + datetime.timedelta(weeks=52)] + values_to_get = [dt.total_seconds()*1000 for dt in values_to_set] + + + +def should_only_run_tests(): + parser = argparse.ArgumentParser(add_help=False) + parser.add_argument("--check", action='store_true') + args, unknown_args = parser.parse_known_args() + run_tests = args.check + if run_tests: + # Remove --check argument from sys.argv + sys.argv[1:] = unknown_args + return run_tests + +# Add all tests from doctest strings +def load_tests(loader, tests, none): + import doctest + tests.addTests(doctest.DocTestSuite()) + return tests if __name__ == "__main__": - main() + try: + if should_only_run_tests(): + # Call using ./tdd-python-script --check [--verbose] + unittest.main() + else: + main() + finally: + logging.shutdown() === modified file 'mandos-ctl.xml' --- mandos-ctl.xml 2019-02-10 04:20:26 +0000 +++ mandos-ctl.xml 2019-03-08 23:55:34 +0000 @@ -2,7 +2,7 @@ - + %common; ]> @@ -62,6 +62,22 @@ &COMMANDNAME; + + + + + + + + + + + CLIENT + + + + + &COMMANDNAME; @@ -78,18 +94,10 @@ - - - - - - - - - - - - - + @@ -173,13 +175,18 @@ &COMMANDNAME; - - - - - - - + + + + + + + + + + + + CLIENT @@ -191,6 +198,7 @@ + CLIENT @@ -504,6 +512,15 @@ + + + + Show debug output; currently, this means show D-Bus calls. + + + + + @@ -581,7 +598,7 @@ -&COMMANDNAME; --timeout="5m" --interval="1m" foo1.example.org foo2.example.org +&COMMANDNAME; --timeout="PT5M" --interval="PT1M" foo1.example.org foo2.example.org === modified file 'plugin-runner.c' --- plugin-runner.c 2019-02-11 05:14:10 +0000 +++ plugin-runner.c 2019-04-08 21:53:22 +0000 @@ -313,9 +313,10 @@ __attribute__((nonnull)) static void free_plugin(plugin *plugin_node){ - for(char **arg = plugin_node->argv; *arg != NULL; arg++){ + for(char **arg = (plugin_node->argv)+1; *arg != NULL; arg++){ free(*arg); } + free(plugin_node->name); free(plugin_node->argv); for(char **env = plugin_node->environ; *env != NULL; env++){ free(*env); === modified file 'plugins.d/askpass-fifo.c' --- plugins.d/askpass-fifo.c 2019-02-11 05:14:10 +0000 +++ plugins.d/askpass-fifo.c 2019-02-11 07:06:55 +0000 @@ -2,8 +2,8 @@ /* * Askpass-FIFO - Read a password from a FIFO and output it * - * Copyright © 2008-2018 Teddy Hogeborn - * Copyright © 2008-2018 Björn Påhlsson + * Copyright © 2008-2019 Teddy Hogeborn + * Copyright © 2008-2019 Björn Påhlsson * * This file is part of Mandos. * === modified file 'plugins.d/password-prompt.c' --- plugins.d/password-prompt.c 2019-02-11 05:14:10 +0000 +++ plugins.d/password-prompt.c 2019-02-11 07:06:55 +0000 @@ -2,8 +2,8 @@ /* * Password-prompt - Read a password from the terminal and print it * - * Copyright © 2008-2018 Teddy Hogeborn - * Copyright © 2008-2018 Björn Påhlsson + * Copyright © 2008-2019 Teddy Hogeborn + * Copyright © 2008-2019 Björn Påhlsson * * This file is part of Mandos. *