#!/usr/bin/python
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
#
# Mandos Monitor - Control and monitor the Mandos server
#
# Copyright © 2008-2019 Teddy Hogeborn
# Copyright © 2008-2019 Björn Påhlsson
#
# This file is part of Mandos.
#
# Mandos is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#     Mandos is distributed in the hope that it will be useful, but
#     WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#     GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Mandos.  If not, see <http://www.gnu.org/licenses/>.
#
# Contact the authors at <mandos@recompile.se>.
#

from __future__ import (division, absolute_import, print_function,
                        unicode_literals)

try:
    from future_builtins import *
except ImportError:
    pass

import sys
import argparse
import locale
import datetime
import re
import os
import collections
import json
import unittest
import logging
import io
import tempfile

import dbus

# Show warnings by default
if not sys.warnoptions:
    import warnings
    warnings.simplefilter("default")

# Logger used throughout this file, named after the invoked program
log = logging.getLogger(sys.argv[0])
logging.basicConfig(level="INFO", # Show info level messages
                    format="%(message)s") # Show basic log messages

logging.captureWarnings(True)   # Show warnings via the logging system

# On Python 2, rebind "str" to the Unicode string type, so that the
# rest of the file can use str() the same way on both major versions
if sys.version_info.major == 2:
    str = unicode

# Use the locale settings from the environment
locale.setlocale(locale.LC_ALL, "")

# D-Bus bus name, server object path and interface names, plus the
# version string shown by "--version"
domain = "se.recompile"
busname = domain + ".Mandos"
server_path = "/"
server_interface = domain + ".Mandos"
client_interface = domain + ".Mandos.Client"
version = "1.8.3"


# Make sure dbus.OBJECT_MANAGER_IFACE exists; define it ourselves if
# the dbus module does not provide it
try:
    dbus.OBJECT_MANAGER_IFACE
except AttributeError:
    dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"


def milliseconds_to_string(ms):
    """Format a number of milliseconds as "[<days>T]HH:MM:SS"

    The "<days>T" prefix is only included when the day count is
    non-zero.  Fractions of a second are dropped, not rounded.
    """
    delta = datetime.timedelta(milliseconds=ms)
    # Split the sub-day part into hours, minutes and seconds
    whole_minutes, seconds = divmod(delta.seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)
    day_prefix = "{}T".format(delta.days) if delta.days else ""
    return "{}{:02}:{:02}:{:02}".format(day_prefix, hours, minutes,
                                        seconds)


def rfc3339_duration_to_delta(duration):
    """Convert an RFC 3339 "duration" string to a datetime.timedelta

    Raises ValueError if the string is not a valid duration.  Months
    count as 4 weeks and years as 52 weeks.

    >>> rfc3339_duration_to_delta("P7D")
    datetime.timedelta(7)
    >>> rfc3339_duration_to_delta("PT60S")
    datetime.timedelta(0, 60)
    >>> rfc3339_duration_to_delta("PT60M")
    datetime.timedelta(0, 3600)
    >>> rfc3339_duration_to_delta("P60M")
    datetime.timedelta(1680)
    >>> rfc3339_duration_to_delta("PT24H")
    datetime.timedelta(1)
    >>> rfc3339_duration_to_delta("P1W")
    datetime.timedelta(7)
    >>> rfc3339_duration_to_delta("PT5M30S")
    datetime.timedelta(0, 330)
    >>> rfc3339_duration_to_delta("P1DT3M20S")
    datetime.timedelta(1, 200)
    >>> # Can not be empty:
    >>> rfc3339_duration_to_delta("")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u''
    >>> # Must start with "P":
    >>> rfc3339_duration_to_delta("1D")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'1D'
    >>> # Must use correct order
    >>> rfc3339_duration_to_delta("PT1S2M")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'PT1S2M'
    >>> # Time needs time marker
    >>> rfc3339_duration_to_delta("P1H2S")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'P1H2S'
    >>> # Weeks can not be combined with anything else
    >>> rfc3339_duration_to_delta("P1D2W")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'P1D2W'
    >>> rfc3339_duration_to_delta("P2W2H")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'P2W2H'
    """

    # A single regular expression can not express this grammar
    # cleanly (the same unit letter would need several positions), so
    # the string is instead consumed token by token by a small state
    # machine.  Each token knows which tokens may follow it.

    # A token is: a regexp matching it (with one group of digits if
    # the token has a value), the timedelta of one unit (or None),
    # and the set of tokens allowed directly after it.
    Token = collections.namedtuple("Token",
                                   ("regexp", "value", "followers"))

    def marker(pattern, *followers):
        """Make a token which carries no numeric value"""
        return Token(re.compile(pattern), None, frozenset(followers))

    def number(letter, unit, *followers):
        """Make a token of digits followed by a unit letter"""
        return Token(re.compile(r"(\d+)" + letter), unit,
                     frozenset(followers))

    # Tokens follow the "duration" ABNF in RFC 3339, Appendix A.  They
    # are defined innermost-first, since each one refers to the
    # tokens which may follow it.
    end = marker(r"$")
    second = number("S", datetime.timedelta(seconds=1), end)
    minute = number("M", datetime.timedelta(minutes=1), second, end)
    hour = number("H", datetime.timedelta(hours=1), minute, end)
    time_mark = marker(r"T", hour, minute, second)
    day = number("D", datetime.timedelta(days=1), time_mark, end)
    month = number("M", datetime.timedelta(weeks=4), day, end)
    year = number("Y", datetime.timedelta(weeks=52), month, end)
    week = number("W", datetime.timedelta(weeks=1), end)
    start = marker(r"P", year, month, day, time_mark, week)

    total = datetime.timedelta()
    remaining = duration        # The part of the string not yet parsed
    allowed = frozenset((start, ))  # Tokens acceptable at this point
    current = None
    # Consume tokens until the end token has been matched
    while current is not end:
        for candidate in allowed:
            hit = candidate.regexp.match(remaining)
            if hit is None:
                continue
            if candidate.value is not None:
                # Digits found; scale the unit and accumulate
                total += int(hit.group(1), 10) * candidate.value
            # Drop the matched text from the front of the string
            remaining = remaining[hit.end():]
            current = candidate
            allowed = candidate.followers
            break
        else:
            # None of the currently allowed tokens matched
            raise ValueError("Invalid RFC 3339 duration: {!r}"
                             .format(duration))
    return total


def string_to_delta(interval):
    """Convert an interval string to a datetime.timedelta

    The string is first parsed as an RFC 3339 duration.  If that
    fails, a warning is logged and the string is instead parsed using
    the interval syntax documented before Mandos 1.6.1."""

    try:
        delta = rfc3339_duration_to_delta(interval)
    except ValueError as error:
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
                    ' '.join(error.args))
    else:
        return delta
    return parse_pre_1_6_1_interval(interval)


