=== modified file 'mandos-ctl' --- mandos-ctl 2019-03-16 17:11:12 +0000 +++ mandos-ctl 2019-03-17 10:59:24 +0000 @@ -477,365 +477,377 @@ commands = [] if options.is_enabled: - commands.append(IsEnabledCmd()) + commands.append(command.IsEnabled()) if options.approve: - commands.append(ApproveCmd()) + commands.append(command.Approve()) if options.deny: - commands.append(DenyCmd()) + commands.append(command.Deny()) if options.remove: - commands.append(RemoveCmd()) + commands.append(command.Remove()) if options.dump_json: - commands.append(DumpJSONCmd()) + commands.append(command.DumpJSON()) if options.enable: - commands.append(EnableCmd()) + commands.append(command.Enable()) if options.disable: - commands.append(DisableCmd()) + commands.append(command.Disable()) if options.bump_timeout: - commands.append(BumpTimeoutCmd()) + commands.append(command.BumpTimeout()) if options.start_checker: - commands.append(StartCheckerCmd()) + commands.append(command.StartChecker()) if options.stop_checker: - commands.append(StopCheckerCmd()) + commands.append(command.StopChecker()) if options.approved_by_default is not None: if options.approved_by_default: - commands.append(ApproveByDefaultCmd()) + commands.append(command.ApproveByDefault()) else: - commands.append(DenyByDefaultCmd()) + commands.append(command.DenyByDefault()) if options.checker is not None: - commands.append(SetCheckerCmd(options.checker)) + commands.append(command.SetChecker(options.checker)) if options.host is not None: - commands.append(SetHostCmd(options.host)) + commands.append(command.SetHost(options.host)) if options.secret is not None: - commands.append(SetSecretCmd(options.secret)) + commands.append(command.SetSecret(options.secret)) if options.timeout is not None: - commands.append(SetTimeoutCmd(options.timeout)) + commands.append(command.SetTimeout(options.timeout)) if options.extended_timeout: commands.append( - SetExtendedTimeoutCmd(options.extended_timeout)) + command.SetExtendedTimeout(options.extended_timeout)) if options.interval is not None: - commands.append(SetIntervalCmd(options.interval)) + commands.append(command.SetInterval(options.interval)) if options.approval_delay is not None: - commands.append(SetApprovalDelayCmd(options.approval_delay)) + commands.append( + command.SetApprovalDelay(options.approval_delay)) if options.approval_duration is not None: commands.append( - SetApprovalDurationCmd(options.approval_duration)) + command.SetApprovalDuration(options.approval_duration)) # If no command option has been given, show table of clients, # optionally verbosely if not commands: - commands.append(PrintTableCmd(verbose=options.verbose)) + commands.append(command.PrintTable(verbose=options.verbose)) return commands -class Command(object): - """Abstract class for commands""" - def run(self, clients, bus=None, mandos=None): - """Normal commands should implement run_on_one_client(), but - commands which want to operate on all clients at the same time - can override this run() method instead.""" - self.mandos = mandos - for clientpath, properties in clients.items(): - log.debug("D-Bus: Connect to: (busname=%r, path=%r)", - dbus_busname, str(clientpath)) - client = bus.get_object(dbus_busname, clientpath) - self.run_on_one_client(client, properties) - - -class IsEnabledCmd(Command): - def run(self, clients, bus=None, mandos=None): - client, properties = next(iter(clients.items())) - if self.is_enabled(client, properties): - sys.exit(0) - sys.exit(1) - def is_enabled(self, client, properties): - return properties["Enabled"] - - -class ApproveCmd(Command): - def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname, - client.__dbus_object_path__, client_dbus_interface) - client.Approve(dbus.Boolean(True), - dbus_interface=client_dbus_interface) - - -class DenyCmd(Command): - def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname, - client.__dbus_object_path__, client_dbus_interface) - client.Approve(dbus.Boolean(False), - dbus_interface=client_dbus_interface) - - -class RemoveCmd(Command): - def run_on_one_client(self, client, properties): - log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname, - server_dbus_path, server_dbus_interface, - str(client.__dbus_object_path__)) - self.mandos.RemoveClient(client.__dbus_object_path__) - - -class OutputCmd(Command): - """Abstract class for commands outputting client details""" - all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK", - "Created", "Interval", "Host", "KeyID", - "Fingerprint", "CheckerRunning", "LastEnabled", - "ApprovalPending", "ApprovedByDefault", - "LastApprovalRequest", "ApprovalDelay", - "ApprovalDuration", "Checker", "ExtendedTimeout", - "Expires", "LastCheckerStatus") - - def run(self, clients, bus=None, mandos=None): - print(self.output(clients.values())) - - def output(self, clients): - raise NotImplementedError() - - -class DumpJSONCmd(OutputCmd): - def output(self, clients): - data = {client["Name"]: - {key: self.dbus_boolean_to_bool(client[key]) - for key in self.all_keywords} - for client in clients} - return json.dumps(data, indent=4, separators=(',', ': ')) - - @staticmethod - def dbus_boolean_to_bool(value): - if isinstance(value, dbus.Boolean): - value = bool(value) - return value - - -class PrintTableCmd(OutputCmd): - def __init__(self, verbose=False): - self.verbose = verbose - - def output(self, clients): - default_keywords = ("Name", "Enabled", "Timeout", - "LastCheckedOK") - keywords = default_keywords - if self.verbose: - keywords = self.all_keywords - return str(self.TableOfClients(clients, keywords)) - - class TableOfClients(object): - tableheaders = { - "Name": "Name", - "Enabled": "Enabled", - "Timeout": "Timeout", - "LastCheckedOK": "Last Successful Check", - "LastApprovalRequest": "Last Approval Request", - "Created": "Created", - "Interval": "Interval", - "Host": "Host", - "Fingerprint": "Fingerprint", - "KeyID": "Key ID", - "CheckerRunning": "Check Is Running", - "LastEnabled": "Last Enabled", - "ApprovalPending": "Approval Is Pending", - "ApprovedByDefault": "Approved By Default", - "ApprovalDelay": "Approval Delay", - "ApprovalDuration": "Approval Duration", - "Checker": "Checker", - "ExtendedTimeout": "Extended Timeout", - "Expires": "Expires", - "LastCheckerStatus": "Last Checker Status", - } - - def __init__(self, clients, keywords): - self.clients = clients - self.keywords = keywords - - def __str__(self): - return "\n".join(self.rows()) - - if sys.version_info.major == 2: - __unicode__ = __str__ +class command(object): + """A namespace for command classes""" + + class Base(object): + """Abstract base class for commands""" + def run(self, clients, bus=None, mandos=None): + """Normal commands should implement run_on_one_client(), +but commands which want to operate on all clients at the same time can +override this run() method instead. +""" + self.mandos = mandos + for clientpath, properties in clients.items(): + log.debug("D-Bus: Connect to: (busname=%r, path=%r)", + dbus_busname, str(clientpath)) + client = bus.get_object(dbus_busname, clientpath) + self.run_on_one_client(client, properties) + + + class IsEnabled(Base): + def run(self, clients, bus=None, mandos=None): + client, properties = next(iter(clients.items())) + if self.is_enabled(client, properties): + sys.exit(0) + sys.exit(1) + def is_enabled(self, client, properties): + return properties["Enabled"] + + + class Approve(Base): + def run_on_one_client(self, client, properties): + log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname, + client.__dbus_object_path__, + client_dbus_interface) + client.Approve(dbus.Boolean(True), + dbus_interface=client_dbus_interface) + + + class Deny(Base): + def run_on_one_client(self, client, properties): + log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname, + client.__dbus_object_path__, + client_dbus_interface) + client.Approve(dbus.Boolean(False), + dbus_interface=client_dbus_interface) + + + class Remove(Base): + def run_on_one_client(self, client, properties): + log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", + dbus_busname, server_dbus_path, + server_dbus_interface, + str(client.__dbus_object_path__)) + self.mandos.RemoveClient(client.__dbus_object_path__) + + + class Output(Base): + """Abstract class for commands outputting client details""" + all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK", + "Created", "Interval", "Host", "KeyID", + "Fingerprint", "CheckerRunning", + "LastEnabled", "ApprovalPending", + "ApprovedByDefault", "LastApprovalRequest", + "ApprovalDelay", "ApprovalDuration", + "Checker", "ExtendedTimeout", "Expires", + "LastCheckerStatus") + + def run(self, clients, bus=None, mandos=None): + print(self.output(clients.values())) + + def output(self, clients): + raise NotImplementedError() + + + class DumpJSON(Output): + def output(self, clients): + data = {client["Name"]: + {key: self.dbus_boolean_to_bool(client[key]) + for key in self.all_keywords} + for client in clients} + return json.dumps(data, indent=4, separators=(',', ': ')) + + @staticmethod + def dbus_boolean_to_bool(value): + if isinstance(value, dbus.Boolean): + value = bool(value) + return value + + + class PrintTable(Output): + def __init__(self, verbose=False): + self.verbose = verbose + + def output(self, clients): + default_keywords = ("Name", "Enabled", "Timeout", + "LastCheckedOK") + keywords = default_keywords + if self.verbose: + keywords = self.all_keywords + return str(self.TableOfClients(clients, keywords)) + + class TableOfClients(object): + tableheaders = { + "Name": "Name", + "Enabled": "Enabled", + "Timeout": "Timeout", + "LastCheckedOK": "Last Successful Check", + "LastApprovalRequest": "Last Approval Request", + "Created": "Created", + "Interval": "Interval", + "Host": "Host", + "Fingerprint": "Fingerprint", + "KeyID": "Key ID", + "CheckerRunning": "Check Is Running", + "LastEnabled": "Last Enabled", + "ApprovalPending": "Approval Is Pending", + "ApprovedByDefault": "Approved By Default", + "ApprovalDelay": "Approval Delay", + "ApprovalDuration": "Approval Duration", + "Checker": "Checker", + "ExtendedTimeout": "Extended Timeout", + "Expires": "Expires", + "LastCheckerStatus": "Last Checker Status", + } + + def __init__(self, clients, keywords): + self.clients = clients + self.keywords = keywords + def __str__(self): - return str(self).encode(locale.getpreferredencoding()) - - def rows(self): - format_string = self.row_formatting_string() - rows = [self.header_line(format_string)] - rows.extend(self.client_line(client, format_string) - for client in self.clients) - return rows - - def row_formatting_string(self): - "Format string used to format table rows" - return " ".join("{{{key}:{width}}}".format( - width=max(len(self.tableheaders[key]), - *(len(self.string_from_client(client, key)) - for client in self.clients)), - key=key) - for key in self.keywords) - - def string_from_client(self, client, key): - return self.valuetostring(client[key], key) - - @classmethod - def valuetostring(cls, value, keyword): - if isinstance(value, dbus.Boolean): - return "Yes" if value else "No" - if keyword in ("Timeout", "Interval", "ApprovalDelay", - "ApprovalDuration", "ExtendedTimeout"): - return cls.milliseconds_to_string(value) - return str(value) - - def header_line(self, format_string): - return format_string.format(**self.tableheaders) - - def client_line(self, client, format_string): - return format_string.format( - **{key: self.string_from_client(client, key) - for key in self.keywords}) - - @staticmethod - def milliseconds_to_string(ms): - td = datetime.timedelta(0, 0, 0, ms) - return ("{days}{hours:02}:{minutes:02}:{seconds:02}" - .format(days="{}T".format(td.days) - if td.days else "", - hours=td.seconds // 3600, - minutes=(td.seconds % 3600) // 60, - seconds=td.seconds % 60)) - - -class PropertyCmd(Command): - """Abstract class for Actions for setting one client property""" - - def run_on_one_client(self, client, properties): - """Set the Client's D-Bus property""" - log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname, - client.__dbus_object_path__, - dbus.PROPERTIES_IFACE, client_dbus_interface, - self.propname, self.value_to_set - if not isinstance(self.value_to_set, dbus.Boolean) - else bool(self.value_to_set)) - client.Set(client_dbus_interface, self.propname, - self.value_to_set, - dbus_interface=dbus.PROPERTIES_IFACE) - - @property - def propname(self): - raise NotImplementedError() - - -class EnableCmd(PropertyCmd): - propname = "Enabled" - value_to_set = dbus.Boolean(True) - - -class DisableCmd(PropertyCmd): - propname = "Enabled" - value_to_set = dbus.Boolean(False) - - -class BumpTimeoutCmd(PropertyCmd): - propname = "LastCheckedOK" - value_to_set = "" - - -class StartCheckerCmd(PropertyCmd): - propname = "CheckerRunning" - value_to_set = dbus.Boolean(True) - - -class StopCheckerCmd(PropertyCmd): - propname = "CheckerRunning" - value_to_set = dbus.Boolean(False) - - -class ApproveByDefaultCmd(PropertyCmd): - propname = "ApprovedByDefault" - value_to_set = dbus.Boolean(True) - - -class DenyByDefaultCmd(PropertyCmd): - propname = "ApprovedByDefault" - value_to_set = dbus.Boolean(False) - - -class PropertyValueCmd(PropertyCmd): - """Abstract class for PropertyCmd recieving a value as argument""" - def __init__(self, value): - self.value_to_set = value - - -class SetCheckerCmd(PropertyValueCmd): - propname = "Checker" - - -class SetHostCmd(PropertyValueCmd): - propname = "Host" - - -class SetSecretCmd(PropertyValueCmd): - propname = "Secret" - - @property - def value_to_set(self): - return self._vts - - @value_to_set.setter - def value_to_set(self, value): - """When setting, read data from supplied file object""" - self._vts = value.read() - value.close() - - -class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd): - """Abstract class for PropertyValueCmd taking a value argument as + return "\n".join(self.rows()) + + if sys.version_info.major == 2: + __unicode__ = __str__ + def __str__(self): + return str(self).encode( + locale.getpreferredencoding()) + + def rows(self): + format_string = self.row_formatting_string() + rows = [self.header_line(format_string)] + rows.extend(self.client_line(client, format_string) + for client in self.clients) + return rows + + def row_formatting_string(self): + "Format string used to format table rows" + return " ".join("{{{key}:{width}}}".format( + width=max(len(self.tableheaders[key]), + *(len(self.string_from_client(client, + key)) + for client in self.clients)), + key=key) + for key in self.keywords) + + def string_from_client(self, client, key): + return self.valuetostring(client[key], key) + + @classmethod + def valuetostring(cls, value, keyword): + if isinstance(value, dbus.Boolean): + return "Yes" if value else "No" + if keyword in ("Timeout", "Interval", "ApprovalDelay", + "ApprovalDuration", "ExtendedTimeout"): + return cls.milliseconds_to_string(value) + return str(value) + + def header_line(self, format_string): + return format_string.format(**self.tableheaders) + + def client_line(self, client, format_string): + return format_string.format( + **{key: self.string_from_client(client, key) + for key in self.keywords}) + + @staticmethod + def milliseconds_to_string(ms): + td = datetime.timedelta(0, 0, 0, ms) + return ("{days}{hours:02}:{minutes:02}:{seconds:02}" + .format(days="{}T".format(td.days) + if td.days else "", + hours=td.seconds // 3600, + minutes=(td.seconds % 3600) // 60, + seconds=td.seconds % 60)) + + + class Property(Base): + "Abstract class for Actions for setting one client property" + + def run_on_one_client(self, client, properties): + """Set the Client's D-Bus property""" + log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname, + client.__dbus_object_path__, + dbus.PROPERTIES_IFACE, client_dbus_interface, + self.propname, self.value_to_set + if not isinstance(self.value_to_set, + dbus.Boolean) + else bool(self.value_to_set)) + client.Set(client_dbus_interface, self.propname, + self.value_to_set, + dbus_interface=dbus.PROPERTIES_IFACE) + + @property + def propname(self): + raise NotImplementedError() + + + class Enable(Property): + propname = "Enabled" + value_to_set = dbus.Boolean(True) + + + class Disable(Property): + propname = "Enabled" + value_to_set = dbus.Boolean(False) + + + class BumpTimeout(Property): + propname = "LastCheckedOK" + value_to_set = "" + + + class StartChecker(Property): + propname = "CheckerRunning" + value_to_set = dbus.Boolean(True) + + + class StopChecker(Property): + propname = "CheckerRunning" + value_to_set = dbus.Boolean(False) + + + class ApproveByDefault(Property): + propname = "ApprovedByDefault" + value_to_set = dbus.Boolean(True) + + + class DenyByDefault(Property): + propname = "ApprovedByDefault" + value_to_set = dbus.Boolean(False) + + + class PropertyValue(Property): + "Abstract class for Property recieving a value as argument" + def __init__(self, value): + self.value_to_set = value + + + class SetChecker(PropertyValue): + propname = "Checker" + + + class SetHost(PropertyValue): + propname = "Host" + + + class SetSecret(PropertyValue): + propname = "Secret" + + @property + def value_to_set(self): + return self._vts + + @value_to_set.setter + def value_to_set(self, value): + """When setting, read data from supplied file object""" + self._vts = value.read() + value.close() + + + class MillisecondsPropertyValueArgument(PropertyValue): + """Abstract class for PropertyValue taking a value argument as a datetime.timedelta() but should store it as milliseconds.""" - @property - def value_to_set(self): - return self._vts - - @value_to_set.setter - def value_to_set(self, value): - """When setting, convert value from a datetime.timedelta""" - self._vts = int(round(value.total_seconds() * 1000)) - - -class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd): - propname = "Timeout" - - -class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd): - propname = "ExtendedTimeout" - - -class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd): - propname = "Interval" - - -class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd): - propname = "ApprovalDelay" - - -class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd): - propname = "ApprovalDuration" + @property + def value_to_set(self): + return self._vts + + @value_to_set.setter + def value_to_set(self, value): + "When setting, convert value from a datetime.timedelta" + self._vts = int(round(value.total_seconds() * 1000)) + + + class SetTimeout(MillisecondsPropertyValueArgument): + propname = "Timeout" + + + class SetExtendedTimeout(MillisecondsPropertyValueArgument): + propname = "ExtendedTimeout" + + + class SetInterval(MillisecondsPropertyValueArgument): + propname = "Interval" + + + class SetApprovalDelay(MillisecondsPropertyValueArgument): + propname = "ApprovalDelay" + + + class SetApprovalDuration(MillisecondsPropertyValueArgument): + propname = "ApprovalDuration" @@ -1144,7 +1156,7 @@ def test_is_enabled(self): self.assert_command_from_args(["--is-enabled", "foo"], - IsEnabledCmd) + command.IsEnabled) def assert_command_from_args(self, args, command_cls, **cmd_attrs): @@ -1160,24 +1172,25 @@ self.assertEqual(getattr(command, key), value) def test_is_enabled_short(self): - self.assert_command_from_args(["-V", "foo"], IsEnabledCmd) + self.assert_command_from_args(["-V", "foo"], + command.IsEnabled) def test_approve(self): self.assert_command_from_args(["--approve", "foo"], - ApproveCmd) + command.Approve) def test_approve_short(self): - self.assert_command_from_args(["-A", "foo"], ApproveCmd) + self.assert_command_from_args(["-A", "foo"], command.Approve) def test_deny(self): - self.assert_command_from_args(["--deny", "foo"], DenyCmd) + self.assert_command_from_args(["--deny", "foo"], command.Deny) def test_deny_short(self): - self.assert_command_from_args(["-D", "foo"], DenyCmd) + self.assert_command_from_args(["-D", "foo"], command.Deny) def test_remove(self): self.assert_command_from_args(["--remove", "foo"], - RemoveCmd) + command.Remove) def test_deny_before_remove(self): options = self.parser.parse_args(["--deny", "--remove", @@ -1185,8 +1198,8 @@ check_option_syntax(self.parser, options) commands = commands_from_options(options) self.assertEqual(len(commands), 2) - self.assertIsInstance(commands[0], DenyCmd) - self.assertIsInstance(commands[1], RemoveCmd) + self.assertIsInstance(commands[0], command.Deny) + self.assertIsInstance(commands[1], command.Remove) def test_deny_before_remove_reversed(self): options = self.parser.parse_args(["--remove", "--deny", @@ -1194,76 +1207,82 @@ check_option_syntax(self.parser, options) commands = commands_from_options(options) self.assertEqual(len(commands), 2) - self.assertIsInstance(commands[0], DenyCmd) - self.assertIsInstance(commands[1], RemoveCmd) + self.assertIsInstance(commands[0], command.Deny) + self.assertIsInstance(commands[1], command.Remove) def test_remove_short(self): - self.assert_command_from_args(["-r", "foo"], RemoveCmd) + self.assert_command_from_args(["-r", "foo"], command.Remove) def test_dump_json(self): - self.assert_command_from_args(["--dump-json"], DumpJSONCmd) + self.assert_command_from_args(["--dump-json"], + command.DumpJSON) def test_enable(self): - self.assert_command_from_args(["--enable", "foo"], EnableCmd) + self.assert_command_from_args(["--enable", "foo"], + command.Enable) def test_enable_short(self): - self.assert_command_from_args(["-e", "foo"], EnableCmd) + self.assert_command_from_args(["-e", "foo"], command.Enable) def test_disable(self): self.assert_command_from_args(["--disable", "foo"], - DisableCmd) + command.Disable) def test_disable_short(self): - self.assert_command_from_args(["-d", "foo"], DisableCmd) + self.assert_command_from_args(["-d", "foo"], command.Disable) def test_bump_timeout(self): self.assert_command_from_args(["--bump-timeout", "foo"], - BumpTimeoutCmd) + command.BumpTimeout) def test_bump_timeout_short(self): - self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd) + self.assert_command_from_args(["-b", "foo"], + command.BumpTimeout) def test_start_checker(self): self.assert_command_from_args(["--start-checker", "foo"], - StartCheckerCmd) + command.StartChecker) def test_stop_checker(self): self.assert_command_from_args(["--stop-checker", "foo"], - StopCheckerCmd) + command.StopChecker) def test_approve_by_default(self): self.assert_command_from_args(["--approve-by-default", "foo"], - ApproveByDefaultCmd) + command.ApproveByDefault) def test_deny_by_default(self): self.assert_command_from_args(["--deny-by-default", "foo"], - DenyByDefaultCmd) + command.DenyByDefault) def test_checker(self): self.assert_command_from_args(["--checker", ":", "foo"], - SetCheckerCmd, value_to_set=":") + command.SetChecker, + value_to_set=":") def test_checker_empty(self): self.assert_command_from_args(["--checker", "", "foo"], - SetCheckerCmd, value_to_set="") + command.SetChecker, + value_to_set="") def test_checker_short(self): self.assert_command_from_args(["-c", ":", "foo"], - SetCheckerCmd, value_to_set=":") + command.SetChecker, + value_to_set=":") def test_host(self): self.assert_command_from_args(["--host", "foo.example.org", - "foo"], SetHostCmd, + "foo"], command.SetHost, value_to_set="foo.example.org") def test_host_short(self): self.assert_command_from_args(["-H", "foo.example.org", - "foo"], SetHostCmd, + "foo"], command.SetHost, value_to_set="foo.example.org") def test_secret_devnull(self): self.assert_command_from_args(["--secret", os.path.devnull, - "foo"], SetSecretCmd, + "foo"], command.SetSecret, value_to_set=b"") def test_secret_tempfile(self): @@ -1272,12 +1291,13 @@ f.write(value) f.seek(0) self.assert_command_from_args(["--secret", f.name, - "foo"], SetSecretCmd, + "foo"], command.SetSecret, value_to_set=value) def test_secret_devnull_short(self): self.assert_command_from_args(["-s", os.path.devnull, "foo"], - SetSecretCmd, value_to_set=b"") + command.SetSecret, + value_to_set=b"") def test_secret_tempfile_short(self): with tempfile.NamedTemporaryFile(mode="r+b") as f: @@ -1285,59 +1305,62 @@ f.write(value) f.seek(0) self.assert_command_from_args(["-s", f.name, "foo"], - SetSecretCmd, + command.SetSecret, value_to_set=value) def test_timeout(self): self.assert_command_from_args(["--timeout", "PT5M", "foo"], - SetTimeoutCmd, + command.SetTimeout, value_to_set=300000) def test_timeout_short(self): self.assert_command_from_args(["-t", "PT5M", "foo"], - SetTimeoutCmd, + command.SetTimeout, value_to_set=300000) def test_extended_timeout(self): self.assert_command_from_args(["--extended-timeout", "PT15M", "foo"], - SetExtendedTimeoutCmd, + command.SetExtendedTimeout, value_to_set=900000) def test_interval(self): self.assert_command_from_args(["--interval", "PT2M", "foo"], - SetIntervalCmd, + command.SetInterval, value_to_set=120000) def test_interval_short(self): self.assert_command_from_args(["-i", "PT2M", "foo"], - SetIntervalCmd, + command.SetInterval, value_to_set=120000) def test_approval_delay(self): self.assert_command_from_args(["--approval-delay", "PT30S", - "foo"], SetApprovalDelayCmd, + "foo"], + command.SetApprovalDelay, value_to_set=30000) def test_approval_duration(self): self.assert_command_from_args(["--approval-duration", "PT1S", - "foo"], SetApprovalDurationCmd, + "foo"], + command.SetApprovalDuration, value_to_set=1000) def test_print_table(self): - self.assert_command_from_args([], PrintTableCmd, + self.assert_command_from_args([], command.PrintTable, verbose=False) def test_print_table_verbose(self): - self.assert_command_from_args(["--verbose"], PrintTableCmd, + self.assert_command_from_args(["--verbose"], + command.PrintTable, verbose=True) def test_print_table_verbose_short(self): - self.assert_command_from_args(["-v"], PrintTableCmd, + self.assert_command_from_args(["-v"], command.PrintTable, verbose=True) -class TestCmd(unittest.TestCase): +class TestCommand(unittest.TestCase): """Abstract class for tests of command classes""" def setUp(self): @@ -1433,16 +1456,17 @@ return Bus() -class TestIsEnabledCmd(TestCmd): +class TestBaseCommands(TestCommand): + def test_is_enabled(self): - self.assertTrue(all(IsEnabledCmd().is_enabled(client, + self.assertTrue(all(command.IsEnabled().is_enabled(client, properties) for client, properties in self.clients.items())) def test_is_enabled_run_exits_successfully(self): with self.assertRaises(SystemExit) as e: - IsEnabledCmd().run(self.one_client) + command.IsEnabled().run(self.one_client) if e.exception.code is not None: self.assertEqual(e.exception.code, 0) else: @@ -1451,32 +1475,26 @@ def test_is_enabled_run_exits_with_failure(self): self.client.attributes["Enabled"] = dbus.Boolean(False) with self.assertRaises(SystemExit) as e: - IsEnabledCmd().run(self.one_client) + command.IsEnabled().run(self.one_client) if isinstance(e.exception.code, int): self.assertNotEqual(e.exception.code, 0) else: self.assertIsNotNone(e.exception.code) - -class TestApproveCmd(TestCmd): def test_approve(self): - ApproveCmd().run(self.clients, self.bus) + command.Approve().run(self.clients, self.bus) for clientpath in self.clients: client = self.bus.get_object(dbus_busname, clientpath) self.assertIn(("Approve", (True, client_dbus_interface)), client.calls) - -class TestDenyCmd(TestCmd): def test_deny(self): - DenyCmd().run(self.clients, self.bus) + command.Deny().run(self.clients, self.bus) for clientpath in self.clients: client = self.bus.get_object(dbus_busname, clientpath) self.assertIn(("Approve", (False, client_dbus_interface)), client.calls) - -class TestRemoveCmd(TestCmd): def test_remove(self): class MockMandos(object): def __init__(self): @@ -1484,83 +1502,77 @@ def RemoveClient(self, dbus_path): self.calls.append(("RemoveClient", (dbus_path,))) mandos = MockMandos() - super(TestRemoveCmd, self).setUp() - RemoveCmd().run(self.clients, self.bus, mandos) + super(TestBaseCommands, self).setUp() + command.Remove().run(self.clients, self.bus, mandos) self.assertEqual(len(mandos.calls), 2) for clientpath in self.clients: self.assertIn(("RemoveClient", (clientpath,)), mandos.calls) - -class TestDumpJSONCmd(TestCmd): - def setUp(self): - self.expected_json = { - "foo": { - "Name": "foo", - "KeyID": ("92ed150794387c03ce684574b1139a65" - "94a34f895daaaf09fd8ea90a27cddb12"), - "Host": "foo.example.org", - "Enabled": True, - "Timeout": 300000, - "LastCheckedOK": "2019-02-03T00:00:00", - "Created": "2019-01-02T00:00:00", - "Interval": 120000, - "Fingerprint": ("778827225BA7DE539C5A" - "7CFA59CFF7CDBD9A5920"), - "CheckerRunning": False, - "LastEnabled": "2019-01-03T00:00:00", - "ApprovalPending": False, - "ApprovedByDefault": True, - "LastApprovalRequest": "", - "ApprovalDelay": 0, - "ApprovalDuration": 1000, - "Checker": "fping -q -- %(host)s", - "ExtendedTimeout": 900000, - "Expires": "2019-02-04T00:00:00", - "LastCheckerStatus": 0, - }, - "barbar": { - "Name": "barbar", - "KeyID": ("0558568eedd67d622f5c83b35a115f79" - "6ab612cff5ad227247e46c2b020f441c"), - "Host": "192.0.2.3", - "Enabled": True, - "Timeout": 300000, - "LastCheckedOK": "2019-02-04T00:00:00", - "Created": "2019-01-03T00:00:00", - "Interval": 120000, - "Fingerprint": ("3E393AEAEFB84C7E89E2" - "F547B3A107558FCA3A27"), - "CheckerRunning": True, - "LastEnabled": "2019-01-04T00:00:00", - "ApprovalPending": False, - "ApprovedByDefault": False, - "LastApprovalRequest": "2019-01-03T00:00:00", - "ApprovalDelay": 30000, - "ApprovalDuration": 93785000, - "Checker": ":", - "ExtendedTimeout": 900000, - "Expires": "2019-02-05T00:00:00", - "LastCheckerStatus": -2, - }, - } - return super(TestDumpJSONCmd, self).setUp() - - def test_normal(self): - output = DumpJSONCmd().output(self.clients.values()) + expected_json = { + "foo": { + "Name": "foo", + "KeyID": ("92ed150794387c03ce684574b1139a65" + "94a34f895daaaf09fd8ea90a27cddb12"), + "Host": "foo.example.org", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-03T00:00:00", + "Created": "2019-01-02T00:00:00", + "Interval": 120000, + "Fingerprint": ("778827225BA7DE539C5A" + "7CFA59CFF7CDBD9A5920"), + "CheckerRunning": False, + "LastEnabled": "2019-01-03T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": True, + "LastApprovalRequest": "", + "ApprovalDelay": 0, + "ApprovalDuration": 1000, + "Checker": "fping -q -- %(host)s", + "ExtendedTimeout": 900000, + "Expires": "2019-02-04T00:00:00", + "LastCheckerStatus": 0, + }, + "barbar": { + "Name": "barbar", + "KeyID": ("0558568eedd67d622f5c83b35a115f79" + "6ab612cff5ad227247e46c2b020f441c"), + "Host": "192.0.2.3", + "Enabled": True, + "Timeout": 300000, + "LastCheckedOK": "2019-02-04T00:00:00", + "Created": "2019-01-03T00:00:00", + "Interval": 120000, + "Fingerprint": ("3E393AEAEFB84C7E89E2" + "F547B3A107558FCA3A27"), + "CheckerRunning": True, + "LastEnabled": "2019-01-04T00:00:00", + "ApprovalPending": False, + "ApprovedByDefault": False, + "LastApprovalRequest": "2019-01-03T00:00:00", + "ApprovalDelay": 30000, + "ApprovalDuration": 93785000, + "Checker": ":", + "ExtendedTimeout": 900000, + "Expires": "2019-02-05T00:00:00", + "LastCheckerStatus": -2, + }, + } + + def test_DumpJSON_normal(self): + output = command.DumpJSON().output(self.clients.values()) json_data = json.loads(output) self.assertDictEqual(json_data, self.expected_json) - def test_one_client(self): - output = DumpJSONCmd().output(self.one_client.values()) + def test_DumpJSON_one_client(self): + output = command.DumpJSON().output(self.one_client.values()) json_data = json.loads(output) expected_json = {"foo": self.expected_json["foo"]} self.assertDictEqual(json_data, expected_json) - -class TestPrintTableCmd(TestCmd): - def test_normal(self): - output = PrintTableCmd().output(self.clients.values()) + def test_PrintTable_normal(self): + output = command.PrintTable().output(self.clients.values()) expected_output = "\n".join(( "Name Enabled Timeout Last Successful Check", "foo Yes 00:05:00 2019-02-03T00:00:00 ", @@ -1568,8 +1580,8 @@ )) self.assertEqual(output, expected_output) - def test_verbose(self): - output = PrintTableCmd(verbose=True).output( + def test_PrintTable_verbose(self): + output = command.PrintTable(verbose=True).output( self.clients.values()) columns = ( ( @@ -1663,8 +1675,8 @@ for line in range(num_lines)) self.assertEqual(output, expected_output) - def test_one_client(self): - output = PrintTableCmd().output(self.one_client.values()) + def test_PrintTable_one_client(self): + output = command.PrintTable().output(self.one_client.values()) expected_output = "\n".join(( "Name Enabled Timeout Last Successful Check", "foo Yes 00:05:00 2019-02-03T00:00:00 ", @@ -1672,8 +1684,8 @@ self.assertEqual(output, expected_output) -class TestPropertyCmd(TestCmd): - """Abstract class for tests of PropertyCmd classes""" +class TestPropertyCmd(TestCommand): + """Abstract class for tests of command.Property classes""" def runTest(self): if not hasattr(self, "command"): return @@ -1702,43 +1714,43 @@ class TestEnableCmd(TestPropertyCmd): - command = EnableCmd + command = command.Enable propname = "Enabled" values_to_set = [dbus.Boolean(True)] class TestDisableCmd(TestPropertyCmd): - command = DisableCmd + command = command.Disable propname = "Enabled" values_to_set = [dbus.Boolean(False)] class TestBumpTimeoutCmd(TestPropertyCmd): - command = BumpTimeoutCmd + command = command.BumpTimeout propname = "LastCheckedOK" values_to_set = [""] class TestStartCheckerCmd(TestPropertyCmd): - command = StartCheckerCmd + command = command.StartChecker propname = "CheckerRunning" values_to_set = [dbus.Boolean(True)] class TestStopCheckerCmd(TestPropertyCmd): - command = StopCheckerCmd + command = command.StopChecker propname = "CheckerRunning" values_to_set = [dbus.Boolean(False)] class TestApproveByDefaultCmd(TestPropertyCmd): - command = ApproveByDefaultCmd + command = command.ApproveByDefault propname = "ApprovedByDefault" values_to_set = [dbus.Boolean(True)] class TestDenyByDefaultCmd(TestPropertyCmd): - command = DenyByDefaultCmd + command = command.DenyByDefault propname = "ApprovedByDefault" values_to_set = [dbus.Boolean(False)] @@ -1756,19 +1768,19 @@ class TestSetCheckerCmd(TestPropertyValueCmd): - command = SetCheckerCmd + command = command.SetChecker propname = "Checker" values_to_set = ["", ":", "fping -q -- %s"] class TestSetHostCmd(TestPropertyValueCmd): - command = SetHostCmd + command = command.SetHost propname = "Host" values_to_set = ["192.0.2.3", "foo.example.org"] class TestSetSecretCmd(TestPropertyValueCmd): - command = SetSecretCmd + command = command.SetSecret propname = "Secret" values_to_set = [io.BytesIO(b""), io.BytesIO(b"secret\0xyzzy\nbar")] @@ -1776,7 +1788,7 @@ class TestSetTimeoutCmd(TestPropertyValueCmd): - command = SetTimeoutCmd + command = command.SetTimeout propname = "Timeout" values_to_set = [datetime.timedelta(), datetime.timedelta(minutes=5), @@ -1787,7 +1799,7 @@ class TestSetExtendedTimeoutCmd(TestPropertyValueCmd): - command = SetExtendedTimeoutCmd + command = command.SetExtendedTimeout propname = "ExtendedTimeout" values_to_set = [datetime.timedelta(), datetime.timedelta(minutes=5), @@ -1798,7 +1810,7 @@ class TestSetIntervalCmd(TestPropertyValueCmd): - command = SetIntervalCmd + command = command.SetInterval propname = "Interval" values_to_set = [datetime.timedelta(), datetime.timedelta(minutes=5), @@ -1809,7 +1821,7 @@ class TestSetApprovalDelayCmd(TestPropertyValueCmd): - command = SetApprovalDelayCmd + command = command.SetApprovalDelay propname = "ApprovalDelay" values_to_set = [datetime.timedelta(), datetime.timedelta(minutes=5), @@ -1820,7 +1832,7 @@ class TestSetApprovalDurationCmd(TestPropertyValueCmd): - command = SetApprovalDurationCmd + command = command.SetApprovalDuration propname = "ApprovalDuration" values_to_set = [datetime.timedelta(), datetime.timedelta(minutes=5),