/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-12 19:15:52 UTC
  • Revision ID: teddy@recompile.se-20190312191552-f1di4dzya1pzxc9a
mandos-ctl: Refactor

* mandos-ctl (TestPrintTableCmd.test_verbose): Reformat for easier
                                               editing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
43
43
import unittest
44
44
import logging
45
45
import io
 
46
import tempfile
 
47
import contextlib
46
48
 
47
49
import dbus
48
50
 
62
64
 
63
65
locale.setlocale(locale.LC_ALL, "")
64
66
 
65
 
domain = "se.recompile"
66
 
busname = domain + ".Mandos"
67
 
server_path = "/"
68
 
server_interface = domain + ".Mandos"
69
 
client_interface = domain + ".Mandos.Client"
 
67
dbus_busname_domain = "se.recompile"
 
68
dbus_busname = dbus_busname_domain + ".Mandos"
 
69
server_dbus_path = "/"
 
70
server_dbus_interface = dbus_busname_domain + ".Mandos"
 
71
client_dbus_interface = dbus_busname_domain + ".Mandos.Client"
 
72
del dbus_busname_domain
70
73
version = "1.8.3"
71
74
 
72
75
 
274
277
# Abstract classes first
275
278
class Command(object):
276
279
    """Abstract class for commands"""
277
 
    def run(self, mandos, clients):
 
280
    def run(self, clients, bus=None, mandos=None):
278
281
        """Normal commands should implement run_on_one_client(), but
279
282
        commands which want to operate on all clients at the same time
280
283
        can override this run() method instead."""
281
284
        self.mandos = mandos
282
 
        for client, properties in clients.items():
 
285
        for clientpath, properties in clients.items():
 
286
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
287
                      dbus_busname, str(clientpath))
 
288
            client = bus.get_object(dbus_busname, clientpath)
283
289
            self.run_on_one_client(client, properties)
284
290
 
285
291
class PrintCmd(Command):
291
297
                    "LastApprovalRequest", "ApprovalDelay",
292
298
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
293
299
                    "Expires", "LastCheckerStatus")
294
 
    def run(self, mandos, clients):
295
 
        print(self.output(clients))
 
300
    def run(self, clients, bus=None, mandos=None):
 
301
        print(self.output(clients.values()))
 
302
    def output(self, clients):
 
303
        raise NotImplementedError()
296
304
 
297
305
class PropertyCmd(Command):
298
306
    """Abstract class for Actions for setting one client property"""
299
307
    def run_on_one_client(self, client, properties):
300
308
        """Set the Client's D-Bus property"""
301
 
        client.Set(client_interface, self.property, self.value_to_set,
 
309
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
 
310
                  client.__dbus_object_path__,
 
311
                  dbus.PROPERTIES_IFACE, client_dbus_interface,
 
312
                  self.propname, self.value_to_set
 
313
                  if not isinstance(self.value_to_set, dbus.Boolean)
 
314
                  else bool(self.value_to_set))
 
315
        client.Set(client_dbus_interface, self.propname,
 
316
                   self.value_to_set,
302
317
                   dbus_interface=dbus.PROPERTIES_IFACE)
 
318
    @property
 
319
    def propname(self):
 
320
        raise NotImplementedError()
303
321
 
304
 
class ValueArgumentMixIn(object):
305
 
    """Mixin class for commands taking a value as argument"""
 
322
class PropertyValueCmd(PropertyCmd):
 
323
    """Abstract class for PropertyCmd recieving a value as argument"""
306
324
    def __init__(self, value):
307
325
        self.value_to_set = value
308
326
 
309
 
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
310
 
    """Mixin class for commands taking a value argument as
311
 
    milliseconds."""
 
327
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
 
328
    """Abstract class for PropertyValueCmd taking a value argument as
 
329
a datetime.timedelta() but should store it as milliseconds."""
312
330
    @property
313
331
    def value_to_set(self):
314
332
        return self._vts
315
333
    @value_to_set.setter
316
334
    def value_to_set(self, value):
317
 
        """When setting, convert value to a datetime.timedelta"""
318
 
        self._vts = string_to_delta(value).total_seconds() * 1000
 
335
        """When setting, convert value from a datetime.timedelta"""
 
336
        self._vts = int(round(value.total_seconds() * 1000))
319
337
 
320
338
# Actual (non-abstract) command classes
321
339
 
324
342
        self.verbose = verbose
325
343
 
326
344
    def output(self, clients):
327
 
        default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
 
345
        default_keywords = ("Name", "Enabled", "Timeout",
 
346
                            "LastCheckedOK")
328
347
        keywords = default_keywords
329
348
        if self.verbose:
330
349
            keywords = self.all_keywords
331
 
        return str(self.TableOfClients(clients.values(), keywords))
 
350
        return str(self.TableOfClients(clients, keywords))
332
351
 
333
352
    class TableOfClients(object):
334
353
        tableheaders = {
420
439
        return value
421
440
 
422
441
class IsEnabledCmd(Command):
423
 
    def run_on_one_client(self, client, properties):
 
442
    def run(self, clients, bus=None, mandos=None):
 
443
        client, properties = next(iter(clients.items()))
424
444
        if self.is_enabled(client, properties):
425
445
            sys.exit(0)
426
446
        sys.exit(1)
427
447
    def is_enabled(self, client, properties):
428
 
        return bool(properties["Enabled"])
 
448
        return properties["Enabled"]
429
449
 
430
450
class RemoveCmd(Command):
431
451
    def run_on_one_client(self, client, properties):
 
452
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
 
453
                  server_dbus_path, server_dbus_interface,
 
454
                  str(client.__dbus_object_path__))
432
455
        self.mandos.RemoveClient(client.__dbus_object_path__)
433
456
 
434
457
class ApproveCmd(Command):
435
458
    def run_on_one_client(self, client, properties):
 
459
        log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
 
460
                  client.__dbus_object_path__, client_dbus_interface)
436
461
        client.Approve(dbus.Boolean(True),
437
 
                       dbus_interface=client_interface)
 
462
                       dbus_interface=client_dbus_interface)
438
463
 
439
464
class DenyCmd(Command):
440
465
    def run_on_one_client(self, client, properties):
 
466
        log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
 
467
                  client.__dbus_object_path__, client_dbus_interface)
441
468
        client.Approve(dbus.Boolean(False),
442
 
                       dbus_interface=client_interface)
 
469
                       dbus_interface=client_dbus_interface)
443
470
 