def parse_pre_1_6_1_interval(interval):
    """Convert an old-style (pre-1.6.1) Mandos interval string to a
    datetime.timedelta

    Every run of digits, optionally followed by one of the unit
    letters d, s, m, h or w, adds to the result; digits without a unit
    letter count as milliseconds.  Everything else is ignored.

    >>> parse_pre_1_6_1_interval('7d')
    datetime.timedelta(7)
    >>> parse_pre_1_6_1_interval('60s')
    datetime.timedelta(0, 60)
    >>> parse_pre_1_6_1_interval('60m')
    datetime.timedelta(0, 3600)
    >>> parse_pre_1_6_1_interval('24h')
    datetime.timedelta(1)
    >>> parse_pre_1_6_1_interval('1w')
    datetime.timedelta(7)
    >>> parse_pre_1_6_1_interval('5m 30s')
    datetime.timedelta(0, 330)
    >>> parse_pre_1_6_1_interval('')
    datetime.timedelta(0)
    >>> # Ignore unknown characters, allow any order and repetitions
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
    datetime.timedelta(2, 480, 18000)

    """

    # The length of one unit for each possible suffix
    unit_lengths = {
        "d": datetime.timedelta(days=1),
        "s": datetime.timedelta(seconds=1),
        "m": datetime.timedelta(minutes=1),
        "h": datetime.timedelta(hours=1),
        "w": datetime.timedelta(weeks=1),
        "": datetime.timedelta(milliseconds=1),
    }

    total = datetime.timedelta(0)
    for found in re.finditer(r"(\d+)([dsmhw]?)", interval):
        digits, suffix = found.groups()
        total += int(digits) * unit_lengths[suffix]
    return total


## Classes for commands.

# Abstract classes first
class Command(object):
    """Common base class of all commands; not used directly"""
    def run(self, mandos, clients):
        """Run this command on each of the clients in turn.

        "clients" maps client objects to their properties.  Most
        commands only implement run_on_one_client(); a command which
        needs to see all clients at once overrides run() instead."""
        self.mandos = mandos
        for client_and_properties in clients.items():
            self.run_on_one_client(*client_and_properties)

class PrintCmd(Command):
    """Base class for commands which print details about clients

    Subclasses provide output(clients), returning the text to print.
    """
    # Every client property which can be shown
    all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
                    "Created", "Interval", "Host", "KeyID",
                    "Fingerprint", "CheckerRunning", "LastEnabled",
                    "ApprovalPending", "ApprovedByDefault",
                    "LastApprovalRequest", "ApprovalDelay",
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
                    "Expires", "LastCheckerStatus")
    def run(self, mandos, clients):
        """Print the output for all clients at once"""
        text = self.output(clients)
        print(text)

class PropertyCmd(Command):
    """Abstract class for Actions for setting one client property

    Subclasses must provide two attributes: "property", the name of
    the D-Bus property to set, and "value_to_set", the value to set
    it to."""
    def run_on_one_client(self, client, properties):
        """Set the Client's D-Bus property"""
        # The property is set through the standard D-Bus properties
        # interface; "properties" is not used here
        client.Set(client_interface, self.property, self.value_to_set,
                   dbus_interface=dbus.PROPERTIES_IFACE)

class ValueArgumentMixIn(object):
    """Mixin class for commands taking a value as argument"""
    def __init__(self, value):
        # Stored under the name "value_to_set"; classes defining
        # "value_to_set" as a property get their setter called here
        self.value_to_set = value

class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
    """Mixin class for commands taking a value argument as
    milliseconds."""
    @property
    def value_to_set(self):
        """The value to set, as an integer number of milliseconds"""
        return self._vts
    @value_to_set.setter
    def value_to_set(self, value):
        """When setting, convert the value (which must have a
        total_seconds() method, like a datetime.timedelta) to an
        integer number of milliseconds, rounded to the nearest
        millisecond"""
        self._vts = int(round(value.total_seconds() * 1000))

# Actual (non-abstract) command classes

class PrintTableCmd(PrintCmd):
    """Print the clients as a text table, one client per row"""
    def __init__(self, verbose=False):
        # If true, show all columns instead of just the default ones
        self.verbose = verbose

    def output(self, clients):
        """Return the whole table as a string.

        "clients" maps client objects to their properties; only the
        properties (the values of the mapping) are used."""
        default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
        keywords = default_keywords
        if self.verbose:
            keywords = self.all_keywords
        return str(self.TableOfClients(clients.values(), keywords))

    class TableOfClients(object):
        """A table with one header row and one row per client, with
        each column wide enough for its widest entry"""
        # Column headers, indexed by property name
        tableheaders = {
            "Name": "Name",
            "Enabled": "Enabled",
            "Timeout": "Timeout",
            "LastCheckedOK": "Last Successful Check",
            "LastApprovalRequest": "Last Approval Request",
            "Created": "Created",
            "Interval": "Interval",
            "Host": "Host",
            "Fingerprint": "Fingerprint",
            "KeyID": "Key ID",
            "CheckerRunning": "Check Is Running",
            "LastEnabled": "Last Enabled",
            "ApprovalPending": "Approval Is Pending",
            "ApprovedByDefault": "Approved By Default",
            "ApprovalDelay": "Approval Delay",
            "ApprovalDuration": "Approval Duration",
            "Checker": "Checker",
            "ExtendedTimeout": "Extended Timeout",
            "Expires": "Expires",
            "LastCheckerStatus": "Last Checker Status",
        }

        def __init__(self, clients, keywords, tableheaders=None):
            """"clients" is an iterable of property mappings, which
            is iterated over more than once.  "keywords" are the
            properties to show, in column order.  "tableheaders", if
            given, replaces the default column headers."""
            self.clients = clients
            self.keywords = keywords
            if tableheaders is not None:
                self.tableheaders = tableheaders

        def __str__(self):
            return "\n".join(self.rows())

        # On Python 2, the method above becomes __unicode__, and
        # __str__ instead returns an encoded byte string.  (The name
        # "str" refers to "unicode" there, so str(self) does not
        # recurse.)
        if sys.version_info.major == 2:
            __unicode__ = __str__
            def __str__(self):
                return str(self).encode(locale.getpreferredencoding())

        def rows(self):
            """Return a list of all rows (strings), header first"""
            format_string = self.row_formatting_string()
            rows = [self.header_line(format_string)]
            rows.extend(self.client_line(client, format_string)
                        for client in self.clients)
            return rows

        def column_width(self, key):
            """Return the width needed for the column showing "key":
            the width of its header or of its widest value, whichever
            is larger"""
            # Collect the widths in a list before calling max(), so
            # that this also works when there are no clients at all;
            # max() with a lone integer argument raises TypeError.
            widths = [len(self.tableheaders[key])]
            widths.extend(len(self.string_from_client(client, key))
                          for client in self.clients)
            return max(widths)

        def row_formatting_string(self):
            "Format string used to format table rows"
            return " ".join("{{{key}:{width}}}".format(
                width=self.column_width(key),
                key=key)
                            for key in self.keywords)

        def string_from_client(self, client, key):
            """Return the property "key" of "client" as it should be
            shown in the table"""
            return self.valuetostring(client[key], key)

        @staticmethod
        def valuetostring(value, keyword):
            """Return "value" as a string suitable for display: D-Bus
            booleans as "Yes" or "No", durations (in milliseconds) as
            "[<days>T]HH:MM:SS", and anything else via str()"""
            if isinstance(value, dbus.Boolean):
                return "Yes" if value else "No"
            if keyword in ("Timeout", "Interval", "ApprovalDelay",
                           "ApprovalDuration", "ExtendedTimeout"):
                return milliseconds_to_string(value)
            return str(value)

        def header_line(self, format_string):
            """Return the header row"""
            return format_string.format(**self.tableheaders)

        def client_line(self, client, format_string):
            """Return the row for one client"""
            return format_string.format(
                **{key: self.string_from_client(client, key)
                   for key in self.keywords})



