=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-30 21:23:59 +0000 +++ mandos-ctl 2019-03-31 04:39:25 +0000 @@ -122,57 +122,98 @@ help="Select all clients") parser.add_argument("-v", "--verbose", action="store_true", help="Print all fields") - parser.add_argument("-j", "--dump-json", action="store_true", + parser.add_argument("-j", "--dump-json", dest="commands", + action="append_const", default=[], + const=command.DumpJSON(), help="Dump client data in JSON format") enable_disable = parser.add_mutually_exclusive_group() - enable_disable.add_argument("-e", "--enable", action="store_true", + enable_disable.add_argument("-e", "--enable", dest="commands", + action="append_const", default=[], + const=command.Enable(), help="Enable client") - enable_disable.add_argument("-d", "--disable", - action="store_true", + enable_disable.add_argument("-d", "--disable", dest="commands", + action="append_const", default=[], + const=command.Disable(), help="disable client") - parser.add_argument("-b", "--bump-timeout", action="store_true", + parser.add_argument("-b", "--bump-timeout", dest="commands", + action="append_const", default=[], + const=command.BumpTimeout(), help="Bump timeout for client") start_stop_checker = parser.add_mutually_exclusive_group() start_stop_checker.add_argument("--start-checker", - action="store_true", + dest="commands", + action="append_const", default=[], + const=command.StartChecker(), help="Start checker for client") - start_stop_checker.add_argument("--stop-checker", - action="store_true", + start_stop_checker.add_argument("--stop-checker", dest="commands", + action="append_const", default=[], + const=command.StopChecker(), help="Stop checker for client") - parser.add_argument("-V", "--is-enabled", action="store_true", + parser.add_argument("-V", "--is-enabled", dest="commands", + action="append_const", default=[], + const=command.IsEnabled(), help="Check if client is enabled") - parser.add_argument("-r", "--remove", action="store_true", + parser.add_argument("-r", "--remove", dest="commands", + action="append_const", default=[], + const=command.Remove(), help="Remove client") - parser.add_argument("-c", "--checker", + parser.add_argument("-c", "--checker", dest="commands", + action="append", default=[], + metavar="COMMAND", type=command.SetChecker, help="Set checker command for client") - parser.add_argument("-t", "--timeout", type=string_to_delta, - help="Set timeout for client") - parser.add_argument("--extended-timeout", type=string_to_delta, - help="Set extended timeout for client") - parser.add_argument("-i", "--interval", type=string_to_delta, - help="Set checker interval for client") + parser.add_argument( + "-t", "--timeout", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetTimeout.argparse(string_to_delta), + help="Set timeout for client") + parser.add_argument( + "--extended-timeout", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetExtendedTimeout.argparse(string_to_delta), + help="Set extended timeout for client") + parser.add_argument( + "-i", "--interval", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetInterval.argparse(string_to_delta), + help="Set checker interval for client") approve_deny_default = parser.add_mutually_exclusive_group() approve_deny_default.add_argument( - "--approve-by-default", action="store_true", - default=None, dest="approved_by_default", + "--approve-by-default", dest="commands", + action="append_const", default=[], + const=command.ApproveByDefault(), help="Set client to be approved by default") approve_deny_default.add_argument( - "--deny-by-default", action="store_false", - dest="approved_by_default", + "--deny-by-default", dest="commands", + action="append_const", default=[], + const=command.DenyByDefault(), help="Set client to be denied by default") - parser.add_argument("--approval-delay", type=string_to_delta, - help="Set delay before client approve/deny") - parser.add_argument("--approval-duration", type=string_to_delta, - help="Set duration of one client approval") - parser.add_argument("-H", "--host", help="Set host for client") - parser.add_argument("-s", "--secret", - type=argparse.FileType(mode="rb"), - help="Set password blob (file) for client") + parser.add_argument( + "--approval-delay", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetApprovalDelay.argparse(string_to_delta), + help="Set delay before client approve/deny") + parser.add_argument( + "--approval-duration", dest="commands", action="append", + default=[], metavar="TIME", + type=command.SetApprovalDuration.argparse(string_to_delta), + help="Set duration of one client approval") + parser.add_argument("-H", "--host", dest="commands", + action="append", default=[], metavar="STRING", + type=command.SetHost, + help="Set host for client") + parser.add_argument( + "-s", "--secret", dest="commands", action="append", + default=[], metavar="FILENAME", + type=command.SetSecret.argparse(argparse.FileType(mode="rb")), + help="Set password blob (file) for client") approve_deny = parser.add_mutually_exclusive_group() approve_deny.add_argument( - "-A", "--approve", action="store_true", + "-A", "--approve", dest="commands", action="append_const", + default=[], const=command.Approve(), help="Approve any current client request") - approve_deny.add_argument("-D", "--deny", action="store_true", + approve_deny.add_argument("-D", "--deny", dest="commands", + action="append_const", default=[], + const=command.Deny(), help="Deny any current client request") parser.add_argument("--debug", action="store_true", help="Debug mode (show D-Bus commands)") @@ -369,43 +410,46 @@ """Apply additional restrictions on options, not expressible in argparse""" - def has_actions(options): - return any((options.enable, - options.disable, - options.bump_timeout, - options.start_checker, - options.stop_checker, - options.is_enabled, - options.remove, - options.checker is not None, - options.timeout is not None, - options.extended_timeout is not None, - options.interval is not None, - options.approved_by_default is not None, - options.approval_delay is not None, - options.approval_duration is not None, - options.host is not None, - options.secret is not None, - options.approve, - options.deny)) + def has_commands(options, commands=None): + if commands is None: + commands = (command.Enable, + command.Disable, + command.BumpTimeout, + command.StartChecker, + command.StopChecker, + command.IsEnabled, + command.Remove, + command.SetChecker, + command.SetTimeout, + command.SetExtendedTimeout, + command.SetInterval, + command.ApproveByDefault, + command.DenyByDefault, + command.SetApprovalDelay, + command.SetApprovalDuration, + command.SetHost, + command.SetSecret, + command.Approve, + command.Deny) + return any(isinstance(cmd, commands) + for cmd in options.commands) - if has_actions(options) and not (options.client or options.all): + if has_commands(options) and not (options.client or options.all): parser.error("Options require clients names or --all.") - if options.verbose and has_actions(options): + if options.verbose and has_commands(options): parser.error("--verbose can only be used alone.") - if options.dump_json and (options.verbose - or has_actions(options)): + if (has_commands(options, (command.DumpJSON,)) + and (options.verbose or len(options.commands) > 1)): parser.error("--dump-json can only be used alone.") - if options.all and not has_actions(options): + if options.all and not has_commands(options): parser.error("--all requires an action.") - if options.is_enabled and len(options.client) > 1: + if (has_commands(options, (command.IsEnabled,)) + and len(options.client) > 1): parser.error("--is-enabled requires exactly one client") - if options.remove: - options.remove = False - if has_actions(options) and not options.deny: - parser.error("--remove can only be combined with --deny") - options.remove = True - + if (len(options.commands) > 1 + and has_commands(options, (command.Remove,)) + and not has_commands(options, (command.Deny,))): + parser.error("--remove can only be combined with --deny") class dbus(object): @@ -551,70 +595,28 @@ def commands_from_options(options): - commands = [] - - if options.is_enabled: - commands.append(command.IsEnabled()) - - if options.approve: - commands.append(command.Approve()) - - if options.deny: - commands.append(command.Deny()) - - if options.remove: - commands.append(command.Remove()) - - if options.dump_json: - commands.append(command.DumpJSON()) - - if options.enable: - commands.append(command.Enable()) - - if options.disable: - commands.append(command.Disable()) - - if options.bump_timeout: - commands.append(command.BumpTimeout()) - - if options.start_checker: - commands.append(command.StartChecker()) - - if options.stop_checker: - commands.append(command.StopChecker()) - - if options.approved_by_default is not None: - if options.approved_by_default: - commands.append(command.ApproveByDefault()) + commands = list(options.commands) + + def find_cmd(cmd, commands): + i = 0 + for i, c in enumerate(commands): + if isinstance(c, cmd): + return i + return i+1 + + # If command.Remove is present, move any instances of command.Deny + # to occur ahead of command.Remove. + index_of_remove = find_cmd(command.Remove, commands) + before_remove = commands[:index_of_remove] + after_remove = commands[index_of_remove:] + cleaned_after = [] + for cmd in after_remove: + if isinstance(cmd, command.Deny): + before_remove.append(cmd) else: - commands.append(command.DenyByDefault()) - - if options.checker is not None: - commands.append(command.SetChecker(options.checker)) - - if options.host is not None: - commands.append(command.SetHost(options.host)) - - if options.secret is not None: - commands.append(command.SetSecret(options.secret)) - - if options.timeout is not None: - commands.append(command.SetTimeout(options.timeout)) - - if options.extended_timeout: - commands.append( - command.SetExtendedTimeout(options.extended_timeout)) - - if options.interval is not None: - commands.append(command.SetInterval(options.interval)) - - if options.approval_delay is not None: - commands.append( - command.SetApprovalDelay(options.approval_delay)) - - if options.approval_duration is not None: - commands.append( - command.SetApprovalDuration(options.approval_duration)) + cleaned_after.append(cmd) + if cleaned_after != after_remove: + commands = before_remove + cleaned_after # If no command option has been given, show table of clients, # optionally verbosely @@ -835,6 +837,11 @@ def __init__(self, value): self.value_to_set = value + @classmethod + def argparse(cls, argtype): + def cmdtype(arg): + return cls(argtype(arg)) + return cmdtype class SetChecker(PropertySetterValue): propname = "Checker" @@ -966,37 +973,41 @@ def test_actions_requires_client_or_all(self): for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) + args = self.actionargs(action, value) with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) - # This mostly corresponds to the definition from has_actions() in + # This mostly corresponds to the definition from has_commands() in # check_option_syntax() actions = { - # The actual values set here are not that important, but we do - # at least stick to the correct types, even though they are - # never used - "enable": True, - "disable": True, - "bump_timeout": True, - "start_checker": True, - "stop_checker": True, - "is_enabled": True, - "remove": True, - "checker": "x", - "timeout": datetime.timedelta(), - "extended_timeout": datetime.timedelta(), - "interval": datetime.timedelta(), - "approved_by_default": True, - "approval_delay": datetime.timedelta(), - "approval_duration": datetime.timedelta(), - "host": "x", - "secret": io.BytesIO(b"x"), - "approve": True, - "deny": True, + "--enable": None, + "--disable": None, + "--bump-timeout": None, + "--start-checker": None, + "--stop-checker": None, + "--is-enabled": None, + "--remove": None, + "--checker": "x", + "--timeout": "PT0S", + "--extended-timeout": "PT0S", + "--interval": "PT0S", + "--approve-by-default": None, + "--deny-by-default": None, + "--approval-delay": "PT0S", + "--approval-duration": "PT0S", + "--host": "hostname", + "--secret": "/dev/null", + "--approve": None, + "--deny": None, } + @staticmethod + def actionargs(action, value, *args): + if value is not None: + return [action, value] + list(args) + else: + return [action] + list(args) + @contextlib.contextmanager def assertParseError(self): with self.assertRaises(SystemExit) as e: @@ -1007,6 +1018,10 @@ # /argparse.html#exiting-methods self.assertEqual(2, e.exception.code) + def parse_args(self, args): + options = self.parser.parse_args(args) + check_option_syntax(self.parser, options) + @staticmethod @contextlib.contextmanager def redirect_stderr_to_devnull(): @@ -1023,107 +1038,85 @@ def test_actions_all_conflicts_with_verbose(self): for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.all = True - options.verbose = True + args = self.actionargs(action, value, "--all", + "--verbose") with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) def test_actions_with_client_conflicts_with_verbose(self): for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.verbose = True - options.client = ["client"] + args = self.actionargs(action, value, "--verbose", + "client") with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) def test_dump_json_conflicts_with_verbose(self): - options = self.parser.parse_args() - options.dump_json = True - options.verbose = True + args = ["--dump-json", "--verbose"] with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) def test_dump_json_conflicts_with_action(self): for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.dump_json = True + args = self.actionargs(action, value, "--dump-json") with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) def test_all_can_not_be_alone(self): - options = self.parser.parse_args() - options.all = True + args = ["--all"] with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) def test_all_is_ok_with_any_action(self): for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.all = True - self.check_option_syntax(options) + args = self.actionargs(action, value, "--all") + self.parse_args(args) def test_any_action_is_ok_with_one_client(self): for action, value in self.actions.items(): - options = self.parser.parse_args() - setattr(options, action, value) - options.client = ["client"] - self.check_option_syntax(options) + args = self.actionargs(action, value, "client") + self.parse_args(args) def test_one_client_with_all_actions_except_is_enabled(self): - options = self.parser.parse_args() for action, value in self.actions.items(): - if action == "is_enabled": + if action == "--is-enabled": continue - setattr(options, action, value) - options.client = ["client"] - self.check_option_syntax(options) + args = self.actionargs(action, value, "client") + self.parse_args(args) def test_two_clients_with_all_actions_except_is_enabled(self): - options = self.parser.parse_args() for action, value in self.actions.items(): - if action == "is_enabled": + if action == "--is-enabled": continue - setattr(options, action, value) - options.client = ["client1", "client2"] - self.check_option_syntax(options) + args = self.actionargs(action, value, "client1", + "client2") + self.parse_args(args) def test_two_clients_are_ok_with_actions_except_is_enabled(self): for action, value in self.actions.items(): - if action == "is_enabled": + if action == "--is-enabled": continue - options = self.parser.parse_args() - setattr(options, action, value) - options.client = ["client1", "client2"] - self.check_option_syntax(options) + args = self.actionargs(action, value, "client1", + "client2") + self.parse_args(args) def test_is_enabled_fails_without_client(self): - options = self.parser.parse_args() - options.is_enabled = True + args = ["--is-enabled"] with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) def test_is_enabled_fails_with_two_clients(self): - options = self.parser.parse_args() - options.is_enabled = True - options.client = ["client1", "client2"] + args = ["--is-enabled", "client1", "client2"] with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) def test_remove_can_only_be_combined_with_action_deny(self): for action, value in self.actions.items(): - if action in {"remove", "deny"}: + if action in {"--remove", "--deny"}: continue - options = self.parser.parse_args() - setattr(options, action, value) - options.all = True - options.remove = True + args = self.actionargs(action, value, "--all", + "--remove") with self.assertParseError(): - self.check_option_syntax(options) + self.parse_args(args) class Test_dbus_exceptions(unittest.TestCase):