444
471
class EnableCmd(PropertyCmd):
445
 
    property = "Enabled"
 
472
    propname = "Enabled"
446
473
    value_to_set = dbus.Boolean(True)
447
474
 
448
475
class DisableCmd(PropertyCmd):
449
 
    property = "Enabled"
 
476
    propname = "Enabled"
450
477
    value_to_set = dbus.Boolean(False)
451
478
 
452
479
class BumpTimeoutCmd(PropertyCmd):
453
 
    property = "LastCheckedOK"
 
480
    propname = "LastCheckedOK"
454
481
    value_to_set = ""
455
482
 
456
483
class StartCheckerCmd(PropertyCmd):
457
 
    property = "CheckerRunning"
 
484
    propname = "CheckerRunning"
458
485
    value_to_set = dbus.Boolean(True)
459
486
 
460
487
class StopCheckerCmd(PropertyCmd):
461
 
    property = "CheckerRunning"
 
488
    propname = "CheckerRunning"
462
489
    value_to_set = dbus.Boolean(False)
463
490
 
464
491
class ApproveByDefaultCmd(PropertyCmd):
465
 
    property = "ApprovedByDefault"
 
492
    propname = "ApprovedByDefault"
466
493
    value_to_set = dbus.Boolean(True)
467
494
 
468
495
class DenyByDefaultCmd(PropertyCmd):
469
 
    property = "ApprovedByDefault"
 
496
    propname = "ApprovedByDefault"
470
497
    value_to_set = dbus.Boolean(False)
471
498
 
472
 
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
473
 
    property = "Checker"
474
 
 
475
 
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
476
 
    property = "Host"
477
 
 
478
 
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
 
499
class SetCheckerCmd(PropertyValueCmd):
 
500
    propname = "Checker"
 
501
 
 
502
class SetHostCmd(PropertyValueCmd):
 
503
    propname = "Host"
 
504
 
 
505
class SetSecretCmd(PropertyValueCmd):
 
506
    propname = "Secret"
479
507
    @property
480
508
    def value_to_set(self):
481
509
        return self._vts
484
512
        """When setting, read data from supplied file object"""
485
513
        self._vts = value.read()
486
514
        value.close()
487
 
    property = "Secret"
488
 
 
489
 
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
490
 
    property = "Timeout"
491
 
 
492
 
class SetExtendedTimeoutCmd(PropertyCmd,
493
 
                            MillisecondsValueArgumentMixIn):
494
 
    property = "ExtendedTimeout"
495
 
 
496
 
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
497
 
    property = "Interval"
498
 
 
499
 
class SetApprovalDelayCmd(PropertyCmd,
500
 
                          MillisecondsValueArgumentMixIn):
501
 
    property = "ApprovalDelay"
502
 
 
503
 
class SetApprovalDurationCmd(PropertyCmd,
504
 
                             MillisecondsValueArgumentMixIn):
505
 
    property = "ApprovalDuration"
506
 
 
507
 
def has_actions(options):
508
 
    return any((options.enable,
509
 
                options.disable,
510
 
                options.bump_timeout,
511
 
                options.start_checker,
512
 
                options.stop_checker,
513
 
                options.is_enabled,
514
 
                options.remove,
515
 
                options.checker is not None,
516
 
                options.timeout is not None,
517
 
                options.extended_timeout is not None,
518
 
                options.interval is not None,
519
 
                options.approved_by_default is not None,
520
 
                options.approval_delay is not None,
521
 
                options.approval_duration is not None,
522
 
                options.host is not None,
523
 
                options.secret is not None,
524
 
                options.approve,
525
 
                options.deny))
 
515
 
 
516
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
 
517
    propname = "Timeout"
 
518
 
 
519
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
 
520
    propname = "ExtendedTimeout"
 
521
 
 
522
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
 
523
    propname = "Interval"
 
524
 
 
525
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
 
526
    propname = "ApprovalDelay"
 
527
 
 
528
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
 
529
    propname = "ApprovalDuration"
526
530
 
527
531
def add_command_line_options(parser):
528
532
    parser.add_argument("--version", action="version",
555
559
                        help="Remove client")
556
560
    parser.add_argument("-c", "--checker",
557
561
                        help="Set checker command for client")