class DumpJSONCmd(PrintCmd):
    """Print all properties of the clients as JSON"""
    def output(self, clients):
        """Return a JSON object with one member per client, named by
        the client's "Name" property and holding all its properties
        listed in self.all_keywords"""
        data = {}
        for properties in clients.values():
            data[properties["Name"]] = {
                key: self.dbus_boolean_to_bool(properties[key])
                for key in self.all_keywords}
        return json.dumps(data, indent=4, separators=(',', ': '))
    @staticmethod
    def dbus_boolean_to_bool(value):
        """Return D-Bus booleans as normal bool values, and all
        other values unchanged"""
        return bool(value) if isinstance(value, dbus.Boolean) else value

class IsEnabledCmd(Command):
    """Exit the program with status 0 if the client is enabled, or
    with status 1 if it is not"""
    def run_on_one_client(self, client, properties):
        # This always exits, so only one client is ever examined
        sys.exit(0 if self.is_enabled(client, properties) else 1)
    def is_enabled(self, client, properties):
        """Return the "Enabled" property as a normal bool"""
        return bool(properties["Enabled"])

class RemoveCmd(Command):
    """Remove the client, by calling RemoveClient() on the server
    object with the client's D-Bus object path"""
    def run_on_one_client(self, client, properties):
        self.mandos.RemoveClient(client.__dbus_object_path__)

class ApproveCmd(Command):
    """Call the client's Approve() D-Bus method with a true value"""
    def run_on_one_client(self, client, properties):
        client.Approve(dbus.Boolean(True),
                       dbus_interface=client_interface)

class DenyCmd(Command):
    """Call the client's Approve() D-Bus method with a false value"""
    def run_on_one_client(self, client, properties):
        client.Approve(dbus.Boolean(False),
                       dbus_interface=client_interface)

# Commands setting a property to a fixed value

class EnableCmd(PropertyCmd):
    """Set the "Enabled" property to true"""
    property = "Enabled"
    value_to_set = dbus.Boolean(True)

class DisableCmd(PropertyCmd):
    """Set the "Enabled" property to false"""
    property = "Enabled"
    value_to_set = dbus.Boolean(False)

class BumpTimeoutCmd(PropertyCmd):
    """Set the "LastCheckedOK" property to the empty string"""
    # NOTE(review): presumably the server takes this to mean "the
    # client was checked OK just now" - confirm against the server
    property = "LastCheckedOK"
    value_to_set = ""

class StartCheckerCmd(PropertyCmd):
    """Set the "CheckerRunning" property to true"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(True)

class StopCheckerCmd(PropertyCmd):
    """Set the "CheckerRunning" property to false"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(False)

class ApproveByDefaultCmd(PropertyCmd):
    """Set the "ApprovedByDefault" property to true"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(True)

class DenyByDefaultCmd(PropertyCmd):
    """Set the "ApprovedByDefault" property to false"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(False)

# Commands setting a property to the value given when the command
# object is created

class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
    """Set the "Checker" property to the given value"""
    property = "Checker"

class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
    """Set the "Host" property to the given value"""
    property = "Host"

class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
    """Set the "Secret" property to the contents of the given file
    object"""
    @property
    def value_to_set(self):
        """The data read from the file object"""
        return self._vts
    @value_to_set.setter
    def value_to_set(self, value):
        """When setting, read data from supplied file object

        The file object is always closed afterwards, even if reading
        from it fails."""
        try:
            self._vts = value.read()
        finally:
            value.close()
    # This assignment must come after the uses of the "@property"
    # decorator above, since from here on the name "property" no
    # longer refers to the built-in within this class body.
    property = "Secret"

# Commands setting a duration property; the value given when the
# command object is created is converted to milliseconds by
# MillisecondsValueArgumentMixIn

class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
    """Set the "Timeout" property"""
    property = "Timeout"

class SetExtendedTimeoutCmd(PropertyCmd,
                            MillisecondsValueArgumentMixIn):
    """Set the "ExtendedTimeout" property"""
    property = "ExtendedTimeout"

class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
    """Set the "Interval" property"""
    property = "Interval"

class SetApprovalDelayCmd(PropertyCmd,
                          MillisecondsValueArgumentMixIn):
    """Set the "ApprovalDelay" property"""
    property = "ApprovalDelay"

class SetApprovalDurationCmd(PropertyCmd,
                             MillisecondsValueArgumentMixIn):
    """Set the "ApprovalDuration" property"""
    property = "ApprovalDuration"

def add_command_line_options(parser):
    """Add all command line options and arguments to "parser" (an
    argparse.ArgumentParser).

    Options which are direct opposites of each other are put in
    mutually exclusive groups.  Options taking a duration are parsed
    by string_to_delta(), and so give datetime.timedelta values."""
    parser.add_argument("--version", action="version",
                        version="%(prog)s {}".format(version),
                        help="show version number and exit")
    parser.add_argument("-a", "--all", action="store_true",
                        help="Select all clients")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print all fields")
    parser.add_argument("-j", "--dump-json", action="store_true",
                        help="Dump client data in JSON format")
    enable_disable = parser.add_mutually_exclusive_group()
    enable_disable.add_argument("-e", "--enable", action="store_true",
                                help="Enable client")
    enable_disable.add_argument("-d", "--disable",
                                action="store_true",
                                help="disable client")
    parser.add_argument("-b", "--bump-timeout", action="store_true",
                        help="Bump timeout for client")
    start_stop_checker = parser.add_mutually_exclusive_group()
    start_stop_checker.add_argument("--start-checker",
                                    action="store_true",
                                    help="Start checker for client")
    start_stop_checker.add_argument("--stop-checker",
                                    action="store_true",
                                    help="Stop checker for client")
    parser.add_argument("-V", "--is-enabled", action="store_true",
                        help="Check if client is enabled")
    parser.add_argument("-r", "--remove", action="store_true",
                        help="Remove client")
    parser.add_argument("-c", "--checker",
                        help="Set checker command for client")
    parser.add_argument("-t", "--timeout", type=string_to_delta,
                        help="Set timeout for client")
    parser.add_argument("--extended-timeout", type=string_to_delta,
                        help="Set extended timeout for client")
    parser.add_argument("-i", "--interval", type=string_to_delta,
                        help="Set checker interval for client")
    # These two options share one destination, which is None if
    # neither option was given, and otherwise True or False
    approve_deny_default = parser.add_mutually_exclusive_group()
    approve_deny_default.add_argument(
        "--approve-by-default", action="store_true",
        default=None, dest="approved_by_default",
        help="Set client to be approved by default")
    approve_deny_default.add_argument(
        "--deny-by-default", action="store_false",
        dest="approved_by_default",
        help="Set client to be denied by default")
    parser.add_argument("--approval-delay", type=string_to_delta,
                        help="Set delay before client approve/deny")
    parser.add_argument("--approval-duration", type=string_to_delta,
                        help="Set duration of one client approval")
    parser.add_argument("-H", "--host", help="Set host for client")
    # The secret is given as a file name; the file is opened in
    # binary mode by argparse
    parser.add_argument("-s", "--secret",
                        type=argparse.FileType(mode="rb"),
                        help="Set password blob (file) for client")
    approve_deny = parser.add_mutually_exclusive_group()
    approve_deny.add_argument(
        "-A", "--approve", action="store_true",
        help="Approve any current client request")
    approve_deny.add_argument("-D", "--deny", action="store_true",
                              help="Deny any current client request")
    parser.add_argument("--check", action="store_true",
                        help="Run self-test")
    # All remaining arguments are names of clients to operate on
    parser.add_argument("client", nargs="*", help="Client name")


