=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-12 19:15:52 +0000 +++ mandos-ctl 2019-03-12 20:13:34 +0000 @@ -79,13 +79,156 @@ dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager" -def milliseconds_to_string(ms): - td = datetime.timedelta(0, 0, 0, ms) - return ("{days}{hours:02}:{minutes:02}:{seconds:02}" - .format(days="{}T".format(td.days) if td.days else "", - hours=td.seconds // 3600, - minutes=(td.seconds % 3600) // 60, - seconds=td.seconds % 60)) +def main(): + parser = argparse.ArgumentParser() + + add_command_line_options(parser) + + options = parser.parse_args() + + check_option_syntax(parser, options) + + clientnames = options.client + + if options.debug: + log.setLevel(logging.DEBUG) + + try: + bus = dbus.SystemBus() + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + dbus_busname, server_dbus_path) + mandos_dbus_objc = bus.get_object(dbus_busname, + server_dbus_path) + except dbus.exceptions.DBusException: + log.critical("Could not connect to Mandos server") + sys.exit(1) + + mandos_serv = dbus.Interface(mandos_dbus_objc, + dbus_interface=server_dbus_interface) + mandos_serv_object_manager = dbus.Interface( + mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE) + + # Filter out log message from dbus module + dbus_logger = logging.getLogger("dbus.proxies") + class NullFilter(logging.Filter): + def filter(self, record): + return False + dbus_filter = NullFilter() + try: + dbus_logger.addFilter(dbus_filter) + log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname, + server_dbus_path, dbus.OBJECT_MANAGER_IFACE) + mandos_clients = {path: ifs_and_props[client_dbus_interface] + for path, ifs_and_props in + mandos_serv_object_manager + .GetManagedObjects().items() + if client_dbus_interface in ifs_and_props} + except dbus.exceptions.DBusException as e: + log.critical("Failed to access Mandos server through D-Bus:" + "\n%s", e) + sys.exit(1) + finally: + # restore dbus logger + dbus_logger.removeFilter(dbus_filter) + + # Compile dict of (clients: properties) to process + clients = {} + + if not clientnames: + clients = {objpath: properties + for objpath, properties in mandos_clients.items()} + else: + for name in clientnames: + for objpath, properties in mandos_clients.items(): + if properties["Name"] == name: + clients[objpath] = properties + break + else: + log.critical("Client not found on server: %r", name) + sys.exit(1) + + # Run all commands on clients + commands = commands_from_options(options) + for command in commands: + command.run(clients, bus, mandos_serv) + + +def add_command_line_options(parser): + parser.add_argument("--version", action="version", + version="%(prog)s {}".format(version), + help="show version number and exit") + parser.add_argument("-a", "--all", action="store_true", + help="Select all clients") + parser.add_argument("-v", "--verbose", action="store_true", + help="Print all fields") + parser.add_argument("-j", "--dump-json", action="store_true", + help="Dump client data in JSON format") + enable_disable = parser.add_mutually_exclusive_group() + enable_disable.add_argument("-e", "--enable", action="store_true", + help="Enable client") + enable_disable.add_argument("-d", "--disable", + action="store_true", + help="disable client") + parser.add_argument("-b", "--bump-timeout", action="store_true", + help="Bump timeout for client") + start_stop_checker = parser.add_mutually_exclusive_group() + start_stop_checker.add_argument("--start-checker", + action="store_true", + help="Start checker for client") + start_stop_checker.add_argument("--stop-checker", + action="store_true", + help="Stop checker for client") + parser.add_argument("-V", "--is-enabled", action="store_true", + help="Check if client is enabled") + parser.add_argument("-r", "--remove", action="store_true", + help="Remove client") + parser.add_argument("-c", "--checker", + help="Set checker command for client") + parser.add_argument("-t", "--timeout", type=string_to_delta, + help="Set timeout for client") + parser.add_argument("--extended-timeout", type=string_to_delta, + help="Set extended timeout for client") + parser.add_argument("-i", "--interval", type=string_to_delta, + help="Set checker interval for client") + approve_deny_default = parser.add_mutually_exclusive_group() + approve_deny_default.add_argument( + "--approve-by-default", action="store_true", + default=None, dest="approved_by_default", + help="Set client to be approved by default") + approve_deny_default.add_argument( + "--deny-by-default", action="store_false", + dest="approved_by_default", + help="Set client to be denied by default") + parser.add_argument("--approval-delay", type=string_to_delta, + help="Set delay before client approve/deny") + parser.add_argument("--approval-duration", type=string_to_delta, + help="Set duration of one client approval") + parser.add_argument("-H", "--host", help="Set host for client") + parser.add_argument("-s", "--secret", + type=argparse.FileType(mode="rb"), + help="Set password blob (file) for client") + approve_deny = parser.add_mutually_exclusive_group() + approve_deny.add_argument( + "-A", "--approve", action="store_true", + help="Approve any current client request") + approve_deny.add_argument("-D", "--deny", action="store_true", + help="Deny any current client request") + parser.add_argument("--debug", action="store_true", + help="Debug mode (show D-Bus commands)") + parser.add_argument("--check", action="store_true", + help="Run self-test") + parser.add_argument("client", nargs="*", help="Client name") + + +def string_to_delta(interval): + """Parse a string and return a datetime.timedelta""" + + try: + return rfc3339_duration_to_delta(interval) + except ValueError as e: + log.warning("%s - Parsing as pre-1.6.1 interval instead", + ' '.join(e.args)) + return parse_pre_1_6_1_interval(interval) def rfc3339_duration_to_delta(duration): @@ -218,17 +361,6 @@ return value -def string_to_delta(interval): - """Parse a string and return a datetime.timedelta""" - - try: - return rfc3339_duration_to_delta(interval) - except ValueError as e: - log.warning("%s - Parsing as pre-1.6.1 interval instead", - ' '.join(e.args)) - return parse_pre_1_6_1_interval(interval) - - def parse_pre_1_6_1_interval(interval): """Parse an interval string as documented by Mandos before 1.6.1, and return a datetime.timedelta @@ -272,9 +404,122 @@ return value -## Classes for commands. - -# Abstract classes first +def check_option_syntax(parser, options): + """Apply additional restrictions on options, not expressible in +argparse""" + + def has_actions(options): + return any((options.enable, + options.disable, + options.bump_timeout, + options.start_checker, + options.stop_checker, + options.is_enabled, + options.remove, + options.checker is not None, + options.timeout is not None, + options.extended_timeout is not None, + options.interval is not None, + options.approved_by_default is not None, + options.approval_delay is not None, + options.approval_duration is not None, + options.host is not None, + options.secret is not None, + options.approve, + options.deny)) + + if has_actions(options) and not (options.client or options.all): + parser.error("Options require clients names or --all.") + if options.verbose and has_actions(options): + parser.error("--verbose can only be used alone.") + if options.dump_json and (options.verbose + or has_actions(options)): + parser.error("--dump-json can only be used alone.") + if options.all and not has_actions(options): + parser.error("--all requires an action.") + if options.is_enabled and len(options.client) > 1: + parser.error("--is-enabled requires exactly one client") + if options.remove: + options.remove = False + if has_actions(options) and not options.deny: + parser.error("--remove can only be combined with --deny") + options.remove = True + + +def commands_from_options(options): + + commands = [] + + if options.is_enabled: + commands.append(IsEnabledCmd()) + + if options.approve: + commands.append(ApproveCmd()) + + if options.deny: + commands.append(DenyCmd()) + + if options.remove: + commands.append(RemoveCmd()) + + if options.dump_json: + commands.append(DumpJSONCmd()) + + if options.enable: + commands.append(EnableCmd()) + + if options.disable: + commands.append(DisableCmd()) + + if options.bump_timeout: + commands.append(BumpTimeoutCmd()) + + if options.start_checker: + commands.append(StartCheckerCmd()) + + if options.stop_checker: + commands.append(StopCheckerCmd()) + + if options.approved_by_default is not None: + if options.approved_by_default: + commands.append(ApproveByDefaultCmd()) + else: + commands.append(DenyByDefaultCmd()) + + if options.checker is not None: + commands.append(SetCheckerCmd(options.checker)) + + if options.host is not None: + commands.append(SetHostCmd(options.host)) + + if options.secret is not None: + commands.append(SetSecretCmd(options.secret)) + + if options.timeout is not None: + commands.append(SetTimeoutCmd(options.timeout)) + + if options.extended_timeout: + commands.append( + SetExtendedTimeoutCmd(options.extended_timeout)) + + if options.interval is not None: + commands.append(SetIntervalCmd(options.interval)) + + if options.approval_delay is not None: + commands.append(SetApprovalDelayCmd(options.approval_delay)) + + if options.approval_duration is not None: + commands.append( + SetApprovalDurationCmd(options.approval_duration)) + + # If no command option has been given, show table of clients, + # optionally verbosely + if not commands: + commands.append(PrintTableCmd(verbose=options.verbose)) + + return commands + + class Command(object): """Abstract class for commands""" def run(self, clients, bus=None, mandos=None): @@ -288,6 +533,41 @@ client = bus.get_object(dbus_busname, clientpath) self.run_on_one_client(client, properties) + +class IsEnabledCmd(Command): + def run(self, clients, bus=None, mandos=None): + client, properties = next(iter(clients.items())) + if self.is_enabled(client, properties): + sys.exit(0) + sys.exit(1) + def is_enabled(self, client, properties): + return properties["Enabled"] + + +class ApproveCmd(Command): + def run_on_one_client(self, client, properties): + log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname, + client.__dbus_object_path__, client_dbus_interface) + client.Approve(dbus.Boolean(True), + dbus_interface=client_dbus_interface) + + +class DenyCmd(Command): + def run_on_one_client(self, client, properties): + log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname, + client.__dbus_object_path__, client_dbus_interface) + client.Approve(dbus.Boolean(False), + dbus_interface=client_dbus_interface) + + +class RemoveCmd(Command): + def run_on_one_client(self, client, properties): + log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname, + server_dbus_path, server_dbus_interface, + str(client.__dbus_object_path__)) + self.mandos.RemoveClient(client.__dbus_object_path__) + + class PrintCmd(Command): """Abstract class for commands printing client details""" all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK", @@ -302,40 +582,20 @@ def output(self, clients): raise NotImplementedError() -class PropertyCmd(Command): - """Abstract class for Actions for setting one client property""" - def run_on_one_client(self, client, properties): - """Set the Client's D-Bus property""" - log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname, - client.__dbus_object_path__, - dbus.PROPERTIES_IFACE, client_dbus_interface, - self.propname, self.value_to_set - if not isinstance(self.value_to_set, dbus.Boolean) - else bool(self.value_to_set)) - client.Set(client_dbus_interface, self.propname, - self.value_to_set, - dbus_interface=dbus.PROPERTIES_IFACE) - @property - def propname(self): - raise NotImplementedError() - -class PropertyValueCmd(PropertyCmd): - """Abstract class for PropertyCmd recieving a value as argument""" - def __init__(self, value): - self.value_to_set = value - -class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd): - """Abstract class for PropertyValueCmd taking a value argument as -a datetime.timedelta() but should store it as milliseconds.""" - @property - def value_to_set(self): - return self._vts - @value_to_set.setter - def value_to_set(self, value): - """When setting, convert value from a datetime.timedelta""" - self._vts = int(round(value.total_seconds() * 1000)) - -# Actual (non-abstract) command classes + +class DumpJSONCmd(PrintCmd): + def output(self, clients): + data = {client["Name"]: + {key: self.dbus_boolean_to_bool(client[key]) + for key in self.all_keywords} + for client in clients.values()} + return json.dumps(data, indent=4, separators=(',', ': ')) + @staticmethod + def dbus_boolean_to_bool(value): + if isinstance(value, dbus.Boolean): + value = bool(value) + return value + class PrintTableCmd(PrintCmd): def __init__(self, verbose=False): @@ -424,84 +684,82 @@ for key in self.keywords}) - -class DumpJSONCmd(PrintCmd): - def output(self, clients): - data = {client["Name"]: - {key: self.dbus_boolean_to_bool(client[key]) - for key in self.all_keywords} - for client in clients.values()} - return json.dumps(data, indent=4, separators=(',', ': ')) - @staticmethod - def dbus_boolean_to_bool(value): - if isinstance(value, dbus.Boolean): - value = bool(value) - return value - -class IsEnabledCmd(Command): - def run(self, clients, bus=None, mandos=None): - client, properties = next(iter(clients.items())) - if self.is_enabled(client, properties): - sys.exit(0) - sys.exit(1) - def is_enabled(self, client, properties): - return properties["Enabled"] - -class RemoveCmd(Command): - def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname, - server_dbus_path, server_dbus_interface, - str(client.__dbus_object_path__)) - self.mandos.RemoveClient(client.__dbus_object_path__) - -class ApproveCmd(Command): - def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname, - client.__dbus_object_path__, client_dbus_interface) - client.Approve(dbus.Boolean(True), - dbus_interface=client_dbus_interface) - -class DenyCmd(Command): - def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname, - client.__dbus_object_path__, client_dbus_interface) - client.Approve(dbus.Boolean(False), - dbus_interface=client_dbus_interface) +def milliseconds_to_string(ms): + td = datetime.timedelta(0, 0, 0, ms) + return ("{days}{hours:02}:{minutes:02}:{seconds:02}" + .format(days="{}T".format(td.days) if td.days else "", + hours=td.seconds // 3600, + minutes=(td.seconds % 3600) // 60, + seconds=td.seconds % 60)) + + +class PropertyCmd(Command): + """Abstract class for Actions for setting one client property""" + def run_on_one_client(self, client, properties): + """Set the Client's D-Bus property""" + log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname, + client.__dbus_object_path__, + dbus.PROPERTIES_IFACE, client_dbus_interface, + self.propname, self.value_to_set + if not isinstance(self.value_to_set, dbus.Boolean) + else bool(self.value_to_set)) + client.Set(client_dbus_interface, self.propname, + self.value_to_set, + dbus_interface=dbus.PROPERTIES_IFACE) + @property + def propname(self): + raise NotImplementedError() + class EnableCmd(PropertyCmd): propname = "Enabled" value_to_set = dbus.Boolean(True) + class DisableCmd(PropertyCmd): propname = "Enabled" value_to_set = dbus.Boolean(False) + class BumpTimeoutCmd(PropertyCmd): propname = "LastCheckedOK" value_to_set = "" + class StartCheckerCmd(PropertyCmd): propname = "CheckerRunning" value_to_set = dbus.Boolean(True) + class StopCheckerCmd(PropertyCmd): propname = "CheckerRunning" value_to_set = dbus.Boolean(False) + class ApproveByDefaultCmd(PropertyCmd): propname = "ApprovedByDefault" value_to_set = dbus.Boolean(True) + class DenyByDefaultCmd(PropertyCmd): propname = "ApprovedByDefault" value_to_set = dbus.Boolean(False) + +class PropertyValueCmd(PropertyCmd): + """Abstract class for PropertyCmd recieving a value as argument""" + def __init__(self, value): + self.value_to_set = value + + class SetCheckerCmd(PropertyValueCmd): propname = "Checker" + class SetHostCmd(PropertyValueCmd): propname = "Host" + class SetSecretCmd(PropertyValueCmd): propname = "Secret" @property @@ -513,291 +771,40 @@ self._vts = value.read() value.close() + +class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd): + """Abstract class for PropertyValueCmd taking a value argument as +a datetime.timedelta() but should store it as milliseconds.""" + @property + def value_to_set(self): + return self._vts + @value_to_set.setter + def value_to_set(self, value): + """When setting, convert value from a datetime.timedelta""" + self._vts = int(round(value.total_seconds() * 1000)) + + class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd): propname = "Timeout" + class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd): propname = "ExtendedTimeout" + class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd): propname = "Interval" + class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd): propname = "ApprovalDelay" + class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd): propname = "ApprovalDuration" -def add_command_line_options(parser): - parser.add_argument("--version", action="version", - version="%(prog)s {}".format(version), - help="show version number and exit") - parser.add_argument("-a", "--all", action="store_true", - help="Select all clients") - parser.add_argument("-v", "--verbose", action="store_true", - help="Print all fields") - parser.add_argument("-j", "--dump-json", action="store_true", - help="Dump client data in JSON format") - enable_disable = parser.add_mutually_exclusive_group() - enable_disable.add_argument("-e", "--enable", action="store_true", - help="Enable client") - enable_disable.add_argument("-d", "--disable", - action="store_true", - help="disable client") - parser.add_argument("-b", "--bump-timeout", action="store_true", - help="Bump timeout for client") - start_stop_checker = parser.add_mutually_exclusive_group() - start_stop_checker.add_argument("--start-checker", - action="store_true", - help="Start checker for client") - start_stop_checker.add_argument("--stop-checker", - action="store_true", - help="Stop checker for client") - parser.add_argument("-V", "--is-enabled", action="store_true", - help="Check if client is enabled") - parser.add_argument("-r", "--remove", action="store_true", - help="Remove client") - parser.add_argument("-c", "--checker", - help="Set checker command for client") - parser.add_argument("-t", "--timeout", type=string_to_delta, - help="Set timeout for client") - parser.add_argument("--extended-timeout", type=string_to_delta, - help="Set extended timeout for client") - parser.add_argument("-i", "--interval", type=string_to_delta, - help="Set checker interval for client") - approve_deny_default = parser.add_mutually_exclusive_group() - approve_deny_default.add_argument( - "--approve-by-default", action="store_true", - default=None, dest="approved_by_default", - help="Set client to be approved by default") - approve_deny_default.add_argument( - "--deny-by-default", action="store_false", - dest="approved_by_default", - help="Set client to be denied by default") - parser.add_argument("--approval-delay", type=string_to_delta, - help="Set delay before client approve/deny") - parser.add_argument("--approval-duration", type=string_to_delta, - help="Set duration of one client approval") - parser.add_argument("-H", "--host", help="Set host for client") - parser.add_argument("-s", "--secret", - type=argparse.FileType(mode="rb"), - help="Set password blob (file) for client") - approve_deny = parser.add_mutually_exclusive_group() - approve_deny.add_argument( - "-A", "--approve", action="store_true", - help="Approve any current client request") - approve_deny.add_argument("-D", "--deny", action="store_true", - help="Deny any current client request") - parser.add_argument("--debug", action="store_true", - help="Debug mode (show D-Bus commands)") - parser.add_argument("--check", action="store_true", - help="Run self-test") - parser.add_argument("client", nargs="*", help="Client name") - - -def commands_from_options(options): - - commands = [] - - if options.dump_json: - commands.append(DumpJSONCmd()) - - if options.enable: - commands.append(EnableCmd()) - - if options.disable: - commands.append(DisableCmd()) - - if options.bump_timeout: - commands.append(BumpTimeoutCmd()) - - if options.start_checker: - commands.append(StartCheckerCmd()) - - if options.stop_checker: - commands.append(StopCheckerCmd()) - - if options.is_enabled: - commands.append(IsEnabledCmd()) - - if options.checker is not None: - commands.append(SetCheckerCmd(options.checker)) - - if options.timeout is not None: - commands.append(SetTimeoutCmd(options.timeout)) - - if options.extended_timeout: - commands.append( - SetExtendedTimeoutCmd(options.extended_timeout)) - - if options.interval is not None: - commands.append(SetIntervalCmd(options.interval)) - - if options.approved_by_default is not None: - if options.approved_by_default: - commands.append(ApproveByDefaultCmd()) - else: - commands.append(DenyByDefaultCmd()) - - if options.approval_delay is not None: - commands.append(SetApprovalDelayCmd(options.approval_delay)) - - if options.approval_duration is not None: - commands.append( - SetApprovalDurationCmd(options.approval_duration)) - - if options.host is not None: - commands.append(SetHostCmd(options.host)) - - if options.secret is not None: - commands.append(SetSecretCmd(options.secret)) - - if options.approve: - commands.append(ApproveCmd()) - - if options.deny: - commands.append(DenyCmd()) - - if options.remove: - commands.append(RemoveCmd()) - - # If no command option has been given, show table of clients, - # optionally verbosely - if not commands: - commands.append(PrintTableCmd(verbose=options.verbose)) - - return commands - - -def check_option_syntax(parser, options): - """Apply additional restrictions on options, not expressible in -argparse""" - - def has_actions(options): - return any((options.enable, - options.disable, - options.bump_timeout, - options.start_checker, - options.stop_checker, - options.is_enabled, - options.remove, - options.checker is not None, - options.timeout is not None, - options.extended_timeout is not None, - options.interval is not None, - options.approved_by_default is not None, - options.approval_delay is not None, - options.approval_duration is not None, - options.host is not None, - options.secret is not None, - options.approve, - options.deny)) - - if has_actions(options) and not (options.client or options.all): - parser.error("Options require clients names or --all.") - if options.verbose and has_actions(options): - parser.error("--verbose can only be used alone.") - if options.dump_json and (options.verbose - or has_actions(options)): - parser.error("--dump-json can only be used alone.") - if options.all and not has_actions(options): - parser.error("--all requires an action.") - if options.is_enabled and len(options.client) > 1: - parser.error("--is-enabled requires exactly one client") - if options.remove: - options.remove = False - if has_actions(options) and not options.deny: - parser.error("--remove can only be combined with --deny") - options.remove = True - - -def main(): - parser = argparse.ArgumentParser() - - add_command_line_options(parser) - - options = parser.parse_args() - - check_option_syntax(parser, options) - - clientnames = options.client - - if options.debug: - log.setLevel(logging.DEBUG) - - try: - bus = dbus.SystemBus() - log.debug("D-Bus: Connect to: (busname=%r, path=%r)", - dbus_busname, server_dbus_path) - mandos_dbus_objc = bus.get_object(dbus_busname, - server_dbus_path) - except dbus.exceptions.DBusException: - log.critical("Could not connect to Mandos server") - sys.exit(1) - - mandos_serv = dbus.Interface(mandos_dbus_objc, - dbus_interface=server_dbus_interface) - mandos_serv_object_manager = dbus.Interface( - mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE) - - # Filter out log message from dbus module - dbus_logger = logging.getLogger("dbus.proxies") - class NullFilter(logging.Filter): - def filter(self, record): - return False - dbus_filter = NullFilter() - try: - dbus_logger.addFilter(dbus_filter) - log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname, - server_dbus_path, dbus.OBJECT_MANAGER_IFACE) - mandos_clients = {path: ifs_and_props[client_dbus_interface] - for path, ifs_and_props in - mandos_serv_object_manager - .GetManagedObjects().items() - if client_dbus_interface in ifs_and_props} - except dbus.exceptions.DBusException as e: - log.critical("Failed to access Mandos server through D-Bus:" - "\n%s", e) - sys.exit(1) - finally: - # restore dbus logger - dbus_logger.removeFilter(dbus_filter) - - # Compile dict of (clients: properties) to process - clients = {} - - if not clientnames: - clients = {objpath: properties - for objpath, properties in mandos_clients.items()} - else: - for name in clientnames: - for objpath, properties in mandos_clients.items(): - if properties["Name"] == name: - clients[objpath] = properties - break - else: - log.critical("Client not found on server: %r", name) - sys.exit(1) - - # Run all commands on clients - commands = commands_from_options(options) - for command in commands: - command.run(clients, bus, mandos_serv) -class Test_milliseconds_to_string(unittest.TestCase): - def test_all(self): - self.assertEqual(milliseconds_to_string(93785000), - "1T02:03:05") - def test_no_days(self): - self.assertEqual(milliseconds_to_string(7385000), "02:03:05") - def test_all_zero(self): - self.assertEqual(milliseconds_to_string(0), "00:00:00") - def test_no_fractional_seconds(self): - self.assertEqual(milliseconds_to_string(400), "00:00:00") - self.assertEqual(milliseconds_to_string(900), "00:00:00") - self.assertEqual(milliseconds_to_string(1900), "00:00:01") - class Test_string_to_delta(unittest.TestCase): def test_handles_basic_rfc3339(self): self.assertEqual(string_to_delta("PT0S"), @@ -831,6 +838,336 @@ self.assertEqual(value, datetime.timedelta(0, 7200)) +class Test_check_option_syntax(unittest.TestCase): + # This mostly corresponds to the definition from has_actions() in + # check_option_syntax() + actions = { + # The actual values set here are not that important, but we do + # at least stick to the correct types, even though they are + # never used + "enable": True, + "disable": True, + "bump_timeout": True, + "start_checker": True, + "stop_checker": True, + "is_enabled": True, + "remove": True, + "checker": "x", + "timeout": datetime.timedelta(), + "extended_timeout": datetime.timedelta(), + "interval": datetime.timedelta(), + "approved_by_default": True, + "approval_delay": datetime.timedelta(), + "approval_duration": datetime.timedelta(), + "host": "x", + "secret": io.BytesIO(b"x"), + "approve": True, + "deny": True, + } + + def setUp(self): + self.parser = argparse.ArgumentParser() + add_command_line_options(self.parser) + + @contextlib.contextmanager + def assertParseError(self): + with self.assertRaises(SystemExit) as e: + with self.temporarily_suppress_stderr(): + yield + # Exit code from argparse is guaranteed to be "2". Reference: + # https://docs.python.org/3/library + # /argparse.html#exiting-methods + self.assertEqual(e.exception.code, 2) + + @staticmethod + @contextlib.contextmanager + def temporarily_suppress_stderr(): + null = os.open(os.path.devnull, os.O_RDWR) + stderrcopy = os.dup(sys.stderr.fileno()) + os.dup2(null, sys.stderr.fileno()) + os.close(null) + try: + yield + finally: + # restore stderr + os.dup2(stderrcopy, sys.stderr.fileno()) + os.close(stderrcopy) + + def check_option_syntax(self, options): + check_option_syntax(self.parser, options) + + def test_actions_requires_client_or_all(self): + for action, value in self.actions.items(): + options = self.parser.parse_args() + setattr(options, action, value) + with self.assertParseError(): + self.check_option_syntax(options) + + def test_actions_conflicts_with_verbose(self): + for action, value in self.actions.items(): + options = self.parser.parse_args() + setattr(options, action, value) + options.verbose = True + with self.assertParseError(): + self.check_option_syntax(options) + + def test_dump_json_conflicts_with_verbose(self): + options = self.parser.parse_args() + options.dump_json = True + options.verbose = True + with self.assertParseError(): + self.check_option_syntax(options) + + def test_dump_json_conflicts_with_action(self): + for action, value in self.actions.items(): + options = self.parser.parse_args() + setattr(options, action, value) + options.dump_json = True + with self.assertParseError(): + self.check_option_syntax(options) + + def test_all_can_not_be_alone(self): + options = self.parser.parse_args() + options.all = True + with self.assertParseError(): + self.check_option_syntax(options) + + def test_all_is_ok_with_any_action(self): + for action, value in self.actions.items(): + options = self.parser.parse_args() + setattr(options, action, value) + options.all = True + self.check_option_syntax(options) + + def test_is_enabled_fails_without_client(self): + options = self.parser.parse_args() + options.is_enabled = True + with self.assertParseError(): + self.check_option_syntax(options) + + def test_is_enabled_works_with_one_client(self): + options = self.parser.parse_args() + options.is_enabled = True + options.client = ["foo"] + self.check_option_syntax(options) + + def test_is_enabled_fails_with_two_clients(self): + options = self.parser.parse_args() + options.is_enabled = True + options.client = ["foo", "barbar"] + with self.assertParseError(): + self.check_option_syntax(options) + + def test_remove_can_only_be_combined_with_action_deny(self): + for action, value in self.actions.items(): + if action in {"remove", "deny"}: + continue + options = self.parser.parse_args() + setattr(options, action, value) + options.all = True + options.remove = True + with self.assertParseError(): + self.check_option_syntax(options) + + +class Test_command_from_options(unittest.TestCase): + def setUp(self): + self.parser = argparse.ArgumentParser() + add_command_line_options(self.parser) + def assert_command_from_args(self, args, command_cls, + **cmd_attrs): + """Assert that parsing ARGS should result in an instance of +COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS).""" + options = self.parser.parse_args(args) + check_option_syntax(self.parser, options) + commands = commands_from_options(options) + self.assertEqual(len(commands), 1) + command = commands[0] + self.assertIsInstance(command, command_cls) + for key, value in cmd_attrs.items(): + self.assertEqual(getattr(command, key), value) + def test_print_table(self): + self.assert_command_from_args([], PrintTableCmd, + verbose=False) + + def test_print_table_verbose(self): + self.assert_command_from_args(["--verbose"], PrintTableCmd, + verbose=True) + + def test_print_table_verbose_short(self): + self.assert_command_from_args(["-v"], PrintTableCmd, + verbose=True) + + def test_enable(self): + self.assert_command_from_args(["--enable", "foo"], EnableCmd) + + def test_enable_short(self): + self.assert_command_from_args(["-e", "foo"], EnableCmd) + + def test_disable(self): + self.assert_command_from_args(["--disable", "foo"], + DisableCmd) + + def test_disable_short(self): + self.assert_command_from_args(["-d", "foo"], DisableCmd) + + def test_bump_timeout(self): + self.assert_command_from_args(["--bump-timeout", "foo"], + BumpTimeoutCmd) + + def test_bump_timeout_short(self): + self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd) + + def test_start_checker(self): + self.assert_command_from_args(["--start-checker", "foo"], + StartCheckerCmd) + + def test_stop_checker(self): + self.assert_command_from_args(["--stop-checker", "foo"], + StopCheckerCmd) + + def test_remove(self): + self.assert_command_from_args(["--remove", "foo"], + RemoveCmd) + + def test_remove_short(self): + self.assert_command_from_args(["-r", "foo"], RemoveCmd) + + def test_checker(self): + self.assert_command_from_args(["--checker", ":", "foo"], + SetCheckerCmd, value_to_set=":") + + def test_checker_empty(self): + self.assert_command_from_args(["--checker", "", "foo"], + SetCheckerCmd, value_to_set="") + + def test_checker_short(self): + self.assert_command_from_args(["-c", ":", "foo"], + SetCheckerCmd, value_to_set=":") + + def test_timeout(self): + self.assert_command_from_args(["--timeout", "PT5M", "foo"], + SetTimeoutCmd, + value_to_set=300000) + + def test_timeout_short(self): + self.assert_command_from_args(["-t", "PT5M", "foo"], + SetTimeoutCmd, + value_to_set=300000) + + def test_extended_timeout(self): + self.assert_command_from_args(["--extended-timeout", "PT15M", + "foo"], + SetExtendedTimeoutCmd, + value_to_set=900000) + + def test_interval(self): + self.assert_command_from_args(["--interval", "PT2M", "foo"], + SetIntervalCmd, + value_to_set=120000) + + def test_interval_short(self): + self.assert_command_from_args(["-i", "PT2M", "foo"], + SetIntervalCmd, + value_to_set=120000) + + def test_approve_by_default(self): + self.assert_command_from_args(["--approve-by-default", "foo"], + ApproveByDefaultCmd) + + def test_deny_by_default(self): + self.assert_command_from_args(["--deny-by-default", "foo"], + DenyByDefaultCmd) + + def test_approval_delay(self): + self.assert_command_from_args(["--approval-delay", "PT30S", + "foo"], SetApprovalDelayCmd, + value_to_set=30000) + + def test_approval_duration(self): + self.assert_command_from_args(["--approval-duration", "PT1S", + "foo"], SetApprovalDurationCmd, + value_to_set=1000) + + def test_host(self): + self.assert_command_from_args(["--host", "foo.example.org", + "foo"], SetHostCmd, + value_to_set="foo.example.org") + + def test_host_short(self): + self.assert_command_from_args(["-H", "foo.example.org", + "foo"], SetHostCmd, + value_to_set="foo.example.org") + + def test_secret_devnull(self): + self.assert_command_from_args(["--secret", os.path.devnull, + "foo"], SetSecretCmd, + value_to_set=b"") + + def test_secret_tempfile(self): + with tempfile.NamedTemporaryFile(mode="r+b") as f: + value = b"secret\0xyzzy\nbar" + f.write(value) + f.seek(0) + self.assert_command_from_args(["--secret", f.name, + "foo"], SetSecretCmd, + value_to_set=value) + + def test_secret_devnull_short(self): + self.assert_command_from_args(["-s", os.path.devnull, "foo"], + SetSecretCmd, value_to_set=b"") + + def test_secret_tempfile_short(self): + with tempfile.NamedTemporaryFile(mode="r+b") as f: + value = b"secret\0xyzzy\nbar" + f.write(value) + f.seek(0) + self.assert_command_from_args(["-s", f.name, "foo"], + SetSecretCmd, + value_to_set=value) + + def test_approve(self): + self.assert_command_from_args(["--approve", "foo"], + ApproveCmd) + + def test_approve_short(self): + self.assert_command_from_args(["-A", "foo"], ApproveCmd) + + def test_deny(self): + self.assert_command_from_args(["--deny", "foo"], DenyCmd) + + def test_deny_short(self): + self.assert_command_from_args(["-D", "foo"], DenyCmd) + + def test_dump_json(self): + self.assert_command_from_args(["--dump-json"], DumpJSONCmd) + + def test_is_enabled(self): + self.assert_command_from_args(["--is-enabled", "foo"], + IsEnabledCmd) + + def test_is_enabled_short(self): + self.assert_command_from_args(["-V", "foo"], IsEnabledCmd) + + def test_deny_before_remove(self): + options = self.parser.parse_args(["--deny", "--remove", + "foo"]) + check_option_syntax(self.parser, options) + commands = commands_from_options(options) + self.assertEqual(len(commands), 2) + self.assertIsInstance(commands[0], DenyCmd) + self.assertIsInstance(commands[1], RemoveCmd) + + def test_deny_before_remove_reversed(self): + options = self.parser.parse_args(["--remove", "--deny", + "--all"]) + check_option_syntax(self.parser, options) + commands = commands_from_options(options) + self.assertEqual(len(commands), 2) + self.assertIsInstance(commands[0], DenyCmd) + self.assertIsInstance(commands[1], RemoveCmd) + + class TestCmd(unittest.TestCase): """Abstract class for tests of command classes""" def setUp(self): @@ -922,6 +1259,126 @@ }[path] return Bus() + +class TestIsEnabledCmd(TestCmd): + def test_is_enabled(self): + self.assertTrue(all(IsEnabledCmd().is_enabled(client, + properties) + for client, properties + in self.clients.items())) + def test_is_enabled_run_exits_successfully(self): + with self.assertRaises(SystemExit) as e: + IsEnabledCmd().run(self.one_client) + if e.exception.code is not None: + self.assertEqual(e.exception.code, 0) + else: + self.assertIsNone(e.exception.code) + def test_is_enabled_run_exits_with_failure(self): + self.client.attributes["Enabled"] = dbus.Boolean(False) + with self.assertRaises(SystemExit) as e: + IsEnabledCmd().run(self.one_client) + if isinstance(e.exception.code, int): + self.assertNotEqual(e.exception.code, 0) + else: + self.assertIsNotNone(e.exception.code) + + +class TestApproveCmd(TestCmd): + def test_approve(self): + ApproveCmd().run(self.clients, self.bus) + for clientpath in self.clients: + client = self.bus.get_object(dbus_busname, clientpath) + self.assertIn(("Approve", (True, client_dbus_interface)), + client.calls) + + +class TestDenyCmd(TestCmd): + def test_deny(self): + DenyCmd().run(self.clients, self.bus) + for clientpath in self.clients: + client = self.bus.get_object(dbus_busname, clientpath) + self.assertIn(("Approve", (False, client_dbus_interface)), + client.calls) + +class TestRemoveCmd(TestCmd): + def test_remove(self): + class MockMandos(object): + def __init__(self): + self.calls = [] + def RemoveClient(self, dbus_path): + self.calls.append(("RemoveClient", (dbus_path,))) + mandos = MockMandos() + super(TestRemoveCmd, self).setUp() + RemoveCmd().run(self.clients, self.bus, mandos) + self.assertEqual(len(mandos.calls), 2) + for clientpath in self.clients: + self.assertIn(("RemoveClient", (clientpath,)), + mandos.calls) + + +class TestDumpJSONCmd(TestCmd): + def setUp(self): + self.expected_json = { + "foo": { + "Name": "foo", + "KeyID": ("92ed150794387c03ce684574b1139a65" + "94a34f895daaaf09fd8ea90a27cddb12"), + "Host": "foo.example.org", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-03T00:00:00", + "Created": "2019-01-02T00:00:00", + "Interval": 120000, + "Fingerprint": ("778827225BA7DE539C5A" + "7CFA59CFF7CDBD9A5920"), + "CheckerRunning": False, + "LastEnabled": "2019-01-03T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": True, + "LastApprovalRequest": "", + "ApprovalDelay": 0, + "ApprovalDuration": 1000, + "Checker": "fping -q -- %(host)s", + "ExtendedTimeout": 900000, + "Expires": "2019-02-04T00:00:00", + "LastCheckerStatus": 0, + }, + "barbar": { + "Name": "barbar", + "KeyID": ("0558568eedd67d622f5c83b35a115f79" + "6ab612cff5ad227247e46c2b020f441c"), + "Host": "192.0.2.3", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-04T00:00:00", + "Created": "2019-01-03T00:00:00", + "Interval": 120000, + "Fingerprint": ("3E393AEAEFB84C7E89E2" + "F547B3A107558FCA3A27"), + "CheckerRunning": True, + "LastEnabled": "2019-01-04T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": False, + "LastApprovalRequest": "2019-01-03T00:00:00", + "ApprovalDelay": 30000, + "ApprovalDuration": 1000, + "Checker": ":", + "ExtendedTimeout": 900000, + "Expires": "2019-02-05T00:00:00", + "LastCheckerStatus": -2, + }, + } + return super(TestDumpJSONCmd, self).setUp() + def test_normal(self): + json_data = json.loads(DumpJSONCmd().output(self.clients)) + self.assertDictEqual(json_data, self.expected_json) + def test_one_client(self): + clients = self.one_client + json_data = json.loads(DumpJSONCmd().output(clients)) + expected_json = {"foo": self.expected_json["foo"]} + self.assertDictEqual(json_data, expected_json) + + class TestPrintTableCmd(TestCmd): def test_normal(self): output = PrintTableCmd().output(self.clients.values()) @@ -1033,144 +1490,26 @@ """[1:-1] self.assertEqual(output, expected_output) -class TestDumpJSONCmd(TestCmd): - def setUp(self): - self.expected_json = { - "foo": { - "Name": "foo", - "KeyID": ("92ed150794387c03ce684574b1139a65" - "94a34f895daaaf09fd8ea90a27cddb12"), - "Host": "foo.example.org", - "Enabled": True, - "Timeout": 300000, - "LastCheckedOK": "2019-02-03T00:00:00", - "Created": "2019-01-02T00:00:00", - "Interval": 120000, - "Fingerprint": ("778827225BA7DE539C5A" - "7CFA59CFF7CDBD9A5920"), - "CheckerRunning": False, - "LastEnabled": "2019-01-03T00:00:00", - "ApprovalPending": False, - "ApprovedByDefault": True, - "LastApprovalRequest": "", - "ApprovalDelay": 0, - "ApprovalDuration": 1000, - "Checker": "fping -q -- %(host)s", - "ExtendedTimeout": 900000, - "Expires": "2019-02-04T00:00:00", - "LastCheckerStatus": 0, - }, - "barbar": { - "Name": "barbar", - "KeyID": ("0558568eedd67d622f5c83b35a115f79" - "6ab612cff5ad227247e46c2b020f441c"), - "Host": "192.0.2.3", - "Enabled": True, - "Timeout": 300000, - "LastCheckedOK": "2019-02-04T00:00:00", - "Created": "2019-01-03T00:00:00", - "Interval": 120000, - "Fingerprint": ("3E393AEAEFB84C7E89E2" - "F547B3A107558FCA3A27"), - "CheckerRunning": True, - "LastEnabled": "2019-01-04T00:00:00", - "ApprovalPending": False, - "ApprovedByDefault": False, - "LastApprovalRequest": "2019-01-03T00:00:00", - "ApprovalDelay": 30000, - "ApprovalDuration": 1000, - "Checker": ":", - "ExtendedTimeout": 900000, - "Expires": "2019-02-05T00:00:00", - "LastCheckerStatus": -2, - }, - } - return super(TestDumpJSONCmd, self).setUp() - def test_normal(self): - json_data = json.loads(DumpJSONCmd().output(self.clients)) - self.assertDictEqual(json_data, self.expected_json) - def test_one_client(self): - clients = self.one_client - json_data = json.loads(DumpJSONCmd().output(clients)) - expected_json = {"foo": self.expected_json["foo"]} - self.assertDictEqual(json_data, expected_json) - -class TestIsEnabledCmd(TestCmd): - def test_is_enabled(self): - self.assertTrue(all(IsEnabledCmd().is_enabled(client, - properties) - for client, properties - in self.clients.items())) - def test_is_enabled_run_exits_successfully(self): - with self.assertRaises(SystemExit) as e: - IsEnabledCmd().run(self.one_client) - if e.exception.code is not None: - self.assertEqual(e.exception.code, 0) - else: - self.assertIsNone(e.exception.code) - def test_is_enabled_run_exits_with_failure(self): - self.client.attributes["Enabled"] = dbus.Boolean(False) - with self.assertRaises(SystemExit) as e: - IsEnabledCmd().run(self.one_client) - if isinstance(e.exception.code, int): - self.assertNotEqual(e.exception.code, 0) - else: - self.assertIsNotNone(e.exception.code) - -class TestRemoveCmd(TestCmd): - def test_remove(self): - class MockMandos(object): - def __init__(self): - self.calls = [] - def RemoveClient(self, dbus_path): - self.calls.append(("RemoveClient", (dbus_path,))) - mandos = MockMandos() - super(TestRemoveCmd, self).setUp() - RemoveCmd().run(self.clients, self.bus, mandos) - self.assertEqual(len(mandos.calls), 2) - for clientpath in self.clients: - self.assertIn(("RemoveClient", (clientpath,)), - mandos.calls) - -class TestApproveCmd(TestCmd): - def test_approve(self): - ApproveCmd().run(self.clients, self.bus) - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - self.assertIn(("Approve", (True, client_dbus_interface)), - client.calls) - -class TestDenyCmd(TestCmd): - def test_deny(self): - DenyCmd().run(self.clients, self.bus) - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - self.assertIn(("Approve", (False, client_dbus_interface)), - client.calls) - -class TestEnableCmd(TestCmd): - def test_enable(self): - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - client.attributes["Enabled"] = False - - EnableCmd().run(self.clients, self.bus) - - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - self.assertTrue(client.attributes["Enabled"]) - -class TestDisableCmd(TestCmd): - def test_disable(self): - DisableCmd().run(self.clients, self.bus) - for clientpath in self.clients: - client = self.bus.get_object(dbus_busname, clientpath) - self.assertFalse(client.attributes["Enabled"]) + +class Test_milliseconds_to_string(unittest.TestCase): + def test_all(self): + self.assertEqual(milliseconds_to_string(93785000), + "1T02:03:05") + def test_no_days(self): + self.assertEqual(milliseconds_to_string(7385000), "02:03:05") + def test_all_zero(self): + self.assertEqual(milliseconds_to_string(0), "00:00:00") + def test_no_fractional_seconds(self): + self.assertEqual(milliseconds_to_string(400), "00:00:00") + self.assertEqual(milliseconds_to_string(900), "00:00:00") + self.assertEqual(milliseconds_to_string(1900), "00:00:01") + class Unique(object): """Class for objects which exist only to be unique objects, since unittest.mock.sentinel only exists in Python 3.3""" + class TestPropertyCmd(TestCmd): """Abstract class for tests of PropertyCmd classes""" def runTest(self): @@ -1194,31 +1533,58 @@ def run_command(self, value, clients): self.command().run(clients, self.bus) + +class TestEnableCmd(TestCmd): + def test_enable(self): + for clientpath in self.clients: + client = self.bus.get_object(dbus_busname, clientpath) + client.attributes["Enabled"] = False + + EnableCmd().run(self.clients, self.bus) + + for clientpath in self.clients: + client = self.bus.get_object(dbus_busname, clientpath) + self.assertTrue(client.attributes["Enabled"]) + + +class TestDisableCmd(TestCmd): + def test_disable(self): + DisableCmd().run(self.clients, self.bus) + for clientpath in self.clients: + client = self.bus.get_object(dbus_busname, clientpath) + self.assertFalse(client.attributes["Enabled"]) + + class TestBumpTimeoutCmd(TestPropertyCmd): command = BumpTimeoutCmd propname = "LastCheckedOK" values_to_set = [""] + class TestStartCheckerCmd(TestPropertyCmd): command = StartCheckerCmd propname = "CheckerRunning" values_to_set = [dbus.Boolean(True)] + class TestStopCheckerCmd(TestPropertyCmd): command = StopCheckerCmd propname = "CheckerRunning" values_to_set = [dbus.Boolean(False)] + class TestApproveByDefaultCmd(TestPropertyCmd): command = ApproveByDefaultCmd propname = "ApprovedByDefault" values_to_set = [dbus.Boolean(True)] + class TestDenyByDefaultCmd(TestPropertyCmd): command = DenyByDefaultCmd propname = "ApprovedByDefault" values_to_set = [dbus.Boolean(False)] + class TestPropertyValueCmd(TestPropertyCmd): """Abstract class for tests of PropertyValueCmd classes""" def runTest(self): @@ -1228,16 +1594,19 @@ def run_command(self, value, clients): self.command(value).run(clients, self.bus) + class TestSetCheckerCmd(TestPropertyValueCmd): command = SetCheckerCmd propname = "Checker" values_to_set = ["", ":", "fping -q -- %s"] + class TestSetHostCmd(TestPropertyValueCmd): command = SetHostCmd propname = "Host" values_to_set = ["192.0.2.3", "foo.example.org"] + class TestSetSecretCmd(TestPropertyValueCmd): command = SetSecretCmd propname = "Secret" @@ -1245,6 +1614,7 @@ io.BytesIO(b"secret\0xyzzy\nbar")] values_to_get = [b"", b"secret\0xyzzy\nbar"] + class TestSetTimeoutCmd(TestPropertyValueCmd): command = SetTimeoutCmd propname = "Timeout" @@ -1255,6 +1625,7 @@ datetime.timedelta(weeks=52)] values_to_get = [0, 300000, 1000, 604800000, 31449600000] + class TestSetExtendedTimeoutCmd(TestPropertyValueCmd): command = SetExtendedTimeoutCmd propname = "ExtendedTimeout" @@ -1265,6 +1636,7 @@ datetime.timedelta(weeks=52)] values_to_get = [0, 300000, 1000, 604800000, 31449600000] + class TestSetIntervalCmd(TestPropertyValueCmd): command = SetIntervalCmd propname = "Interval" @@ -1275,6 +1647,7 @@ datetime.timedelta(weeks=52)] values_to_get = [0, 300000, 1000, 604800000, 31449600000] + class TestSetApprovalDelayCmd(TestPropertyValueCmd): command = SetApprovalDelayCmd propname = "ApprovalDelay" @@ -1285,6 +1658,7 @@ datetime.timedelta(weeks=52)] values_to_get = [0, 300000, 1000, 604800000, 31449600000] + class TestSetApprovalDurationCmd(TestPropertyValueCmd): command = SetApprovalDurationCmd propname = "ApprovalDuration" @@ -1295,335 +1669,6 @@ datetime.timedelta(weeks=52)] values_to_get = [0, 300000, 1000, 604800000, 31449600000] -class Test_command_from_options(unittest.TestCase): - def setUp(self): - self.parser = argparse.ArgumentParser() - add_command_line_options(self.parser) - def assert_command_from_args(self, args, command_cls, - **cmd_attrs): - """Assert that parsing ARGS should result in an instance of -COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS).""" - options = self.parser.parse_args(args) - check_option_syntax(self.parser, options) - commands = commands_from_options(options) - self.assertEqual(len(commands), 1) - command = commands[0] - self.assertIsInstance(command, command_cls) - for key, value in cmd_attrs.items(): - self.assertEqual(getattr(command, key), value) - def test_print_table(self): - self.assert_command_from_args([], PrintTableCmd, - verbose=False) - - def test_print_table_verbose(self): - self.assert_command_from_args(["--verbose"], PrintTableCmd, - verbose=True) - - def test_print_table_verbose_short(self): - self.assert_command_from_args(["-v"], PrintTableCmd, - verbose=True) - - def test_enable(self): - self.assert_command_from_args(["--enable", "foo"], EnableCmd) - - def test_enable_short(self): - self.assert_command_from_args(["-e", "foo"], EnableCmd) - - def test_disable(self): - self.assert_command_from_args(["--disable", "foo"], - DisableCmd) - - def test_disable_short(self): - self.assert_command_from_args(["-d", "foo"], DisableCmd) - - def test_bump_timeout(self): - self.assert_command_from_args(["--bump-timeout", "foo"], - BumpTimeoutCmd) - - def test_bump_timeout_short(self): - self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd) - - def test_start_checker(self): - self.assert_command_from_args(["--start-checker", "foo"], - StartCheckerCmd) - - def test_stop_checker(self): - self.assert_command_from_args(["--stop-checker", "foo"], - StopCheckerCmd) - - def test_remove(self): - self.assert_command_from_args(["--remove", "foo"], - RemoveCmd) - - def test_remove_short(self): - self.assert_command_from_args(["-r", "foo"], RemoveCmd) - - def test_checker(self): - self.assert_command_from_args(["--checker", ":", "foo"], - SetCheckerCmd, value_to_set=":") - - def test_checker_empty(self): - self.assert_command_from_args(["--checker", "", "foo"], - SetCheckerCmd, value_to_set="") - - def test_checker_short(self): - self.assert_command_from_args(["-c", ":", "foo"], - SetCheckerCmd, value_to_set=":") - - def test_timeout(self): - self.assert_command_from_args(["--timeout", "PT5M", "foo"], - SetTimeoutCmd, - value_to_set=300000) - - def test_timeout_short(self): - self.assert_command_from_args(["-t", "PT5M", "foo"], - SetTimeoutCmd, - value_to_set=300000) - - def test_extended_timeout(self): - self.assert_command_from_args(["--extended-timeout", "PT15M", - "foo"], - SetExtendedTimeoutCmd, - value_to_set=900000) - - def test_interval(self): - self.assert_command_from_args(["--interval", "PT2M", "foo"], - SetIntervalCmd, - value_to_set=120000) - - def test_interval_short(self): - self.assert_command_from_args(["-i", "PT2M", "foo"], - SetIntervalCmd, - value_to_set=120000) - - def test_approve_by_default(self): - self.assert_command_from_args(["--approve-by-default", "foo"], - ApproveByDefaultCmd) - - def test_deny_by_default(self): - self.assert_command_from_args(["--deny-by-default", "foo"], - DenyByDefaultCmd) - - def test_approval_delay(self): - self.assert_command_from_args(["--approval-delay", "PT30S", - "foo"], SetApprovalDelayCmd, - value_to_set=30000) - - def test_approval_duration(self): - self.assert_command_from_args(["--approval-duration", "PT1S", - "foo"], SetApprovalDurationCmd, - value_to_set=1000) - - def test_host(self): - self.assert_command_from_args(["--host", "foo.example.org", - "foo"], SetHostCmd, - value_to_set="foo.example.org") - - def test_host_short(self): - self.assert_command_from_args(["-H", "foo.example.org", - "foo"], SetHostCmd, - value_to_set="foo.example.org") - - def test_secret_devnull(self): - self.assert_command_from_args(["--secret", os.path.devnull, - "foo"], SetSecretCmd, - value_to_set=b"") - - def test_secret_tempfile(self): - with tempfile.NamedTemporaryFile(mode="r+b") as f: - value = b"secret\0xyzzy\nbar" - f.write(value) - f.seek(0) - self.assert_command_from_args(["--secret", f.name, - "foo"], SetSecretCmd, - value_to_set=value) - - def test_secret_devnull_short(self): - self.assert_command_from_args(["-s", os.path.devnull, "foo"], - SetSecretCmd, value_to_set=b"") - - def test_secret_tempfile_short(self): - with tempfile.NamedTemporaryFile(mode="r+b") as f: - value = b"secret\0xyzzy\nbar" - f.write(value) - f.seek(0) - self.assert_command_from_args(["-s", f.name, "foo"], - SetSecretCmd, - value_to_set=value) - - def test_approve(self): - self.assert_command_from_args(["--approve", "foo"], - ApproveCmd) - - def test_approve_short(self): - self.assert_command_from_args(["-A", "foo"], ApproveCmd) - - def test_deny(self): - self.assert_command_from_args(["--deny", "foo"], DenyCmd) - - def test_deny_short(self): - self.assert_command_from_args(["-D", "foo"], DenyCmd) - - def test_dump_json(self): - self.assert_command_from_args(["--dump-json"], DumpJSONCmd) - - def test_is_enabled(self): - self.assert_command_from_args(["--is-enabled", "foo"], - IsEnabledCmd) - - def test_is_enabled_short(self): - self.assert_command_from_args(["-V", "foo"], IsEnabledCmd) - - def test_deny_before_remove(self): - options = self.parser.parse_args(["--deny", "--remove", - "foo"]) - check_option_syntax(self.parser, options) - commands = commands_from_options(options) - self.assertEqual(len(commands), 2) - self.assertIsInstance(commands[0], DenyCmd) - self.assertIsInstance(commands[1], RemoveCmd) - - def test_deny_before_remove_reversed(self): - options = self.parser.parse_args(["--remove", "--deny", - "--all"]) - check_option_syntax(self.parser, options) - commands = commands_from_options(options) - self.assertEqual(len(commands), 2) - self.assertIsInstance(commands[0], DenyCmd) - self.assertIsInstance(commands[1], RemoveCmd) - - -class Test_check_option_syntax(unittest.TestCase): - # This mostly corresponds to the definition from has_actions() in - # check_option_syntax() - actions = { - # The actual values set here are not that important, but we do - # at least stick to the correct types, even though they are - # never used - "enable": True, - "disable": True, - "bump_timeout": True, - "start_checker": True, - "stop_checker": True, - "is_enabled": True, - "remove": True, - "checker": "x", - "timeout": datetime.timedelta(), - "extended_timeout": datetime.timedelta(), - "interval": datetime.timedelta(), - "approved_by_default": True, - "approval_delay": datetime.timedelta(), - "approval_duration": datetime.timedelta(), - "host": "x", - "secret": io.BytesIO(b"x"), - "approve": True, - "deny": True, - } - - def setUp(self): - self.parser = argparse.ArgumentParser() - add_command_line_options(self.parser) - - @contextlib.contextmanager - def assertParseError(self): - with self.assertRaises(SystemExit) as e: - with self.temporarily_suppress_stderr(): - yield - # Exit code from argparse is guaranteed to be "2". Reference: - # https://docs.python.org/3/library - # /argparse.html#exiting-methods - self.assertEqual(e.exception.code, 2) - - @staticmethod - @contextlib.contextmanager - def temporarily_suppress_stderr(): - null = os.open(os.path.devnull, os.O_RDWR) - stderrcopy = os.dup(sys.stderr.fileno()) - os.dup2(null, sys.stderr.fileno()) - os.close(null) - try: - yield - finally: - # restore stderr - os.dup2(stderrcopy, sys.stderr.fileno()) - os.close(stderrcopy) - - def check_option_syntax(self, options): - check_option_syntax(self.parser, options) - - def test_actions_requires_client_or_all(self): - for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - with self.assertParseError(): - self.check_option_syntax(options) - - def test_actions_conflicts_with_verbose(self): - for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.verbose = True - with self.assertParseError(): - self.check_option_syntax(options) - - def test_dump_json_conflicts_with_verbose(self): - options = self.parser.parse_args() - options.dump_json = True - options.verbose = True - with self.assertParseError(): - self.check_option_syntax(options) - - def test_dump_json_conflicts_with_action(self): - for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.dump_json = True - with self.assertParseError(): - self.check_option_syntax(options) - - def test_all_can_not_be_alone(self): - options = self.parser.parse_args() - options.all = True - with self.assertParseError(): - self.check_option_syntax(options) - - def test_all_is_ok_with_any_action(self): - for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.all = True - self.check_option_syntax(options) - - def test_is_enabled_fails_without_client(self): - options = self.parser.parse_args() - options.is_enabled = True - with self.assertParseError(): - self.check_option_syntax(options) - - def test_is_enabled_works_with_one_client(self): - options = self.parser.parse_args() - options.is_enabled = True - options.client = ["foo"] - self.check_option_syntax(options) - - def test_is_enabled_fails_with_two_clients(self): - options = self.parser.parse_args() - options.is_enabled = True - options.client = ["foo", "barbar"] - with self.assertParseError(): - self.check_option_syntax(options) - - def test_remove_can_only_be_combined_with_action_deny(self): - for action, value in self.actions.items(): - if action in {"remove", "deny"}: - continue - options = self.parser.parse_args() - setattr(options, action, value) - options.all = True - options.remove = True - with self.assertParseError(): - self.check_option_syntax(options) - def should_only_run_tests():