558
 
    parser.add_argument("-t", "--timeout",
 
562
    parser.add_argument("-t", "--timeout", type=string_to_delta,
559
563
                        help="Set timeout for client")
560
 
    parser.add_argument("--extended-timeout",
 
564
    parser.add_argument("--extended-timeout", type=string_to_delta,
561
565
                        help="Set extended timeout for client")
562
 
    parser.add_argument("-i", "--interval",
 
566
    parser.add_argument("-i", "--interval", type=string_to_delta,
563
567
                        help="Set checker interval for client")
564
568
    approve_deny_default = parser.add_mutually_exclusive_group()
565
569
    approve_deny_default.add_argument(
570
574
        "--deny-by-default", action="store_false",
571
575
        dest="approved_by_default",
572
576
        help="Set client to be denied by default")
573
 
    parser.add_argument("--approval-delay",
 
577
    parser.add_argument("--approval-delay", type=string_to_delta,
574
578
                        help="Set delay before client approve/deny")
575
 
    parser.add_argument("--approval-duration",
 
579
    parser.add_argument("--approval-duration", type=string_to_delta,
576
580
                        help="Set duration of one client approval")
577
581
    parser.add_argument("-H", "--host", help="Set host for client")
578
582
    parser.add_argument("-s", "--secret",
584
588
        help="Approve any current client request")
585
589
    approve_deny.add_argument("-D", "--deny", action="store_true",
586
590
                              help="Deny any current client request")
 
591
    parser.add_argument("--debug", action="store_true",
 
592
                        help="Debug mode (show D-Bus commands)")
587
593
    parser.add_argument("--check", action="store_true",
588
594
                        help="Run self-test")
589
595
    parser.add_argument("client", nargs="*", help="Client name")
614
620
    if options.is_enabled:
615
621
        commands.append(IsEnabledCmd())
616
622
 
617
 
    if options.remove:
618
 
        commands.append(RemoveCmd())
619
 
 
620
623
    if options.checker is not None:
621
624
        commands.append(SetCheckerCmd(options.checker))
622
625
 
655
658
    if options.deny:
656
659
        commands.append(DenyCmd())
657
660
 
 
661
    if options.remove:
 
662
        commands.append(RemoveCmd())
 
663
 
658
664
    # If no command option has been given, show table of clients,
659
665
    # optionally verbosely
660
666
    if not commands:
663
669
    return commands
664
670
 
665
671
 
666
 
def main():
667
 
    parser = argparse.ArgumentParser()
668
 
 
669
 
    add_command_line_options(parser)
670
 
 
671
 
    options = parser.parse_args()
 
672
def check_option_syntax(parser, options):
 
673
    """Apply additional restrictions on options, not expressible in
 
674
argparse"""
 
675
 
 
676
    def has_actions(options):
 
677
        return any((options.enable,
 
678
                    options.disable,
 
679
                    options.bump_timeout,
 
680
                    options.start_checker,
 
681
                    options.stop_checker,
 
682
                    options.is_enabled,
 
683
                    options.remove,
 
684
                    options.checker is not None,
 
685
                    options.timeout is not None,
 
686
                    options.extended_timeout is not None,
 
687
                    options.interval is not None,
 
688
                    options.approved_by_default is not None,
 
689
                    options.approval_delay is not None,
 
690
                    options.approval_duration is not None,
 
691
                    options.host is not None,
 
692
                    options.secret is not None,
 
693
                    options.approve,
 
694
                    options.deny))
672
695
 
673
696
    if has_actions(options) and not (options.client or options.all):
674
697
        parser.error("Options require clients names or --all.")
681
704
        parser.error("--all requires an action.")
682
705
    if options.is_enabled and len(options.client) > 1:
683
706
        parser.error("--is-enabled requires exactly one client")
 
707
    if options.remove:
 
708
        options.remove = False
 
709
        if has_actions(options) and not options.deny:
 
710
            parser.error("--remove can only be combined with --deny")
 
711
        options.remove = True
 
712
 
 
713
 
 
714
def main():
 
715
    parser = argparse.ArgumentParser()
 
716
 
 
717
    add_command_line_options(parser)
 
718
 
 
719
    options = parser.parse_args()
 
720
 
 
721
    check_option_syntax(parser, options)
684
722
 
685
723
    clientnames = options.client
686
724
 
 
725
    if options.debug:
 
726
        log.setLevel(logging.DEBUG)
 
727
 
687
728
    try:
688
729
        bus = dbus.SystemBus()
689
 
        mandos_dbus_objc = bus.get_object(busname, server_path)
 
730
        log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
 
731
                  dbus_busname, server_dbus_path)
 
732
        mandos_dbus_objc = bus.get_object(dbus_busname,
 
733
                                          server_dbus_path)
690
734
    except dbus.exceptions.DBusException:
691
735
        log.critical("Could not connect to Mandos server")
692
736
        sys.exit(1)
693
737
 
694
738
    mandos_serv = dbus.Interface(mandos_dbus_objc,
695
 
                                 dbus_interface=server_interface)
 
739
                                 dbus_interface=server_dbus_interface)
696
740
    mandos_serv_object_manager = dbus.Interface(
697
741
        mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
698
742
 
704
748
    dbus_filter = NullFilter()
705
749
    try:
706
750
        dbus_logger.addFilter(dbus_filter)
707
 
        mandos_clients = {path: ifs_and_props[client_interface]
 
751
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
 
752
                  server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
 
753
        mandos_clients = {path: ifs_and_props[client_dbus_interface]
708
754
                          for path, ifs_and_props in
709
755
                          mandos_serv_object_manager
710
756
                          .GetManagedObjects().items()
711
 
                          if client_interface in ifs_and_props}
 
757
                          if client_dbus_interface in ifs_and_props}
712
758
    except dbus.exceptions.DBusException as e:
713
759
        log.critical("Failed to access Mandos server through D-Bus:"
714
760
                     "\n%s", e)
721
767
    clients = {}
722
768
 
723
769
    if not clientnames:
724
 
        clients = {bus.get_object(busname, path): properties
725
 
                   for path, properties in mandos_clients.items()}
 
770
        clients = {objpath: properties
 
771
                   for objpath, properties in mandos_clients.items()}
726
772
    else:
727
773
        for name in clientnames:
728
 
            for path, client in mandos_clients.items():
729
 
                if client["Name"] == name:
730
 
                    client_objc = bus.get_object(busname, path)
731
 
                    clients[client_objc] = client
 
774
            for objpath, properties in mandos_clients.items():
 
775
                if properties["Name"] == name:
 
776
                    clients[objpath] = properties
732
777
                    break
733
778
            else:
734
779
                log.critical("Client not found on server: %r", name)
737
782
    # Run all commands on clients
738
783
    commands = commands_from_options(options)
739
784
    for command in commands:
740
 
        command.run(mandos_serv, clients)
 
785
        command.run(clients, bus, mandos_serv)
741
786
 
742
787
 
743
788
class Test_milliseconds_to_string(unittest.TestCase):
792
837
        testcase = self
793
838
        class MockClient(object):
794
839
            def __init__(self, name, **attributes):
795
 
                self.__dbus_object_path__ = "objpath_{}".format(name)
 
840
                self.__dbus_object_path__ = "/clients/{}".format(name)
796
841
                self.attributes = attributes
797
842
                self.attributes["Name"] = name
798
843
                self.calls = []
799
 
            def Set(self, interface, property, value, dbus_interface):
800
 
                testcase.assertEqual(interface, client_interface)
801
 
                testcase.assertEqual(dbus_interface,
802
 
                                     dbus.PROPERTIES_IFACE)
803
 
                self.attributes[property] = value
804
 
            def Get(self, interface, property, dbus_interface):
805
 
                testcase.assertEqual(interface, client_interface)
806
 
                testcase.assertEqual(dbus_interface,
807
 
                                     dbus.PROPERTIES_IFACE)
808
 
                return self.attributes[property]
 
844
            def Set(self, interface, propname, value, dbus_interface):
 
845
                testcase.assertEqual(interface, client_dbus_interface)
 
846
                testcase.assertEqual(dbus_interface,
 
847
                                     dbus.PROPERTIES_IFACE)
 
848
                self.attributes[propname] = value
 
849
            def Get(self, interface, propname, dbus_interface):
 
850
                testcase.assertEqual(interface, client_dbus_interface)
 
851
                testcase.assertEqual(dbus_interface,
 
852
                                     dbus.PROPERTIES_IFACE)
 
853
                return self.attributes[propname]
809
854
            def Approve(self, approve, dbus_interface):
810
 
                testcase.assertEqual(dbus_interface, client_interface)
 
855
                testcase.assertEqual(dbus_interface,
 
856
                                     client_dbus_interface)
811
857
                self.calls.append(("Approve", (approve,
812
858
                                               dbus_interface)))
813
859
        self.client = MockClient(
860
906
            LastCheckerStatus=-2)
861
907
        self.clients =  collections.OrderedDict(
862
908
            [
863
 
                (self.client, self.client.attributes),
864
 
                (self.other_client, self.other_client.attributes),
 
909
                ("/clients/foo", self.client.attributes),
 
910
                ("/clients/barbar", self.other_client.attributes),
865
911
            ])
866
 
        self.one_client = {self.client: self.client.attributes}
 
912
        self.one_client = {"/clients/foo": self.client.attributes}
 
913
    @property
 
914
    def bus(self):
 
915
        class Bus(object):
 
916
            @staticmethod
 
917
            def get_object(client_bus_name, path):
 
918
                self.assertEqual(client_bus_name, dbus_busname)
 
919
                return {
 
920
                    "/clients/foo": self.client,
 
921
                    "/clients/barbar": self.other_client,
 
922
                }[path]
 
923
        return Bus()
867
924
 
868
925
class TestPrintTableCmd(TestCmd):
869
926
    def test_normal(self):
870
 
        output = PrintTableCmd().output(self.clients)
871
 
        expected_output = """
872
 
Name   Enabled Timeout  Last Successful Check
873
 
foo    Yes     00:05:00 2019-02-03T00:00:00  
874
 
barbar Yes     00:05:00 2019-02-04T00:00:00  
875
 
"""[1:-1]
 
927
        output = PrintTableCmd().output(self.clients.values())
 
928
        expected_output = "\n".join((
 
929
            "Name   Enabled Timeout  Last Successful Check",
 
930
            "foo    Yes     00:05:00 2019-02-03T00:00:00  ",
 
931
            "barbar Yes     00:05:00 2019-02-04T00:00:00  ",
 
932
        ))
876
933
        self.assertEqual(output, expected_output)
877
934
    def test_verbose(self):
878
 
        output = PrintTableCmd(verbose=True).output(self.clients)
879
 
        expected_output = """
880
 
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
881
 
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
882
 
barbar Yes     00:05:00 2019-02-04T00:00:00   2019-01-03T00:00:00 00:02:00 192.0.2.3       0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes              2019-01-04T00:00:00 No                  No                  2019-01-03T00:00:00   00:00:30       00:00:01          :                    00:15:00         2019-02-05T00:00:00 -2                 
883
 
"""[1:-1]
 
935
        output = PrintTableCmd(verbose=True).output(
 
936
            self.clients.values())
 
937
        columns = (
 
938
            (
 
939
                "Name   ",
 
940
                "foo    ",
 
941
                "barbar ",
 
942
            ),(
 
943
                "Enabled ",
 
944
                "Yes     ",
 
945
                "Yes     ",
 
946
            ),(
 
947
                "Timeout  ",
 
948
                "00:05:00 ",
 
949
                "00:05:00 ",
 
950
            ),(
 
951
                "Last Successful Check ",
 
952
                "2019-02-03T00:00:00   ",
 
953
                "2019-02-04T00:00:00   ",
 
954
            ),(
 
955
                "Created             ",
 
956
                "2019-01-02T00:00:00 ",
 
957
                "2019-01-03T00:00:00 ",
 
958
            ),(
 
959
                "Interval ",
 
960
                "00:02:00 ",
 
961
                "00:02:00 ",
 
962
            ),(
 
963
                "Host            ",
 
964
                "foo.example.org ",
 
965
                "192.0.2.3       ",
 
966
            ),(
 
967
                ("Key ID                                             "
 
968
                 "              "),
 
969
                ("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
 
970
                 "ea90a27cddb12 "),
 
971
                ("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
 
972
                 "46c2b020f441c "),
 
973
            ),(
 
974
                "Fingerprint                              ",
 
975
                "778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
 
976
                "3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
 
977
            ),(
 
978
                "Check Is Running ",
 
979
                "No               ",
 
980
                "Yes              ",
 
981
            ),(
 
982
                "Last Enabled        ",
 
983
                "2019-01-03T00:00:00 ",
 
984
                "2019-01-04T00:00:00 ",
 
985
            ),(
 
986
                "Approval Is Pending ",
 
987
                "No                  ",
 
988
                "No                  ",
 
989
            ),(
 
990
                "Approved By Default ",
 
991
                "Yes                 ",
 
992
                "No                  ",
 
993
            ),(
 
994
                "Last Approval Request ",
 
995
                "                      ",
 
996
                "2019-01-03T00:00:00   ",
 
997
            ),(
 
998
                "Approval Delay ",
 
999
                "00:00:00       ",
 
1000
                "00:00:30       ",
 
1001
            ),(
 
1002
                "Approval Duration ",
 
1003
                "00:00:01          ",
 
1004
                "00:00:01          ",
 
1005
            ),(
 
1006
                "Checker              ",
 
1007
                "fping -q -- %(host)s ",
 
1008
                ":                    ",
 
1009
            ),(
 
1010
                "Extended Timeout ",
 
1011
                "00:15:00         ",
 
1012
                "00:15:00         ",
 
1013
            ),(
 
1014
                "Expires             ",
 
1015
                "2019-02-04T00:00:00 ",
 
1016
                "2019-02-05T00:00:00 ",
 
1017
            ),(
 
1018
                "Last Checker Status",
 
1019
                "0                  ",
 
1020
                "-2                 ",
 
1021
            )
 
1022
        )
 
1023
        num_lines = max(len(rows) for rows in columns)
 
1024
        expected_output = "\n".join("".join(rows[line]
 
1025
                                            for rows in columns)
 
1026
                                    for line in range(num_lines))
884
1027
        self.assertEqual(output, expected_output)
885
1028
    def test_one_client(self):
886
 
        output = PrintTableCmd().output(self.one_client)
 
1029
        output = PrintTableCmd().output(self.one_client.values())
887
1030
        expected_output = """
888
1031
Name Enabled Timeout  Last Successful Check
889
1032
foo  Yes     00:05:00 2019-02-03T00:00:00  
954
1097
 
955
1098
class TestIsEnabledCmd(TestCmd):
956
1099
    def test_is_enabled(self):
957
 
        self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
958
 
                            for client, properties in self.clients.items()))
 
1100
        self.assertTrue(all(IsEnabledCmd().is_enabled(client,
 
1101
                                                      properties)
 
1102
                            for client, properties
 
1103
                            in self.clients.items()))
959
1104
    def test_is_enabled_run_exits_successfully(self):
960
1105
        with self.assertRaises(SystemExit) as e:
961
 
            IsEnabledCmd().run(None, self.one_client)
 
1106
            IsEnabledCmd().run(self.one_client)
962
1107
        if e.exception.code is not None:
963
1108
            self.assertEqual(e.exception.code, 0)
964
1109
        else:
966
1111
    def test_is_enabled_run_exits_with_failure(self):
967
1112
        self.client.attributes["Enabled"] = dbus.Boolean(False)
968
1113
        with self.assertRaises(SystemExit) as e:
969
 
            IsEnabledCmd().run(None, self.one_client)
 
1114
            IsEnabledCmd().run(self.one_client)
970
1115
        if isinstance(e.exception.code, int):
971
1116
            self.assertNotEqual(e.exception.code, 0)
972
1117
        else:
981
1126
                self.calls.append(("RemoveClient", (dbus_path,)))
982
1127
        mandos = MockMandos()
983
1128
        super(TestRemoveCmd, self).setUp()
984
 
        RemoveCmd().run(mandos, self.clients)
 
1129
        RemoveCmd().run(self.clients, self.bus, mandos)
985
1130
        self.assertEqual(len(mandos.calls), 2)
986
 
        for client in self.clients:
987
 
            self.assertIn(("RemoveClient",
988
 
                           (client.__dbus_object_path__,)),
 
1131
        for clientpath in self.clients:
 
1132
            self.assertIn(("RemoveClient", (clientpath,)),
989
1133
                          mandos.calls)
990
1134
 
991
1135
class TestApproveCmd(TestCmd):
992
1136
    def test_approve(self):
993
 
        ApproveCmd().run(None, self.clients)
994
 
        for client in self.clients:
995
 
            self.assertIn(("Approve", (True, client_interface)),
 
1137
        ApproveCmd().run(self.clients, self.bus)
 
1138
        for clientpath in self.clients:
 
1139
            client = self.bus.get_object(dbus_busname, clientpath)
 
1140
            self.assertIn(("Approve", (True, client_dbus_interface)),
996
1141
                          client.calls)
997
1142
 
998
1143
class TestDenyCmd(TestCmd):
999
1144
    def test_deny(self):
1000
 
        DenyCmd().run(None, self.clients)
1001
 
        for client in self.clients:
1002
 
            self.assertIn(("Approve", (False, client_interface)),
 
1145
        DenyCmd().run(self.clients, self.bus)
 
1146
        for clientpath in self.clients:
 
1147
            client = self.bus.get_object(dbus_busname, clientpath)
 
1148
            self.assertIn(("Approve", (False, client_dbus_interface)),
1003
1149
                          client.calls)
1004
1150
 
1005
1151
class TestEnableCmd(TestCmd):
1006
1152
    def test_enable(self):
1007
 
        for client in self.clients:
 
1153
        for clientpath in self.clients:
 
1154
            client = self.bus.get_object(dbus_busname, clientpath)
1008
1155
            client.attributes["Enabled"] = False
1009
1156
 
1010
 
        EnableCmd().run(None, self.clients)
 
1157
        EnableCmd().run(self.clients, self.bus)
1011
1158
 
1012
 
        for client in self.clients:
 
1159
        for clientpath in self.clients:
 
1160
            client = self.bus.get_object(dbus_busname, clientpath)
1013
1161
            self.assertTrue(client.attributes["Enabled"])
1014
1162
 
1015
1163
class TestDisableCmd(TestCmd):
1016
1164
    def test_disable(self):
1017
 
        DisableCmd().run(None, self.clients)
1018
 
 
1019
 
        for client in self.clients:
 
1165
        DisableCmd().run(self.clients, self.bus)
 
1166
        for clientpath in self.clients:
 
1167
            client = self.bus.get_object(dbus_busname, clientpath)
1020
1168
            self.assertFalse(client.attributes["Enabled"])
1021
1169
 
1022
1170
class Unique(object):
1032
1180
                                self.values_to_set)
1033
1181
        for value_to_set, value_to_get in zip(self.values_to_set,
1034
1182
                                              values_to_get):
1035
 
            for client in self.clients:
1036
 
                old_value = client.attributes[self.property]
 
1183
            for clientpath in self.clients:
 
1184
                client = self.bus.get_object(dbus_busname, clientpath)
 
1185
                old_value = client.attributes[self.propname]
1037
1186
                self.assertNotIsInstance(old_value, Unique)
1038
 
                client.attributes[self.property] = Unique()
 
1187
                client.attributes[self.propname] = Unique()
1039
1188
            self.run_command(value_to_set, self.clients)
1040
 
            for client in self.clients:
1041
 
                value = client.attributes[self.property]
 
1189
            for clientpath in self.clients:
 
1190
                client = self.bus.get_object(dbus_busname, clientpath)
 
1191
                value = client.attributes[self.propname]
1042
1192
                self.assertNotIsInstance(value, Unique)
1043
1193
                self.assertEqual(value, value_to_get)
1044
1194
    def run_command(self, value, clients):
1045
 
        self.command().run(None, clients)
 
1195
        self.command().run(clients, self.bus)
1046
1196
 
1047
1197
class TestBumpTimeoutCmd(TestPropertyCmd):
1048
1198
    command = BumpTimeoutCmd
1049
 
    property = "LastCheckedOK"
 
1199
    propname = "LastCheckedOK"
1050
1200
    values_to_set = [""]
1051
1201
 
1052
1202
class TestStartCheckerCmd(TestPropertyCmd):
1053
1203
    command = StartCheckerCmd
1054
 
    property = "CheckerRunning"
 
1204
    propname = "CheckerRunning"
1055
1205
    values_to_set = [dbus.Boolean(True)]
1056
1206
 
1057
1207
class TestStopCheckerCmd(TestPropertyCmd):
1058
1208
    command = StopCheckerCmd
1059
 
    property = "CheckerRunning"
 
1209
    propname = "CheckerRunning"
1060
1210
    values_to_set = [dbus.Boolean(False)]
1061
1211
 
1062
1212
class TestApproveByDefaultCmd(TestPropertyCmd):
1063
1213
    command = ApproveByDefaultCmd
1064
 
    property = "ApprovedByDefault"
 
1214
    propname = "ApprovedByDefault"
1065
1215
    values_to_set = [dbus.Boolean(True)]
1066
1216
 
1067
1217
class TestDenyByDefaultCmd(TestPropertyCmd):
1068
1218
    command = DenyByDefaultCmd
1069
 
    property = "ApprovedByDefault"
 
1219
    propname = "ApprovedByDefault"
1070
1220
    values_to_set = [dbus.Boolean(False)]
1071
1221
 
1072
 
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1073
 
    """Abstract class for tests of PropertyCmd classes using the
1074
 
ValueArgumentMixIn"""
 
1222
class TestPropertyValueCmd(TestPropertyCmd):
 
1223
    """Abstract class for tests of PropertyValueCmd classes"""
1075
1224
    def runTest(self):
1076
 
        if type(self) is TestValueArgumentPropertyCmd:
 
1225
        if type(self) is TestPropertyValueCmd:
1077
1226
            return
1078
 
        return super(TestValueArgumentPropertyCmd, self).runTest()
 
1227
        return super(TestPropertyValueCmd, self).runTest()
1079
1228
    def run_command(self, value, clients):
1080
 
        self.command(value).run(None, clients)
 
1229
        self.command(value).run(clients, self.bus)
1081
1230
 
1082
 
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
 
1231
class TestSetCheckerCmd(TestPropertyValueCmd):
1083
1232
    command = SetCheckerCmd
1084
 
    property = "Checker"
 
1233
    propname = "Checker"
1085
1234
    values_to_set = ["", ":", "fping -q -- %s"]
1086
1235
 
1087
 
class TestSetHostCmd(TestValueArgumentPropertyCmd):
 
1236
class TestSetHostCmd(TestPropertyValueCmd):
1088
1237
    command = SetHostCmd
1089
 
    property = "Host"
 
1238
    propname = "Host"
1090
1239
    values_to_set = ["192.0.2.3", "foo.example.org"]
1091
1240
 
1092
 
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
 
1241
class TestSetSecretCmd(TestPropertyValueCmd):
1093
1242
    command = SetSecretCmd
1094
 
    property = "Secret"
1095
 
    values_to_set = [open("/dev/null", "rb"),
 
1243
    propname = "Secret"
 
1244
    values_to_set = [io.BytesIO(b""),
1096
1245
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1097
1246
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
1098
1247
 
1099
 
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
 
1248
class TestSetTimeoutCmd(TestPropertyValueCmd):
1100
1249
    command = SetTimeoutCmd
1101
 
    property = "Timeout"
1102
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1103
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1250
    propname = "Timeout"
 
1251
    values_to_set = [datetime.timedelta(),
 
1252
                     datetime.timedelta(minutes=5),
 
1253
                     datetime.timedelta(seconds=1),
 
1254
                     datetime.timedelta(weeks=1),
 
1255
                     datetime.timedelta(weeks=52)]
 
1256
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1104
1257
 
1105
 
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
 
1258
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1106
1259
    command = SetExtendedTimeoutCmd
1107
 
    property = "ExtendedTimeout"
1108
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1109
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1260
    propname = "ExtendedTimeout"
 
1261
    values_to_set = [datetime.timedelta(),
 
1262
                     datetime.timedelta(minutes=5),
 
1263
                     datetime.timedelta(seconds=1),
 
1264
                     datetime.timedelta(weeks=1),
 
1265
                     datetime.timedelta(weeks=52)]
 
1266
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1110
1267
 
1111
 
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
 
1268
class TestSetIntervalCmd(TestPropertyValueCmd):
1112
1269
    command = SetIntervalCmd
1113
 
    property = "Interval"
1114
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1115
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1270
    propname = "Interval"
 
1271
    values_to_set = [datetime.timedelta(),
 
1272
                     datetime.timedelta(minutes=5),
 
1273
                     datetime.timedelta(seconds=1),
 
1274
                     datetime.timedelta(weeks=1),
 
1275
                     datetime.timedelta(weeks=52)]
 
1276
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1116
1277
 
1117
 
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
 
1278
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1118
1279
    command = SetApprovalDelayCmd
1119
 
    property = "ApprovalDelay"
1120
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1121
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1280
    propname = "ApprovalDelay"
 
1281
    values_to_set = [datetime.timedelta(),
 
1282
                     datetime.timedelta(minutes=5),
 
1283
                     datetime.timedelta(seconds=1),
 
1284
                     datetime.timedelta(weeks=1),
 
1285
                     datetime.timedelta(weeks=52)]
 
1286
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1122
1287
 
1123
 
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
 
1288
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1124
1289
    command = SetApprovalDurationCmd
1125
 
    property = "ApprovalDuration"
1126
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1127
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1290
    propname = "ApprovalDuration"
 
1291
    values_to_set = [datetime.timedelta(),
 
1292
                     datetime.timedelta(minutes=5),
 
1293
                     datetime.timedelta(seconds=1),
 
1294
                     datetime.timedelta(weeks=1),
 
1295
                     datetime.timedelta(weeks=52)]
 
1296
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1128
1297
 
1129
 
class TestOptions(unittest.TestCase):
 
1298
class Test_command_from_options(unittest.TestCase):
1130
1299
    def setUp(self):
1131
1300
        self.parser = argparse.ArgumentParser()
1132
1301
        add_command_line_options(self.parser)
1133
 
    def assert_command_from_args(self, args, command_cls, **cmd_attrs):
 
1302
    def assert_command_from_args(self, args, command_cls,
 
1303
                                 **cmd_attrs):
1134
1304
        """Assert that parsing ARGS should result in an instance of
1135
1305
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1136
1306
        options = self.parser.parse_args(args)
 
1307
        check_option_syntax(self.parser, options)
1137
1308
        commands = commands_from_options(options)
1138
1309
        self.assertEqual(len(commands), 1)
1139
1310
        command = commands[0]
1140
1311
        self.assertIsInstance(command, command_cls)
1141
1312
        for key, value in cmd_attrs.items():
1142
1313
            self.assertEqual(getattr(command, key), value)
1143
 
    def test_default_is_show_table(self):
 
1314
    def test_print_table(self):
1144
1315
        self.assert_command_from_args([], PrintTableCmd,
1145
1316
                                      verbose=False)
1146
 
    def test_show_table_verbose(self):
 
1317
 
 
1318
    def test_print_table_verbose(self):
1147
1319
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1148
1320
                                      verbose=True)
 
1321
 
 
1322
    def test_print_table_verbose_short(self):
 
1323
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1324
                                      verbose=True)
 
1325
 
1149
1326
    def test_enable(self):
1150
1327
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
 
1328
 
 
1329
    def test_enable_short(self):
 
1330
        self.assert_command_from_args(["-e", "foo"], EnableCmd)
 
1331
 
1151
1332
    def test_disable(self):
1152
1333
        self.assert_command_from_args(["--disable", "foo"],
1153
1334
                                      DisableCmd)
1154
1335
 
 
1336
    def test_disable_short(self):
 
1337
        self.assert_command_from_args(["-d", "foo"], DisableCmd)
 
1338
 
 
1339
    def test_bump_timeout(self):
 
1340
        self.assert_command_from_args(["--bump-timeout", "foo"],
 
1341
                                      BumpTimeoutCmd)
 
1342
 
 
1343
    def test_bump_timeout_short(self):
 
1344
        self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
 
1345
 
 
1346
    def test_start_checker(self):
 
1347
        self.assert_command_from_args(["--start-checker", "foo"],
 
1348
                                      StartCheckerCmd)
 
1349
 
 
1350
    def test_stop_checker(self):
 
1351
        self.assert_command_from_args(["--stop-checker", "foo"],
 
1352
                                      StopCheckerCmd)
 
1353
 
 
1354
    def test_remove(self):
 
1355
        self.assert_command_from_args(["--remove", "foo"],
 
1356
                                      RemoveCmd)
 
1357
 
 
1358
    def test_remove_short(self):
 
1359
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
 
1360
 
 
1361
    def test_checker(self):
 
1362
        self.assert_command_from_args(["--checker", ":", "foo"],
 
1363
                                      SetCheckerCmd, value_to_set=":")
 
1364
 
 
1365
    def test_checker_empty(self):
 
1366
        self.assert_command_from_args(["--checker", "", "foo"],
 
1367
                                      SetCheckerCmd, value_to_set="")
 
1368
 
 
1369
    def test_checker_short(self):
 
1370
        self.assert_command_from_args(["-c", ":", "foo"],
 
1371
                                      SetCheckerCmd, value_to_set=":")
 
1372
 
 
1373
    def test_timeout(self):
 
1374
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
 
1375
                                      SetTimeoutCmd,
 
1376
                                      value_to_set=300000)
 
1377
 
 
1378
    def test_timeout_short(self):
 
1379
        self.assert_command_from_args(["-t", "PT5M", "foo"],
 
1380
                                      SetTimeoutCmd,
 
1381
                                      value_to_set=300000)
 
1382
 
 
1383
    def test_extended_timeout(self):
 
1384
        self.assert_command_from_args(["--extended-timeout", "PT15M",
 
1385
                                       "foo"],
 
1386
                                      SetExtendedTimeoutCmd,
 
1387
                                      value_to_set=900000)
 
1388
 
 
1389
    def test_interval(self):
 
1390
        self.assert_command_from_args(["--interval", "PT2M", "foo"],
 
1391
                                      SetIntervalCmd,
 
1392
                                      value_to_set=120000)
 
1393
 
 
1394
    def test_interval_short(self):
 
1395
        self.assert_command_from_args(["-i", "PT2M", "foo"],
 
1396
                                      SetIntervalCmd,
 
1397
                                      value_to_set=120000)
 
1398
 
 
1399
    def test_approve_by_default(self):
 
1400
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1401
                                      ApproveByDefaultCmd)
 