def commands_from_options(options):
    """Return a list of command objects for the given options.

    "options" is the namespace of parsed command line options.  The
    commands are listed in a fixed order, independent of the order
    of the options on the command line.  If no option asking for a
    command was given, the list contains a single PrintTableCmd.

    Options taking a value count as given whenever their value is
    not None, so that values which happen to be false (a zero
    duration, an empty string) still result in a command."""

    commands = []

    if options.dump_json:
        commands.append(DumpJSONCmd())

    if options.enable:
        commands.append(EnableCmd())

    if options.disable:
        commands.append(DisableCmd())

    if options.bump_timeout:
        commands.append(BumpTimeoutCmd())

    if options.start_checker:
        commands.append(StartCheckerCmd())

    if options.stop_checker:
        commands.append(StopCheckerCmd())

    if options.is_enabled:
        commands.append(IsEnabledCmd())

    if options.remove:
        commands.append(RemoveCmd())

    if options.checker is not None:
        commands.append(SetCheckerCmd(options.checker))

    if options.timeout is not None:
        commands.append(SetTimeoutCmd(options.timeout))

    # This must be compared with None, just like the other duration
    # options; a zero-length datetime.timedelta is false, so a plain
    # truth test would silently ignore "--extended-timeout PT0S".
    if options.extended_timeout is not None:
        commands.append(
            SetExtendedTimeoutCmd(options.extended_timeout))

    if options.interval is not None:
        commands.append(SetIntervalCmd(options.interval))

    # None means that neither --approve-by-default nor
    # --deny-by-default was given
    if options.approved_by_default is not None:
        if options.approved_by_default:
            commands.append(ApproveByDefaultCmd())
        else:
            commands.append(DenyByDefaultCmd())

    if options.approval_delay is not None:
        commands.append(SetApprovalDelayCmd(options.approval_delay))

    if options.approval_duration is not None:
        commands.append(
            SetApprovalDurationCmd(options.approval_duration))

    if options.host is not None:
        commands.append(SetHostCmd(options.host))

    if options.secret is not None:
        commands.append(SetSecretCmd(options.secret))

    if options.approve:
        commands.append(ApproveCmd())

    if options.deny:
        commands.append(DenyCmd())

    # If no command option has been given, show table of clients,
    # optionally verbosely
    if not commands:
        commands.append(PrintTableCmd(verbose=options.verbose))

    return commands


def main():
    """Parse the command line, look up the requested clients on the
    Mandos server via the D-Bus system bus, and run the requested
    commands on them.

    Invalid combinations of options are reported via parser.error().
    If the server can not be reached, or a named client does not
    exist, a critical message is logged and the program exits with
    status 1."""
    parser = argparse.ArgumentParser()

    add_command_line_options(parser)

    options = parser.parse_args()

    def has_actions(options):
        """Return true if any option which acts on clients was given
        (that is, any option except --all, --verbose, --dump-json and
        --check)"""
        return any((options.enable,
                    options.disable,
                    options.bump_timeout,
                    options.start_checker,
                    options.stop_checker,
                    options.is_enabled,
                    options.remove,
                    options.checker is not None,
                    options.timeout is not None,
                    options.extended_timeout is not None,
                    options.interval is not None,
                    options.approved_by_default is not None,
                    options.approval_delay is not None,
                    options.approval_duration is not None,
                    options.host is not None,
                    options.secret is not None,
                    options.approve,
                    options.deny))

    # Refuse combinations of options which do not make sense
    if has_actions(options) and not (options.client or options.all):
        parser.error("Options require clients names or --all.")
    if options.verbose and has_actions(options):
        parser.error("--verbose can only be used alone.")
    if options.dump_json and (options.verbose
                              or has_actions(options)):
        parser.error("--dump-json can only be used alone.")
    if options.all and not has_actions(options):
        parser.error("--all requires an action.")
    if options.is_enabled and len(options.client) > 1:
        parser.error("--is-enabled requires exactly one client")

    clientnames = options.client

    # Get the Mandos server object from the system bus
    try:
        bus = dbus.SystemBus()
        mandos_dbus_objc = bus.get_object(busname, server_path)
    except dbus.exceptions.DBusException:
        log.critical("Could not connect to Mandos server")
        sys.exit(1)

    # The same object, seen through two different D-Bus interfaces
    mandos_serv = dbus.Interface(mandos_dbus_objc,
                                 dbus_interface=server_interface)
    mandos_serv_object_manager = dbus.Interface(
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)

    # Filter out log message from dbus module
    dbus_logger = logging.getLogger("dbus.proxies")
    class NullFilter(logging.Filter):
        """Log filter which rejects all log records"""
        def filter(self, record):
            return False
    dbus_filter = NullFilter()
    try:
        dbus_logger.addFilter(dbus_filter)
        # Dict of {object path: properties} for all objects on the
        # server which have the client interface
        mandos_clients = {path: ifs_and_props[client_interface]
                          for path, ifs_and_props in
                          mandos_serv_object_manager
                          .GetManagedObjects().items()
                          if client_interface in ifs_and_props}
    except dbus.exceptions.DBusException as e:
        log.critical("Failed to access Mandos server through D-Bus:"
                     "\n%s", e)
        sys.exit(1)
    finally:
        # restore dbus logger
        dbus_logger.removeFilter(dbus_filter)

    # Compile dict of (clients: properties) to process
    clients = {}

    if not clientnames:
        # No clients were named; use all of them
        clients = {bus.get_object(busname, path): properties
                   for path, properties in mandos_clients.items()}
    else:
        for name in clientnames:
            # Use the first client found having this name
            for path, client in mandos_clients.items():
                if client["Name"] == name:
                    client_objc = bus.get_object(busname, path)
                    clients[client_objc] = client
                    break
            else:
                # The loop ended without finding any such client
                log.critical("Client not found on server: %r", name)
                sys.exit(1)

    # Run all commands on clients
    commands = commands_from_options(options)
    for command in commands:
        command.run(mandos_serv, clients)


class Test_milliseconds_to_string(unittest.TestCase):
    """Tests of the milliseconds_to_string() function"""
    def test_all(self):
        # 1 day, 2 hours, 3 minutes and 5 seconds
        self.assertEqual(milliseconds_to_string(93785000),
                         "1T02:03:05")
    def test_no_days(self):
        # Less than a day; there should be no day prefix
        self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
    def test_all_zero(self):
        self.assertEqual(milliseconds_to_string(0), "00:00:00")
    def test_no_fractional_seconds(self):
        # Fractions of a second should be dropped, not rounded
        self.assertEqual(milliseconds_to_string(400), "00:00:00")
        self.assertEqual(milliseconds_to_string(900), "00:00:00")
        self.assertEqual(milliseconds_to_string(1900), "00:00:01")

class Test_string_to_delta(unittest.TestCase):
    """Tests of the string_to_delta() function"""
    def test_handles_basic_rfc3339(self):
        self.assertEqual(string_to_delta("PT0S"),
                         datetime.timedelta())
        self.assertEqual(string_to_delta("P0D"),
                         datetime.timedelta())
        self.assertEqual(string_to_delta("PT1S"),
                         datetime.timedelta(0, 1))
        self.assertEqual(string_to_delta("PT2H"),
                         datetime.timedelta(0, 7200))
    def test_falls_back_to_pre_1_6_1_with_warning(self):
        # "2h" is not an RFC 3339 duration, so it should be parsed as
        # an old-style interval, and a warning should be logged
        # assertLogs only exists in Python 3.4
        if hasattr(self, "assertLogs"):
            with self.assertLogs(log, logging.WARNING):
                value = string_to_delta("2h")
        else:
            # Without assertLogs, catch the warning with a log filter
            class WarningFilter(logging.Filter):
                """Don't show, but record the presence of, warnings"""
                def filter(self, record):
                    is_warning = record.levelno >= logging.WARNING
                    # Once set, "found" stays set
                    self.found = is_warning or getattr(self, "found",
                                                       False)
                    return not is_warning
            warning_filter = WarningFilter()
            log.addFilter(warning_filter)
            try:
                value = string_to_delta("2h")
            finally:
                log.removeFilter(warning_filter)
            self.assertTrue(getattr(warning_filter, "found", False))
        self.assertEqual(value, datetime.timedelta(0, 7200))


class TestCmd(unittest.TestCase):
    """Abstract class for tests of command classes"""
    def setUp(self):
        """Create two mock clients, and make them available as
        self.client and self.other_client, and also as the mappings
        self.clients (both clients, in that order) and
        self.one_client (only the first one)"""
        # The mock client's methods need the test case for their
        # assertions, but "self" means the mock client there
        testcase = self
        class MockClient(object):
            """Stand-in for a client D-Bus object; it keeps the
            properties in self.attributes and records calls to
            Approve() in self.calls"""
            def __init__(self, name, **attributes):
                self.__dbus_object_path__ = "objpath_{}".format(name)
                self.attributes = attributes
                self.attributes["Name"] = name
                self.calls = []
            def Set(self, interface, property, value, dbus_interface):
                testcase.assertEqual(interface, client_interface)
                testcase.assertEqual(dbus_interface,
                                     dbus.PROPERTIES_IFACE)
                self.attributes[property] = value
            def Get(self, interface, property, dbus_interface):
                testcase.assertEqual(interface, client_interface)
                testcase.assertEqual(dbus_interface,
                                     dbus.PROPERTIES_IFACE)
                return self.attributes[property]
            def Approve(self, approve, dbus_interface):
                testcase.assertEqual(dbus_interface, client_interface)
                self.calls.append(("Approve", (approve,
                                               dbus_interface)))
        self.client = MockClient(
            "foo",
            KeyID=("92ed150794387c03ce684574b1139a65"
                   "94a34f895daaaf09fd8ea90a27cddb12"),
            Secret=b"secret",
            Host="foo.example.org",
            Enabled=dbus.Boolean(True),
            Timeout=300000,
            LastCheckedOK="2019-02-03T00:00:00",
            Created="2019-01-02T00:00:00",
            Interval=120000,
            Fingerprint=("778827225BA7DE539C5A"
                         "7CFA59CFF7CDBD9A5920"),
            CheckerRunning=dbus.Boolean(False),
            LastEnabled="2019-01-03T00:00:00",
            ApprovalPending=dbus.Boolean(False),
            ApprovedByDefault=dbus.Boolean(True),
            LastApprovalRequest="",
            ApprovalDelay=0,
            ApprovalDuration=1000,
            Checker="fping -q -- %(host)s",
            ExtendedTimeout=900000,
            Expires="2019-02-04T00:00:00",
            LastCheckerStatus=0)
        self.other_client = MockClient(
            "barbar",
            KeyID=("0558568eedd67d622f5c83b35a115f79"
                   "6ab612cff5ad227247e46c2b020f441c"),
            Secret=b"secretbar",
            Host="192.0.2.3",
            Enabled=dbus.Boolean(True),
            Timeout=300000,
            LastCheckedOK="2019-02-04T00:00:00",
            Created="2019-01-03T00:00:00",
            Interval=120000,
            Fingerprint=("3E393AEAEFB84C7E89E2"
                         "F547B3A107558FCA3A27"),
            CheckerRunning=dbus.Boolean(True),
            LastEnabled="2019-01-04T00:00:00",
            ApprovalPending=dbus.Boolean(False),
            ApprovedByDefault=dbus.Boolean(False),
            LastApprovalRequest="2019-01-03T00:00:00",
            ApprovalDelay=30000,
            ApprovalDuration=1000,
            Checker=":",
            ExtendedTimeout=900000,
            Expires="2019-02-05T00:00:00",
            LastCheckerStatus=-2)
        # Use an ordered mapping, to get the clients in a known order
        self.clients =  collections.OrderedDict(
            [
                (self.client, self.client.attributes),
                (self.other_client, self.other_client.attributes),
            ])
        self.one_client = {self.client: self.client.attributes}

class TestPrintTableCmd(TestCmd):
    """Tests of the table text returned by PrintTableCmd.output()"""
    # In every expected table below, the [1:-1] slice drops the
    # newline directly after the opening quotes and the one directly
    # before the closing quotes.  The trailing spaces inside the
    # literals are part of the compared text and must be kept.
    def test_normal(self):
        # Default (non-verbose) table for both clients
        output = PrintTableCmd().output(self.clients)
        expected_output = """
Name   Enabled Timeout  Last Successful Check
foo    Yes     00:05:00 2019-02-03T00:00:00  
barbar Yes     00:05:00 2019-02-04T00:00:00  
"""[1:-1]
        self.assertEqual(output, expected_output)
    def test_verbose(self):
        # Verbose table for both clients; one column per property
        output = PrintTableCmd(verbose=True).output(self.clients)
        expected_output = """
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
barbar Yes     00:05:00 2019-02-04T00:00:00   2019-01-03T00:00:00 00:02:00 192.0.2.3       0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes              2019-01-04T00:00:00 No                  No                  2019-01-03T00:00:00   00:00:30       00:00:01          :                    00:15:00         2019-02-05T00:00:00 -2                 
"""[1:-1]
        self.assertEqual(output, expected_output)
    def test_one_client(self):
        # Default table for a single client; note the narrower "Name"
        # column compared to the two-client table
        output = PrintTableCmd().output(self.one_client)
        expected_output = """
Name Enabled Timeout  Last Successful Check
foo  Yes     00:05:00 2019-02-03T00:00:00  
"""[1:-1]
        self.assertEqual(output, expected_output)