1402
 
 
1403
    def test_deny_by_default(self):
 
1404
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1405
                                      DenyByDefaultCmd)
 
1406
 
 
1407
    def test_approval_delay(self):
 
1408
        self.assert_command_from_args(["--approval-delay", "PT30S",
 
1409
                                       "foo"], SetApprovalDelayCmd,
 
1410
                                      value_to_set=30000)
 
1411
 
 
1412
    def test_approval_duration(self):
 
1413
        self.assert_command_from_args(["--approval-duration", "PT1S",
 
1414
                                       "foo"], SetApprovalDurationCmd,
 
1415
                                      value_to_set=1000)
 
1416
 
 
1417
    def test_host(self):
 
1418
        self.assert_command_from_args(["--host", "foo.example.org",
 
1419
                                       "foo"], SetHostCmd,
 
1420
                                      value_to_set="foo.example.org")
 
1421
 
 
1422
    def test_host_short(self):
 
1423
        self.assert_command_from_args(["-H", "foo.example.org",
 
1424
                                       "foo"], SetHostCmd,
 
1425
                                      value_to_set="foo.example.org")
 
1426
 
 
1427
    def test_secret_devnull(self):
 
1428
        self.assert_command_from_args(["--secret", os.path.devnull,
 
1429
                                       "foo"], SetSecretCmd,
 
1430
                                      value_to_set=b"")
 