class TestDumpJSONCmd(TestCmd):
    """Tests of the JSON text returned by DumpJSONCmd.output()"""
    def setUp(self):
        # Expected decoded JSON for each client, keyed by client name
        foo_json = {
            "Name": "foo",
            "KeyID": ("92ed150794387c03ce684574b1139a65"
                      "94a34f895daaaf09fd8ea90a27cddb12"),
            "Host": "foo.example.org",
            "Enabled": True,
            "Timeout": 300000,
            "LastCheckedOK": "2019-02-03T00:00:00",
            "Created": "2019-01-02T00:00:00",
            "Interval": 120000,
            "Fingerprint": ("778827225BA7DE539C5A"
                            "7CFA59CFF7CDBD9A5920"),
            "CheckerRunning": False,
            "LastEnabled": "2019-01-03T00:00:00",
            "ApprovalPending": False,
            "ApprovedByDefault": True,
            "LastApprovalRequest": "",
            "ApprovalDelay": 0,
            "ApprovalDuration": 1000,
            "Checker": "fping -q -- %(host)s",
            "ExtendedTimeout": 900000,
            "Expires": "2019-02-04T00:00:00",
            "LastCheckerStatus": 0,
        }
        barbar_json = {
            "Name": "barbar",
            "KeyID": ("0558568eedd67d622f5c83b35a115f79"
                      "6ab612cff5ad227247e46c2b020f441c"),
            "Host": "192.0.2.3",
            "Enabled": True,
            "Timeout": 300000,
            "LastCheckedOK": "2019-02-04T00:00:00",
            "Created": "2019-01-03T00:00:00",
            "Interval": 120000,
            "Fingerprint": ("3E393AEAEFB84C7E89E2"
                            "F547B3A107558FCA3A27"),
            "CheckerRunning": True,
            "LastEnabled": "2019-01-04T00:00:00",
            "ApprovalPending": False,
            "ApprovedByDefault": False,
            "LastApprovalRequest": "2019-01-03T00:00:00",
            "ApprovalDelay": 30000,
            "ApprovalDuration": 1000,
            "Checker": ":",
            "ExtendedTimeout": 900000,
            "Expires": "2019-02-05T00:00:00",
            "LastCheckerStatus": -2,
        }
        self.expected_json = {"foo": foo_json, "barbar": barbar_json}
        return super(TestDumpJSONCmd, self).setUp()
    def test_normal(self):
        dumped = DumpJSONCmd().output(self.clients)
        self.assertDictEqual(json.loads(dumped), self.expected_json)
    def test_one_client(self):
        dumped = DumpJSONCmd().output(self.one_client)
        only_foo = {"foo": self.expected_json["foo"]}
        self.assertDictEqual(json.loads(dumped), only_foo)

class TestIsEnabledCmd(TestCmd):
    """Tests of IsEnabledCmd and of the exit status of its run()"""
    def test_is_enabled(self):
        results = (IsEnabledCmd().is_enabled(client, properties)
                   for client, properties in self.clients.items())
        self.assertTrue(all(results))
    def test_is_enabled_run_exits_successfully(self):
        with self.assertRaises(SystemExit) as raised:
            IsEnabledCmd().run(None, self.one_client)
        exit_code = raised.exception.code
        # Both an exit code of None and one of 0 are accepted
        if exit_code is None:
            self.assertIsNone(exit_code)
        else:
            self.assertEqual(exit_code, 0)
    def test_is_enabled_run_exits_with_failure(self):
        self.client.attributes["Enabled"] = dbus.Boolean(False)
        with self.assertRaises(SystemExit) as raised:
            IsEnabledCmd().run(None, self.one_client)
        exit_code = raised.exception.code
        # An integer exit code must be nonzero; any other kind of
        # exit code must at least not be None
        if not isinstance(exit_code, int):
            self.assertIsNotNone(exit_code)
        else:
            self.assertNotEqual(exit_code, 0)

class TestRemoveCmd(TestCmd):
    """RemoveCmd.run() should call RemoveClient() on the server object
once for every client, passing that client's D-Bus object path"""
    def test_remove(self):
        class MockMandos(object):
            # Stand-in for the server object; it only records the
            # RemoveClient calls it receives
            def __init__(self):
                self.calls = []
            def RemoveClient(self, dbus_path):
                self.calls.append(("RemoveClient", (dbus_path,)))
        mandos = MockMandos()
        # No explicit setUp() call is needed here; unittest has
        # already run it before invoking this test method, so
        # self.clients is ready to use.
        RemoveCmd().run(mandos, self.clients)
        # Exactly one call per client, however many clients the
        # fixture contains
        self.assertEqual(len(mandos.calls), len(self.clients))
        for client in self.clients:
            self.assertIn(("RemoveClient",
                           (client.__dbus_object_path__,)),
                          mandos.calls)

class TestApproveCmd(TestCmd):
    """After ApproveCmd.run(), every client should have recorded an
"Approve" call with the argument True"""
    def test_approve(self):
        approve = ApproveCmd()
        approve.run(None, self.clients)
        expected_call = ("Approve", (True, client_interface))
        for client in self.clients:
            self.assertIn(expected_call, client.calls)

class TestDenyCmd(TestCmd):
    """After DenyCmd.run(), every client should have recorded an
"Approve" call with the argument False"""
    def test_deny(self):
        deny = DenyCmd()
        deny.run(None, self.clients)
        expected_call = ("Approve", (False, client_interface))
        for client in self.clients:
            self.assertIn(expected_call, client.calls)

class TestEnableCmd(TestCmd):
    """EnableCmd.run() should leave "Enabled" true on every client"""
    def test_enable(self):
        # Start with every client disabled, so that a true value
        # afterwards must come from the command
        for client in self.clients:
            client.attributes["Enabled"] = False
        enable = EnableCmd()
        enable.run(None, self.clients)
        for client in self.clients:
            enabled = client.attributes["Enabled"]
            self.assertTrue(enabled)

class TestDisableCmd(TestCmd):
    """DisableCmd.run() should leave "Enabled" false on every client"""
    def test_disable(self):
        disable = DisableCmd()
        disable.run(None, self.clients)
        for client in self.clients:
            enabled = client.attributes["Enabled"]
            self.assertFalse(enabled)

class Unique(object):
    """Class for objects which exist only to be unique objects, since
unittest.mock.sentinel only exists in Python 3.3 and later.

The property tests use instances of this class as placeholder values
which no command could have produced."""

class TestPropertyCmd(TestCmd):
    """Abstract base class for tests of PropertyCmd classes.

Concrete subclasses provide the class attributes "command",
"property" and "values_to_set", and optionally "values_to_get" when
the value expected afterwards differs from the value that was set."""
    def runTest(self):
        # A class without a "command" is abstract; nothing to test
        if not hasattr(self, "command"):
            return
        inputs = self.values_to_set
        expected_values = getattr(self, "values_to_get", inputs)
        for input_value, expected in zip(inputs, expected_values):
            # Replace the property on every client with a fresh
            # placeholder, so a stale value cannot pass the check
            for client in self.clients:
                previous = client.attributes[self.property]
                self.assertNotIsInstance(previous, Unique)
                client.attributes[self.property] = Unique()
            self.run_command(input_value, self.clients)
            # The command must have replaced every placeholder with
            # the expected value
            for client in self.clients:
                actual = client.attributes[self.property]
                self.assertNotIsInstance(actual, Unique)
                self.assertEqual(actual, expected)
    def run_command(self, value, clients):
        # Commands tested through this class take no constructor
        # argument, so VALUE is deliberately not used here
        cmd = self.command()
        cmd.run(None, clients)