1431
 
 
1432
    def test_secret_tempfile(self):
 
1433
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1434
            value = b"secret\0xyzzy\nbar"
 
1435
            f.write(value)
 
1436
            f.seek(0)
 
1437
            self.assert_command_from_args(["--secret", f.name,
 
1438
                                           "foo"], SetSecretCmd,
 
1439
                                          value_to_set=value)
 
1440
 
 
1441
    def test_secret_devnull_short(self):
 
1442
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1443
                                      SetSecretCmd, value_to_set=b"")
 
1444
 
 
1445
    def test_secret_tempfile_short(self):
 
1446
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1447
            value = b"secret\0xyzzy\nbar"
 
1448
            f.write(value)
 
1449
            f.seek(0)
 
1450
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1451
                                          SetSecretCmd,
 
1452
                                          value_to_set=value)
 
1453
 
 
1454
    def test_approve(self):
 
1455
        self.assert_command_from_args(["--approve", "foo"],
 
1456
                                      ApproveCmd)
 
1457
 
 
1458
    def test_approve_short(self):
 
1459
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1460
 
 
1461
    def test_deny(self):
 
1462
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
 
1463
 
 
1464
    def test_deny_short(self):
 
1465
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1466
 
 
1467
    def test_dump_json(self):
 
1468
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
1469
 
 
1470
    def test_is_enabled(self):
 