class TestBumpTimeoutCmd(TestPropertyCmd):
    """BumpTimeoutCmd is expected to set "LastCheckedOK" to the empty
string on every client"""
    command = BumpTimeoutCmd
    property = "LastCheckedOK"
    # Also the expected value afterwards, as no values_to_get is given
    values_to_set = [""]

class TestStartCheckerCmd(TestPropertyCmd):
    """StartCheckerCmd is expected to set "CheckerRunning" to true on
every client"""
    command = StartCheckerCmd
    property = "CheckerRunning"
    # Also the expected value afterwards, as no values_to_get is given
    values_to_set = [dbus.Boolean(True)]

class TestStopCheckerCmd(TestPropertyCmd):
    """StopCheckerCmd is expected to set "CheckerRunning" to false on
every client"""
    command = StopCheckerCmd
    property = "CheckerRunning"
    # Also the expected value afterwards, as no values_to_get is given
    values_to_set = [dbus.Boolean(False)]

class TestApproveByDefaultCmd(TestPropertyCmd):
    """ApproveByDefaultCmd is expected to set "ApprovedByDefault" to
true on every client"""
    command = ApproveByDefaultCmd
    property = "ApprovedByDefault"
    # Also the expected value afterwards, as no values_to_get is given
    values_to_set = [dbus.Boolean(True)]

class TestDenyByDefaultCmd(TestPropertyCmd):
    """DenyByDefaultCmd is expected to set "ApprovedByDefault" to
false on every client"""
    command = DenyByDefaultCmd
    property = "ApprovedByDefault"
    # Also the expected value afterwards, as no values_to_get is given
    values_to_set = [dbus.Boolean(False)]

class TestValueArgumentPropertyCmd(TestPropertyCmd):
    """Abstract base class for tests of PropertyCmd classes using the
ValueArgumentMixIn; these commands are constructed with the value to
set as their argument"""
    def runTest(self):
        # Only concrete subclasses have anything to test
        is_concrete = type(self) is not TestValueArgumentPropertyCmd
        if is_concrete:
            return super(TestValueArgumentPropertyCmd, self).runTest()
        return None
    def run_command(self, value, clients):
        cmd = self.command(value)
        cmd.run(None, clients)

class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
    """SetCheckerCmd is expected to store each given string unchanged
in the "Checker" property of every client"""
    command = SetCheckerCmd
    property = "Checker"
    # Also the expected values afterwards, as no values_to_get is
    # given; the empty string is among the values tried
    values_to_set = ["", ":", "fping -q -- %s"]

class TestSetHostCmd(TestValueArgumentPropertyCmd):
    """SetHostCmd is expected to store each given string unchanged in
the "Host" property of every client"""
    command = SetHostCmd
    property = "Host"
    # Also the expected values afterwards, as no values_to_get is
    # given; tried with both an IP address and a host name
    values_to_set = ["192.0.2.3", "foo.example.org"]

class TestSetSecretCmd(TestValueArgumentPropertyCmd):
    """SetSecretCmd is given binary file objects, and the "Secret"
property of every client is expected to end up as the bytes which
those file objects contain"""
    command = SetSecretCmd
    property = "Secret"
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
    def setUp(self):
        super(TestSetSecretCmd, self).setUp()
        # Create the input streams for each test run instead of once
        # at class definition time.  Otherwise the null device would
        # be opened merely by loading this file, would never be
        # reliably closed, and the streams would be shared (and
        # already consumed) if the test were ever run a second time.
        devnull = open(os.path.devnull, "rb")
        # Closing an already closed file is harmless, so this is safe
        # whether or not the command closes the file itself
        self.addCleanup(devnull.close)
        self.values_to_set = [devnull,
                              io.BytesIO(b"secret\0xyzzy\nbar")]

class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
    """SetTimeoutCmd is given timedelta values, and the "Timeout"
property is expected to end up as the same duration in milliseconds"""
    command = SetTimeoutCmd
    property = "Timeout"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # Expected milliseconds, in the same order as values_to_set
    values_to_get = [
        0,
        5 * 60 * 1000,
        1 * 1000,
        7 * 24 * 60 * 60 * 1000,
        52 * 7 * 24 * 60 * 60 * 1000,
    ]

class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
    """SetExtendedTimeoutCmd is given timedelta values, and the
"ExtendedTimeout" property is expected to end up as the same duration
in milliseconds"""
    command = SetExtendedTimeoutCmd
    property = "ExtendedTimeout"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # Expected milliseconds, in the same order as values_to_set
    values_to_get = [
        0,
        5 * 60 * 1000,
        1 * 1000,
        7 * 24 * 60 * 60 * 1000,
        52 * 7 * 24 * 60 * 60 * 1000,
    ]

class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
    """SetIntervalCmd is given timedelta values, and the "Interval"
property is expected to end up as the same duration in milliseconds"""
    command = SetIntervalCmd
    property = "Interval"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # Expected milliseconds, in the same order as values_to_set
    values_to_get = [
        0,
        5 * 60 * 1000,
        1 * 1000,
        7 * 24 * 60 * 60 * 1000,
        52 * 7 * 24 * 60 * 60 * 1000,
    ]

class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
    """SetApprovalDelayCmd is given timedelta values, and the
"ApprovalDelay" property is expected to end up as the same duration
in milliseconds"""
    command = SetApprovalDelayCmd
    property = "ApprovalDelay"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # Expected milliseconds, in the same order as values_to_set
    values_to_get = [
        0,
        5 * 60 * 1000,
        1 * 1000,
        7 * 24 * 60 * 60 * 1000,
        52 * 7 * 24 * 60 * 60 * 1000,
    ]

class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
    """SetApprovalDurationCmd is given timedelta values, and the
"ApprovalDuration" property is expected to end up as the same
duration in milliseconds"""
    command = SetApprovalDurationCmd
    property = "ApprovalDuration"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # Expected milliseconds, in the same order as values_to_set
    values_to_get = [
        0,
        5 * 60 * 1000,
        1 * 1000,
        7 * 24 * 60 * 60 * 1000,
        52 * 7 * 24 * 60 * 60 * 1000,
    ]

class Test_command_from_options(unittest.TestCase):
    """Check which command object, with which attributes, each
command line option results in"""

    def setUp(self):
        parser = argparse.ArgumentParser()
        add_command_line_options(parser)
        self.parser = parser

    def assert_command_from_args(self, args, command_cls, **cmd_attrs):
        """Assert that parsing ARGS should result in an instance of
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
        parsed = self.parser.parse_args(args)
        cmds = commands_from_options(parsed)
        self.assertEqual(len(cmds), 1)
        cmd = cmds[0]
        self.assertIsInstance(cmd, command_cls)
        for attr_name in cmd_attrs:
            self.assertEqual(getattr(cmd, attr_name),
                             cmd_attrs[attr_name])

    def _assert_secret_from_tempfile(self, option):
        """Assert that OPTION, given the name of a temporary file,