1471
        self.assert_command_from_args(["--is-enabled", "foo"],
 
1472
                                      IsEnabledCmd)
 
1473
 
 
1474
    def test_is_enabled_short(self):
 
1475
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1476
 
 
1477
    def test_deny_before_remove(self):
 
1478
        options = self.parser.parse_args(["--deny", "--remove",
 
1479
                                          "foo"])
 
1480
        check_option_syntax(self.parser, options)
 
1481
        commands = commands_from_options(options)
 
1482
        self.assertEqual(len(commands), 2)
 
1483
        self.assertIsInstance(commands[0], DenyCmd)
 
1484
        self.assertIsInstance(commands[1], RemoveCmd)
 
1485
 
 
1486
    def test_deny_before_remove_reversed(self):
 
1487
        options = self.parser.parse_args(["--remove", "--deny",
 
1488
                                          "--all"])
 
1489
        check_option_syntax(self.parser, options)
 
1490
        commands = commands_from_options(options)
 
1491
        self.assertEqual(len(commands), 2)
 
1492
        self.assertIsInstance(commands[0], DenyCmd)
 
1493
        self.assertIsInstance(commands[1], RemoveCmd)
 
1494
 
 
1495
 
 
1496
class Test_check_option_syntax(unittest.TestCase):
 