results in a SetSecretCmd holding the contents of that file."""
        secret = b"secret\0xyzzy\nbar"
        with tempfile.NamedTemporaryFile(mode="r+b") as secret_file:
            secret_file.write(secret)
            secret_file.seek(0)
            args = [option, secret_file.name, "foo"]
            self.assert_command_from_args(args, SetSecretCmd,
                                          value_to_set=secret)

    def test_print_table(self):
        args = []
        self.assert_command_from_args(args, PrintTableCmd,
                                      verbose=False)

    def test_print_table_verbose(self):
        args = ["--verbose"]
        self.assert_command_from_args(args, PrintTableCmd,
                                      verbose=True)

    def test_print_table_verbose_short(self):
        args = ["-v"]
        self.assert_command_from_args(args, PrintTableCmd,
                                      verbose=True)

    def test_enable(self):
        args = ["--enable", "foo"]
        self.assert_command_from_args(args, EnableCmd)

    def test_enable_short(self):
        args = ["-e", "foo"]
        self.assert_command_from_args(args, EnableCmd)

    def test_disable(self):
        args = ["--disable", "foo"]
        self.assert_command_from_args(args, DisableCmd)

    def test_disable_short(self):
        args = ["-d", "foo"]
        self.assert_command_from_args(args, DisableCmd)

    def test_bump_timeout(self):
        args = ["--bump-timeout", "foo"]
        self.assert_command_from_args(args, BumpTimeoutCmd)

    def test_bump_timeout_short(self):
        args = ["-b", "foo"]
        self.assert_command_from_args(args, BumpTimeoutCmd)

    def test_start_checker(self):
        args = ["--start-checker", "foo"]
        self.assert_command_from_args(args, StartCheckerCmd)

    def test_stop_checker(self):
        args = ["--stop-checker", "foo"]
        self.assert_command_from_args(args, StopCheckerCmd)

    def test_remove(self):
        args = ["--remove", "foo"]
        self.assert_command_from_args(args, RemoveCmd)

    def test_remove_short(self):
        args = ["-r", "foo"]
        self.assert_command_from_args(args, RemoveCmd)

    def test_checker(self):
        args = ["--checker", ":", "foo"]
        self.assert_command_from_args(args, SetCheckerCmd,
                                      value_to_set=":")

    def test_checker_empty(self):
        args = ["--checker", "", "foo"]
        self.assert_command_from_args(args, SetCheckerCmd,
                                      value_to_set="")

    def test_checker_short(self):
        args = ["-c", ":", "foo"]
        self.assert_command_from_args(args, SetCheckerCmd,
                                      value_to_set=":")

    def test_timeout(self):
        args = ["--timeout", "PT5M", "foo"]
        self.assert_command_from_args(args, SetTimeoutCmd,
                                      value_to_set=300000)

    def test_timeout_short(self):
        args = ["-t", "PT5M", "foo"]
        self.assert_command_from_args(args, SetTimeoutCmd,
                                      value_to_set=300000)

    def test_extended_timeout(self):
        args = ["--extended-timeout", "PT15M", "foo"]
        self.assert_command_from_args(args, SetExtendedTimeoutCmd,
                                      value_to_set=900000)

    def test_interval(self):
        args = ["--interval", "PT2M", "foo"]
        self.assert_command_from_args(args, SetIntervalCmd,
                                      value_to_set=120000)

    def test_interval_short(self):
        args = ["-i", "PT2M", "foo"]
        self.assert_command_from_args(args, SetIntervalCmd,
                                      value_to_set=120000)

    def test_approve_by_default(self):
        args = ["--approve-by-default", "foo"]
        self.assert_command_from_args(args, ApproveByDefaultCmd)

    def test_deny_by_default(self):
        args = ["--deny-by-default", "foo"]
        self.assert_command_from_args(args, DenyByDefaultCmd)

    def test_approval_delay(self):
        args = ["--approval-delay", "PT30S", "foo"]
        self.assert_command_from_args(args, SetApprovalDelayCmd,
                                      value_to_set=30000)

    def test_approval_duration(self):
        args = ["--approval-duration", "PT1S", "foo"]
        self.assert_command_from_args(args, SetApprovalDurationCmd,
                                      value_to_set=1000)

    def test_host(self):
        args = ["--host", "foo.example.org", "foo"]
        self.assert_command_from_args(args, SetHostCmd,
                                      value_to_set="foo.example.org")

    def test_host_short(self):
        args = ["-H", "foo.example.org", "foo"]
        self.assert_command_from_args(args, SetHostCmd,
                                      value_to_set="foo.example.org")

    def test_secret_devnull(self):
        args = ["--secret", os.path.devnull, "foo"]
        self.assert_command_from_args(args, SetSecretCmd,
                                      value_to_set=b"")

    def test_secret_tempfile(self):
        self._assert_secret_from_tempfile("--secret")

    def test_secret_devnull_short(self):
        args = ["-s", os.path.devnull, "foo"]
        self.assert_command_from_args(args, SetSecretCmd,
                                      value_to_set=b"")

    def test_secret_tempfile_short(self):
        self._assert_secret_from_tempfile("-s")

    def test_approve(self):
        args = ["--approve", "foo"]
        self.assert_command_from_args(args, ApproveCmd)

    def test_approve_short(self):
        args = ["-A", "foo"]
        self.assert_command_from_args(args, ApproveCmd)

    def test_deny(self):
        args = ["--deny", "foo"]
        self.assert_command_from_args(args, DenyCmd)

    def test_deny_short(self):
        args = ["-D", "foo"]
        self.assert_command_from_args(args, DenyCmd)

    def test_dump_json(self):
        args = ["--dump-json"]
        self.assert_command_from_args(args, DumpJSONCmd)

    def test_is_enabled(self):
        args = ["--is-enabled", "foo"]
        self.assert_command_from_args(args, IsEnabledCmd)

    def test_is_enabled_short(self):
        args = ["-V", "foo"]
        self.assert_command_from_args(args, IsEnabledCmd)



def should_only_run_tests():
    """Return whether "--check" was given on the command line.

If it was, it is also removed from sys.argv, leaving all other
arguments in place."""
    check_parser = argparse.ArgumentParser(add_help=False)
    check_parser.add_argument("--check", action="store_true")
    # Anything except --check is left alone and handed back as-is
    known, remaining = check_parser.parse_known_args()
    if known.check:
        # Remove --check argument from sys.argv
        sys.argv[1:] = remaining
    return known.check

# Hook used by unittest when loading tests from this file; it makes
# the doctest strings of this file part of the test run
def load_tests(loader, tests, none):
    """Add all tests from doctest strings to TESTS and return it"""
    import doctest
    doctest_suite = doctest.DocTestSuite()
    tests.addTests(doctest_suite)
    return tests

if __name__ == "__main__":
    # When called with --check [--verbose], run the test suite
    # instead of the program itself
    if not should_only_run_tests():
        main()
    else:
        unittest.main()