1497
    # This mostly corresponds to the definition from has_actions() in
 
1498
    # check_option_syntax()
 
1499
    actions = {
 
1500
        # The actual values set here are not that important, but we do
 
1501
        # at least stick to the correct types, even though they are
 
1502
        # never used
 
1503
        "enable": True,
 
1504
        "disable": True,
 
1505
        "bump_timeout": True,
 
1506
        "start_checker": True,
 
1507
        "stop_checker": True,
 
1508
        "is_enabled": True,
 
1509
        "remove": True,
 
1510
        "checker": "x",
 
1511
        "timeout": datetime.timedelta(),
 
1512
        "extended_timeout": datetime.timedelta(),
 
1513
        "interval": datetime.timedelta(),
 
1514
        "approved_by_default": True,
 
1515
        "approval_delay": datetime.timedelta(),
 
1516
        "approval_duration": datetime.timedelta(),
 
1517
        "host": "x",
 
1518
        "secret": io.BytesIO(b"x"),
 
1519
        "approve": True,
 
1520
        "deny": True,
 
1521
    }
 
1522
 
 
1523
    def setUp(self):
 
1524
        self.parser = argparse.ArgumentParser()
 
1525
        add_command_line_options(self.parser)
 
1526
 
 
1527
    @contextlib.contextmanager
 
1528
    def assertParseError(self):
 
1529
        with self.assertRaises(SystemExit) as e:
 
1530
            with self.temporarily_suppress_stderr():
 
1531
                yield
 
1532
        # Exit code from argparse is guaranteed to be "2".  Reference:
 
1533
        # https://docs.python.org/3/library
 
1534
        # /argparse.html#exiting-methods
 
1535
        self.assertEqual(e.exception.code, 2)
 
1536
 
 
1537
    @staticmethod
 
1538
    @contextlib.contextmanager
 
1539
    def temporarily_suppress_stderr():
 
1540
        null = os.open(os.path.devnull, os.O_RDWR)
 
1541
        stderrcopy = os.dup(sys.stderr.fileno())
 
1542
        os.dup2(null, sys.stderr.fileno())
 
1543
        os.close(null)
 
1544
        try:
 
1545
            yield
 
1546
        finally:
 
1547
            # restore stderr
 
1548
            os.dup2(stderrcopy, sys.stderr.fileno())
 
1549
            os.close(stderrcopy)
 
1550
 
 
1551
    def check_option_syntax(self, options):
 
1552
        check_option_syntax(self.parser, options)
 
1553
 
 
1554
    def test_actions_requires_client_or_all(self):
 
1555
        for action, value in self.actions.items():
 
1556
            options = self.parser.parse_args()
 
1557
            setattr(options, action, value)
 
1558
            with self.assertParseError():
 
1559
                self.check_option_syntax(options)
 
1560
 
 
1561
    def test_actions_conflicts_with_verbose(self):
 
1562
        for action, value in self.actions.items():
 
1563
            options = self.parser.parse_args()
 
1564
            setattr(options, action, value)
 
1565
            options.verbose = True
 
1566
            with self.assertParseError():
 
1567
                self.check_option_syntax(options)
 
1568
 
 
1569
    def test_dump_json_conflicts_with_verbose(self):
 
1570
        options = self.parser.parse_args()
 
1571
        options.dump_json = True
 
1572
        options.verbose = True
 
1573
        with self.assertParseError():
 
1574
            self.check_option_syntax(options)
 
1575
 
 
1576
    def test_dump_json_conflicts_with_action(self):
 
1577
        for action, value in self.actions.items():
 
1578
            options = self.parser.parse_args()
 
1579
            setattr(options, action, value)
 
1580
            options.dump_json = True
 
1581
            with self.assertParseError():
 
1582
                self.check_option_syntax(options)
 
1583
 
 
1584
    def test_all_can_not_be_alone(self):
 
1585
        options = self.parser.parse_args()
 
1586
        options.all = True
 
1587
        with self.assertParseError():
 
1588
            self.check_option_syntax(options)
 
1589
 
 
1590
    def test_all_is_ok_with_any_action(self):
 
1591
        for action, value in self.actions.items():
 
1592
            options = self.parser.parse_args()
 
1593
            setattr(options, action, value)
 
1594
            options.all = True
 
1595
            self.check_option_syntax(options)
 
1596
 
 
1597
    def test_is_enabled_fails_without_client(self):
 
1598
        options = self.parser.parse_args()
 
1599
        options.is_enabled = True
 
1600
        with self.assertParseError():
 
1601
            self.check_option_syntax(options)
 
1602
 
 
1603
    def test_is_enabled_works_with_one_client(self):
 
1604
        options = self.parser.parse_args()
 
1605
        options.is_enabled = True
 
1606
        options.client = ["foo"]
 
1607
        self.check_option_syntax(options)
 
1608
 
 
1609
    def test_is_enabled_fails_with_two_clients(self):
 
1610
        options = self.parser.parse_args()
 
1611
        options.is_enabled = True
 
1612
        options.client = ["foo", "barbar"]
 
1613
        with self.assertParseError():
 
1614
            self.check_option_syntax(options)
 
1615
 
 
1616
    def test_remove_can_only_be_combined_with_action_deny(self):
 
1617
        for action, value in self.actions.items():
 
1618
            if action in {"remove", "deny"}:
 
1619
                continue
 
1620
            options = self.parser.parse_args()
 
1621
            setattr(options, action, value)
 
1622
            options.all = True
 
1623
            options.remove = True
 
1624
            with self.assertParseError():
 
1625
                self.check_option_syntax(options)
 
1626
 
1155
1627
 
1156
1628
 
1157
1629
def should_only_run_tests():