/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-05 21:39:15 UTC
  • Revision ID: teddy@recompile.se-20190305213915-xm1vw00jyy3a5tfn
mandos-ctl: Add more tests, including tests for all commands

* mandos-ctl (Test_string_to_delta.test_handles_basic_rfc3339): Add a
                                                  few more test cases.
  (TestCmd.setUp.MockClient.Set, TestCmd.setUp.MockClient.Get): Don't
  append to self.calls, since nobody should use it to check for Set()
  or Get() calls; instead, the return value of Get() should be
  inspected, and the MockClient.attributes dict should be inspected
  after (implicitly) calling Set().
  (Unique): New; stand-in for unittest.mock.sentinel.
  (TestPropertyCmd): New; abstract class testing PropertyCmd classes.
  (TestBumpTimeoutCmd, TestStartCheckerCmd, TestStopCheckerCmd,
  TestApproveByDefaultCmd, TestDenyByDefaultCmd): New.
  (TestValueArgumentPropertyCmd): New; abstract class for testing
                                  those PropertyCmd classes which also
                                  inherit from ValueArgumentMixIn.
  (TestSetCheckerCmd, TestSetHostCmd, TestSetSecretCmd,
  TestSetTimeoutCmd, TestSetExtendedTimeoutCmd, TestSetIntervalCmd,
  TestSetApprovalDelayCmd, TestSetApprovalDurationCmd): New.

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
import json
43
43
import unittest
44
44
import logging
45
 
import io
46
 
import tempfile
47
 
import contextlib
48
45
 
49
46
import dbus
50
47
 
294
291
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
295
292
                    "Expires", "LastCheckerStatus")
296
293
    def run(self, mandos, clients):
297
 
        print(self.output(clients.values()))
298
 
    def output(self, clients):
299
 
        raise NotImplementedError()
 
294
        print(self.output(clients))
300
295
 
301
296
class PropertyCmd(Command):
302
297
    """Abstract class for Actions for setting one client property"""
303
298
    def run_on_one_client(self, client, properties):
304
299
        """Set the Client's D-Bus property"""
305
 
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
306
 
                  client.__dbus_object_path__,
307
 
                  dbus.PROPERTIES_IFACE, client_interface,
308
 
                  self.propname, self.value_to_set
309
 
                  if not isinstance(self.value_to_set, dbus.Boolean)
310
 
                  else bool(self.value_to_set))
311
 
        client.Set(client_interface, self.propname, self.value_to_set,
 
300
        client.Set(client_interface, self.property, self.value_to_set,
312
301
                   dbus_interface=dbus.PROPERTIES_IFACE)
313
 
    @property
314
 
    def propname(self):
315
 
        raise NotImplementedError()
316
302
 
317
303
class ValueArgumentMixIn(object):
318
304
    """Mixin class for commands taking a value as argument"""
328
314
    @value_to_set.setter
329
315
    def value_to_set(self, value):
330
316
        """When setting, convert value to a datetime.timedelta"""
331
 
        self._vts = int(round(value.total_seconds() * 1000))
 
317
        self._vts = string_to_delta(value).total_seconds() * 1000
332
318
 
333
319
# Actual (non-abstract) command classes
334
320
 
341
327
        keywords = default_keywords
342
328
        if self.verbose:
343
329
            keywords = self.all_keywords
344
 
        return str(self.TableOfClients(clients, keywords))
 
330
        return str(self.TableOfClients(clients.values(), keywords))
345
331
 
346
332
    class TableOfClients(object):
347
333
        tableheaders = {
438
424
            sys.exit(0)
439
425
        sys.exit(1)
440
426
    def is_enabled(self, client, properties):
441
 
        log.debug("D-Bus: %s:%s:%s.Get(%r, %r)", busname,
442
 
                  client.__dbus_object_path__,
443
 
                  dbus.PROPERTIES_IFACE, client_interface,
444
 
                  "Enabled")
445
 
        return bool(client.Get(client_interface, "Enabled",
446
 
                               dbus_interface=dbus.PROPERTIES_IFACE))
 
427
        return bool(properties["Enabled"])
447
428
 
448
429
class RemoveCmd(Command):
449
430
    def run_on_one_client(self, client, properties):
450
 
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
451
 
                  server_path, server_interface,
452
 
                  str(client.__dbus_object_path__))
453
431
        self.mandos.RemoveClient(client.__dbus_object_path__)
454
432
 
455
433
class ApproveCmd(Command):
456
434
    def run_on_one_client(self, client, properties):
457
 
        log.debug("D-Bus: %s:%s.Approve(True)",
458
 
                  client.__dbus_object_path__, client_interface)
459
435
        client.Approve(dbus.Boolean(True),
460
436
                       dbus_interface=client_interface)
461
437
 
462
438
class DenyCmd(Command):
463
439
    def run_on_one_client(self, client, properties):
464
 
        log.debug("D-Bus: %s:%s.Approve(False)",
465
 
                  client.__dbus_object_path__, client_interface)
466
440
        client.Approve(dbus.Boolean(False),
467
441
                       dbus_interface=client_interface)
468
442
 
469
443
class EnableCmd(PropertyCmd):
470
 
    propname = "Enabled"
 
444
    property = "Enabled"
471
445
    value_to_set = dbus.Boolean(True)
472
446
 
473
447
class DisableCmd(PropertyCmd):
474
 
    propname = "Enabled"
 
448
    property = "Enabled"
475
449
    value_to_set = dbus.Boolean(False)
476
450
 
477
451
class BumpTimeoutCmd(PropertyCmd):
478
 
    propname = "LastCheckedOK"
 
452
    property = "LastCheckedOK"
479
453
    value_to_set = ""
480
454
 
481
455
class StartCheckerCmd(PropertyCmd):
482
 
    propname = "CheckerRunning"
 
456
    property = "CheckerRunning"
483
457
    value_to_set = dbus.Boolean(True)
484
458
 
485
459
class StopCheckerCmd(PropertyCmd):
486
 
    propname = "CheckerRunning"
 
460
    property = "CheckerRunning"
487
461
    value_to_set = dbus.Boolean(False)
488
462
 
489
463
class ApproveByDefaultCmd(PropertyCmd):
490
 
    propname = "ApprovedByDefault"
 
464
    property = "ApprovedByDefault"
491
465
    value_to_set = dbus.Boolean(True)
492
466
 
493
467
class DenyByDefaultCmd(PropertyCmd):
494
 
    propname = "ApprovedByDefault"
 
468
    property = "ApprovedByDefault"
495
469
    value_to_set = dbus.Boolean(False)
496
470
 
497
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
498
 
    propname = "Checker"
 
472
    property = "Checker"
499
473
 
500
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
501
 
    propname = "Host"
 
475
    property = "Host"
502
476
 
503
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
504
 
    propname = "Secret"
505
 
    @property
506
 
    def value_to_set(self):
507
 
        return self._vts
508
 
    @value_to_set.setter
509
 
    def value_to_set(self, value):
510
 
        """When setting, read data from supplied file object"""
511
 
        self._vts = value.read()
512
 
        value.close()
 
478
    property = "Secret"
513
479
 
514
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
515
 
    propname = "Timeout"
 
481
    property = "Timeout"
516
482
 
517
483
class SetExtendedTimeoutCmd(PropertyCmd,
518
484
                            MillisecondsValueArgumentMixIn):
519
 
    propname = "ExtendedTimeout"
 
485
    property = "ExtendedTimeout"
520
486
 
521
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
522
 
    propname = "Interval"
 
488
    property = "Interval"
523
489
 
524
490
class SetApprovalDelayCmd(PropertyCmd,
525
491
                          MillisecondsValueArgumentMixIn):
526
 
    propname = "ApprovalDelay"
 
492
    property = "ApprovalDelay"
527
493
 
528
494
class SetApprovalDurationCmd(PropertyCmd,
529
495
                             MillisecondsValueArgumentMixIn):
530
 
    propname = "ApprovalDuration"
 
496
    property = "ApprovalDuration"
 
497
 
 
498
def has_actions(options):
 
499
    return any((options.enable,
 
500
                options.disable,
 
501
                options.bump_timeout,
 
502
                options.start_checker,
 
503
                options.stop_checker,
 
504
                options.is_enabled,
 
505
                options.remove,
 
506
                options.checker is not None,
 
507
                options.timeout is not None,
 
508
                options.extended_timeout is not None,
 
509
                options.interval is not None,
 
510
                options.approved_by_default is not None,
 
511
                options.approval_delay is not None,
 
512
                options.approval_duration is not None,
 
513
                options.host is not None,
 
514
                options.secret is not None,
 
515
                options.approve,
 
516
                options.deny))
531
517
 
532
518
def add_command_line_options(parser):
533
519
    parser.add_argument("--version", action="version",
560
546
                        help="Remove client")
561
547
    parser.add_argument("-c", "--checker",
562
548
                        help="Set checker command for client")
563
 
    parser.add_argument("-t", "--timeout", type=string_to_delta,
 
549
    parser.add_argument("-t", "--timeout",
564
550
                        help="Set timeout for client")
565
 
    parser.add_argument("--extended-timeout", type=string_to_delta,
 
551
    parser.add_argument("--extended-timeout",
566
552
                        help="Set extended timeout for client")
567
 
    parser.add_argument("-i", "--interval", type=string_to_delta,
 
553
    parser.add_argument("-i", "--interval",
568
554
                        help="Set checker interval for client")
569
555
    approve_deny_default = parser.add_mutually_exclusive_group()
570
556
    approve_deny_default.add_argument(
575
561
        "--deny-by-default", action="store_false",
576
562
        dest="approved_by_default",
577
563
        help="Set client to be denied by default")
578
 
    parser.add_argument("--approval-delay", type=string_to_delta,
 
564
    parser.add_argument("--approval-delay",
579
565
                        help="Set delay before client approve/deny")
580
 
    parser.add_argument("--approval-duration", type=string_to_delta,
 
566
    parser.add_argument("--approval-duration",
581
567
                        help="Set duration of one client approval")
582
568
    parser.add_argument("-H", "--host", help="Set host for client")
583
569
    parser.add_argument("-s", "--secret",
589
575
        help="Approve any current client request")
590
576
    approve_deny.add_argument("-D", "--deny", action="store_true",
591
577
                              help="Deny any current client request")
592
 
    parser.add_argument("--debug", action="store_true",
593
 
                        help="Debug mode (show D-Bus commands)")
594
578
    parser.add_argument("--check", action="store_true",
595
579
                        help="Run self-test")
596
580
    parser.add_argument("client", nargs="*", help="Client name")
621
605
    if options.is_enabled:
622
606
        commands.append(IsEnabledCmd())
623
607
 
 
608
    if options.remove:
 
609
        commands.append(RemoveCmd())
 
610
 
624
611
    if options.checker is not None:
625
 
        commands.append(SetCheckerCmd(options.checker))
 
612
        commands.append(SetCheckerCmd())
626
613
 
627
614
    if options.timeout is not None:
628
615
        commands.append(SetTimeoutCmd(options.timeout))
632
619
            SetExtendedTimeoutCmd(options.extended_timeout))
633
620
 
634
621
    if options.interval is not None:
635
 
        commands.append(SetIntervalCmd(options.interval))
 
622
        command.append(SetIntervalCmd(options.interval))
636
623
 
637
624
    if options.approved_by_default is not None:
638
625
        if options.approved_by_default:
639
 
            commands.append(ApproveByDefaultCmd())
 
626
            command.append(ApproveByDefaultCmd())
640
627
        else:
641
 
            commands.append(DenyByDefaultCmd())
 
628
            command.append(DenyByDefaultCmd())
642
629
 
643
630
    if options.approval_delay is not None:
644
 
        commands.append(SetApprovalDelayCmd(options.approval_delay))
 
631
        command.append(SetApprovalDelayCmd(options.approval_delay))
645
632
 
646
633
    if options.approval_duration is not None:
647
 
        commands.append(
 
634
        command.append(
648
635
            SetApprovalDurationCmd(options.approval_duration))
649
636
 
650
637
    if options.host is not None:
651
 
        commands.append(SetHostCmd(options.host))
 
638
        command.append(SetHostCmd(options.host))
652
639
 
653
640
    if options.secret is not None:
654
 
        commands.append(SetSecretCmd(options.secret))
 
641
        command.append(SetSecretCmd(options.secret))
655
642
 
656
643
    if options.approve:
657
644
        commands.append(ApproveCmd())
659
646
    if options.deny:
660
647
        commands.append(DenyCmd())
661
648
 
662
 
    if options.remove:
663
 
        commands.append(RemoveCmd())
664
 
 
665
649
    # If no command option has been given, show table of clients,
666
650
    # optionally verbosely
667
651
    if not commands:
670
654
    return commands
671
655
 
672
656
 
673
 
def check_option_syntax(parser, options):
674
 
    """Apply additional restrictions on options, not expressible in
675
 
argparse"""
676
 
 
677
 
    def has_actions(options):
678
 
        return any((options.enable,
679
 
                    options.disable,
680
 
                    options.bump_timeout,
681
 
                    options.start_checker,
682
 
                    options.stop_checker,
683
 
                    options.is_enabled,
684
 
                    options.remove,
685
 
                    options.checker is not None,
686
 
                    options.timeout is not None,
687
 
                    options.extended_timeout is not None,
688
 
                    options.interval is not None,
689
 
                    options.approved_by_default is not None,
690
 
                    options.approval_delay is not None,
691
 
                    options.approval_duration is not None,
692
 
                    options.host is not None,
693
 
                    options.secret is not None,
694
 
                    options.approve,
695
 
                    options.deny))
 
657
def main():
 
658
    parser = argparse.ArgumentParser()
 
659
 
 
660
    add_command_line_options(parser)
 
661
 
 
662
    options = parser.parse_args()
696
663
 
697
664
    if has_actions(options) and not (options.client or options.all):
698
665
        parser.error("Options require clients names or --all.")
705
672
        parser.error("--all requires an action.")
706
673
    if options.is_enabled and len(options.client) > 1:
707
674
        parser.error("--is-enabled requires exactly one client")
708
 
    if options.remove:
709
 
        options.remove = False
710
 
        if has_actions(options) and not options.deny:
711
 
            parser.error("--remove can only be combined with --deny")
712
 
        options.remove = True
713
 
 
714
 
 
715
 
def main():
716
 
    parser = argparse.ArgumentParser()
717
 
 
718
 
    add_command_line_options(parser)
719
 
 
720
 
    options = parser.parse_args()
721
 
 
722
 
    check_option_syntax(parser, options)
723
675
 
724
676
    clientnames = options.client
725
677
 
726
 
    if options.debug:
727
 
        log.setLevel(logging.DEBUG)
728
 
 
729
678
    try:
730
679
        bus = dbus.SystemBus()
731
 
        log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
732
 
                  server_path)
733
680
        mandos_dbus_objc = bus.get_object(busname, server_path)
734
681
    except dbus.exceptions.DBusException:
735
682
        log.critical("Could not connect to Mandos server")
748
695
    dbus_filter = NullFilter()
749
696
    try:
750
697
        dbus_logger.addFilter(dbus_filter)
751
 
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
752
 
                  server_path, dbus.OBJECT_MANAGER_IFACE)
753
698
        mandos_clients = {path: ifs_and_props[client_interface]
754
699
                          for path, ifs_and_props in
755
700
                          mandos_serv_object_manager
767
712
    clients = {}
768
713
 
769
714
    if not clientnames:
770
 
        clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
771
 
                              busname, str(path)) and False) or
772
 
                   bus.get_object(busname, path): properties
 
715
        clients = {bus.get_object(busname, path): properties
773
716
                   for path, properties in mandos_clients.items()}
774
717
    else:
775
718
        for name in clientnames:
776
719
            for path, client in mandos_clients.items():
777
720
                if client["Name"] == name:
778
 
                    log.debug("D-Bus: Connect to: (name=%r, path=%r)",
779
 
                              busname, str(path))
780
721
                    client_objc = bus.get_object(busname, path)
781
722
                    clients[client_objc] = client
782
723
                    break
846
787
                self.attributes = attributes
847
788
                self.attributes["Name"] = name
848
789
                self.calls = []
849
 
            def Set(self, interface, propname, value, dbus_interface):
850
 
                testcase.assertEqual(interface, client_interface)
851
 
                testcase.assertEqual(dbus_interface,
852
 
                                     dbus.PROPERTIES_IFACE)
853
 
                self.attributes[propname] = value
854
 
            def Get(self, interface, propname, dbus_interface):
855
 
                testcase.assertEqual(interface, client_interface)
856
 
                testcase.assertEqual(dbus_interface,
857
 
                                     dbus.PROPERTIES_IFACE)
858
 
                return self.attributes[propname]
 
790
            def Set(self, interface, property, value, dbus_interface):
 
791
                testcase.assertEqual(interface, client_interface)
 
792
                testcase.assertEqual(dbus_interface,
 
793
                                     dbus.PROPERTIES_IFACE)
 
794
                self.attributes[property] = value
 
795
            def Get(self, interface, property, dbus_interface):
 
796
                testcase.assertEqual(interface, client_interface)
 
797
                testcase.assertEqual(dbus_interface,
 
798
                                     dbus.PROPERTIES_IFACE)
 
799
                return self.attributes[property]
859
800
            def Approve(self, approve, dbus_interface):
860
801
                testcase.assertEqual(dbus_interface, client_interface)
861
802
                self.calls.append(("Approve", (approve,
917
858
 
918
859
class TestPrintTableCmd(TestCmd):
919
860
    def test_normal(self):
920
 
        output = PrintTableCmd().output(self.clients.values())
 
861
        output = PrintTableCmd().output(self.clients)
921
862
        expected_output = """
922
863
Name   Enabled Timeout  Last Successful Check
923
864
foo    Yes     00:05:00 2019-02-03T00:00:00  
925
866
"""[1:-1]
926
867
        self.assertEqual(output, expected_output)
927
868
    def test_verbose(self):
928
 
        output = PrintTableCmd(verbose=True).output(
929
 
            self.clients.values())
 
869
        output = PrintTableCmd(verbose=True).output(self.clients)
930
870
        expected_output = """
931
871
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
932
872
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
934
874
"""[1:-1]
935
875
        self.assertEqual(output, expected_output)
936
876
    def test_one_client(self):
937
 
        output = PrintTableCmd().output(self.one_client.values())
 
877
        output = PrintTableCmd().output(self.one_client)
938
878
        expected_output = """
939
879
Name Enabled Timeout  Last Successful Check
940
880
foo  Yes     00:05:00 2019-02-03T00:00:00  
1084
1024
        for value_to_set, value_to_get in zip(self.values_to_set,
1085
1025
                                              values_to_get):
1086
1026
            for client in self.clients:
1087
 
                old_value = client.attributes[self.propname]
 
1027
                old_value = client.attributes[self.property]
1088
1028
                self.assertNotIsInstance(old_value, Unique)
1089
 
                client.attributes[self.propname] = Unique()
 
1029
                client.attributes[self.property] = Unique()
1090
1030
            self.run_command(value_to_set, self.clients)
1091
1031
            for client in self.clients:
1092
 
                value = client.attributes[self.propname]
 
1032
                value = client.attributes[self.property]
1093
1033
                self.assertNotIsInstance(value, Unique)
1094
1034
                self.assertEqual(value, value_to_get)
1095
1035
    def run_command(self, value, clients):
1097
1037
 
1098
1038
class TestBumpTimeoutCmd(TestPropertyCmd):
1099
1039
    command = BumpTimeoutCmd
1100
 
    propname = "LastCheckedOK"
 
1040
    property = "LastCheckedOK"
1101
1041
    values_to_set = [""]
1102
1042
 
1103
1043
class TestStartCheckerCmd(TestPropertyCmd):
1104
1044
    command = StartCheckerCmd
1105
 
    propname = "CheckerRunning"
 
1045
    property = "CheckerRunning"
1106
1046
    values_to_set = [dbus.Boolean(True)]
1107
1047
 
1108
1048
class TestStopCheckerCmd(TestPropertyCmd):
1109
1049
    command = StopCheckerCmd
1110
 
    propname = "CheckerRunning"
 
1050
    property = "CheckerRunning"
1111
1051
    values_to_set = [dbus.Boolean(False)]
1112
1052
 
1113
1053
class TestApproveByDefaultCmd(TestPropertyCmd):
1114
1054
    command = ApproveByDefaultCmd
1115
 
    propname = "ApprovedByDefault"
 
1055
    property = "ApprovedByDefault"
1116
1056
    values_to_set = [dbus.Boolean(True)]
1117
1057
 
1118
1058
class TestDenyByDefaultCmd(TestPropertyCmd):
1119
1059
    command = DenyByDefaultCmd
1120
 
    propname = "ApprovedByDefault"
 
1060
    property = "ApprovedByDefault"
1121
1061
    values_to_set = [dbus.Boolean(False)]
1122
1062
 
1123
1063
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1132
1072
 
1133
1073
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1134
1074
    command = SetCheckerCmd
1135
 
    propname = "Checker"
 
1075
    property = "Checker"
1136
1076
    values_to_set = ["", ":", "fping -q -- %s"]
1137
1077
 
1138
1078
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1139
1079
    command = SetHostCmd
1140
 
    propname = "Host"
 
1080
    property = "Host"
1141
1081
    values_to_set = ["192.0.2.3", "foo.example.org"]
1142
1082
 
1143
1083
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1144
1084
    command = SetSecretCmd
1145
 
    propname = "Secret"
1146
 
    values_to_set = [io.BytesIO(b""),
1147
 
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1148
 
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
 
1085
    property = "Secret"
 
1086
    values_to_set = [b"", b"secret"]
1149
1087
 
1150
1088
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1151
1089
    command = SetTimeoutCmd
1152
 
    propname = "Timeout"
1153
 
    values_to_set = [datetime.timedelta(),
1154
 
                     datetime.timedelta(minutes=5),
1155
 
                     datetime.timedelta(seconds=1),
1156
 
                     datetime.timedelta(weeks=1),
1157
 
                     datetime.timedelta(weeks=52)]
1158
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1090
    property = "Timeout"
 
1091
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1092
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1159
1093
 
1160
1094
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1161
1095
    command = SetExtendedTimeoutCmd
1162
 
    propname = "ExtendedTimeout"
1163
 
    values_to_set = [datetime.timedelta(),
1164
 
                     datetime.timedelta(minutes=5),
1165
 
                     datetime.timedelta(seconds=1),
1166
 
                     datetime.timedelta(weeks=1),
1167
 
                     datetime.timedelta(weeks=52)]
1168
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1096
    property = "ExtendedTimeout"
 
1097
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1098
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1169
1099
 
1170
1100
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1171
1101
    command = SetIntervalCmd
1172
 
    propname = "Interval"
1173
 
    values_to_set = [datetime.timedelta(),
1174
 
                     datetime.timedelta(minutes=5),
1175
 
                     datetime.timedelta(seconds=1),
1176
 
                     datetime.timedelta(weeks=1),
1177
 
                     datetime.timedelta(weeks=52)]
1178
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1102
    property = "Interval"
 
1103
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1104
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1179
1105
 
1180
1106
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1181
1107
    command = SetApprovalDelayCmd
1182
 
    propname = "ApprovalDelay"
1183
 
    values_to_set = [datetime.timedelta(),
1184
 
                     datetime.timedelta(minutes=5),
1185
 
                     datetime.timedelta(seconds=1),
1186
 
                     datetime.timedelta(weeks=1),
1187
 
                     datetime.timedelta(weeks=52)]
1188
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1108
    property = "ApprovalDelay"
 
1109
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1110
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1189
1111
 
1190
1112
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1191
1113
    command = SetApprovalDurationCmd
1192
 
    propname = "ApprovalDuration"
1193
 
    values_to_set = [datetime.timedelta(),
1194
 
                     datetime.timedelta(minutes=5),
1195
 
                     datetime.timedelta(seconds=1),
1196
 
                     datetime.timedelta(weeks=1),
1197
 
                     datetime.timedelta(weeks=52)]
1198
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1199
 
 
1200
 
class Test_command_from_options(unittest.TestCase):
1201
 
    def setUp(self):
1202
 
        self.parser = argparse.ArgumentParser()
1203
 
        add_command_line_options(self.parser)
1204
 
    def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1205
 
        """Assert that parsing ARGS should result in an instance of
1206
 
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1207
 
        options = self.parser.parse_args(args)
1208
 
        check_option_syntax(self.parser, options)
1209
 
        commands = commands_from_options(options)
1210
 
        self.assertEqual(len(commands), 1)
1211
 
        command = commands[0]
1212
 
        self.assertIsInstance(command, command_cls)
1213
 
        for key, value in cmd_attrs.items():
1214
 
            self.assertEqual(getattr(command, key), value)
1215
 
    def test_print_table(self):
1216
 
        self.assert_command_from_args([], PrintTableCmd,
1217
 
                                      verbose=False)
1218
 
 
1219
 
    def test_print_table_verbose(self):
1220
 
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1221
 
                                      verbose=True)
1222
 
 
1223
 
    def test_print_table_verbose_short(self):
1224
 
        self.assert_command_from_args(["-v"], PrintTableCmd,
1225
 
                                      verbose=True)
1226
 
 
1227
 
    def test_enable(self):
1228
 
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1229
 
 
1230
 
    def test_enable_short(self):
1231
 
        self.assert_command_from_args(["-e", "foo"], EnableCmd)
1232
 
 
1233
 
    def test_disable(self):
1234
 
        self.assert_command_from_args(["--disable", "foo"],
1235
 
                                      DisableCmd)
1236
 
 
1237
 
    def test_disable_short(self):
1238
 
        self.assert_command_from_args(["-d", "foo"], DisableCmd)
1239
 
 
1240
 
    def test_bump_timeout(self):
1241
 
        self.assert_command_from_args(["--bump-timeout", "foo"],
1242
 
                                      BumpTimeoutCmd)
1243
 
 
1244
 
    def test_bump_timeout_short(self):
1245
 
        self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1246
 
 
1247
 
    def test_start_checker(self):
1248
 
        self.assert_command_from_args(["--start-checker", "foo"],
1249
 
                                      StartCheckerCmd)
1250
 
 
1251
 
    def test_stop_checker(self):
1252
 
        self.assert_command_from_args(["--stop-checker", "foo"],
1253
 
                                      StopCheckerCmd)
1254
 
 
1255
 
    def test_remove(self):
1256
 
        self.assert_command_from_args(["--remove", "foo"],
1257
 
                                      RemoveCmd)
1258
 
 
1259
 
    def test_remove_short(self):
1260
 
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1261
 
 
1262
 
    def test_checker(self):
1263
 
        self.assert_command_from_args(["--checker", ":", "foo"],
1264
 
                                      SetCheckerCmd, value_to_set=":")
1265
 
 
1266
 
    def test_checker_empty(self):
1267
 
        self.assert_command_from_args(["--checker", "", "foo"],
1268
 
                                      SetCheckerCmd, value_to_set="")
1269
 
 
1270
 
    def test_checker_short(self):
1271
 
        self.assert_command_from_args(["-c", ":", "foo"],
1272
 
                                      SetCheckerCmd, value_to_set=":")
1273
 
 
1274
 
    def test_timeout(self):
1275
 
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1276
 
                                      SetTimeoutCmd,
1277
 
                                      value_to_set=300000)
1278
 
 
1279
 
    def test_timeout_short(self):
1280
 
        self.assert_command_from_args(["-t", "PT5M", "foo"],
1281
 
                                      SetTimeoutCmd,
1282
 
                                      value_to_set=300000)
1283
 
 
1284
 
    def test_extended_timeout(self):
1285
 
        self.assert_command_from_args(["--extended-timeout", "PT15M",
1286
 
                                       "foo"],
1287
 
                                      SetExtendedTimeoutCmd,
1288
 
                                      value_to_set=900000)
1289
 
 
1290
 
    def test_interval(self):
1291
 
        self.assert_command_from_args(["--interval", "PT2M", "foo"],
1292
 
                                      SetIntervalCmd,
1293
 
                                      value_to_set=120000)
1294
 
 
1295
 
    def test_interval_short(self):
1296
 
        self.assert_command_from_args(["-i", "PT2M", "foo"],
1297
 
                                      SetIntervalCmd,
1298
 
                                      value_to_set=120000)
1299
 
 
1300
 
    def test_approve_by_default(self):
1301
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
1302
 
                                      ApproveByDefaultCmd)
1303
 
 
1304
 
    def test_deny_by_default(self):
1305
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
1306
 
                                      DenyByDefaultCmd)
1307
 
 
1308
 
    def test_approval_delay(self):
1309
 
        self.assert_command_from_args(["--approval-delay", "PT30S",
1310
 
                                       "foo"], SetApprovalDelayCmd,
1311
 
                                      value_to_set=30000)
1312
 
 
1313
 
    def test_approval_duration(self):
1314
 
        self.assert_command_from_args(["--approval-duration", "PT1S",
1315
 
                                       "foo"], SetApprovalDurationCmd,
1316
 
                                      value_to_set=1000)
1317
 
 
1318
 
    def test_host(self):
1319
 
        self.assert_command_from_args(["--host", "foo.example.org",
1320
 
                                       "foo"], SetHostCmd,
1321
 
                                      value_to_set="foo.example.org")
1322
 
 
1323
 
    def test_host_short(self):
1324
 
        self.assert_command_from_args(["-H", "foo.example.org",
1325
 
                                       "foo"], SetHostCmd,
1326
 
                                      value_to_set="foo.example.org")
1327
 
 
1328
 
    def test_secret_devnull(self):
1329
 
        self.assert_command_from_args(["--secret", os.path.devnull,
1330
 
                                       "foo"], SetSecretCmd,
1331
 
                                      value_to_set=b"")
1332
 
 
1333
 
    def test_secret_tempfile(self):
1334
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1335
 
            value = b"secret\0xyzzy\nbar"
1336
 
            f.write(value)
1337
 
            f.seek(0)
1338
 
            self.assert_command_from_args(["--secret", f.name,
1339
 
                                           "foo"], SetSecretCmd,
1340
 
                                          value_to_set=value)
1341
 
 
1342
 
    def test_secret_devnull_short(self):
1343
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1344
 
                                      SetSecretCmd, value_to_set=b"")
1345
 
 
1346
 
    def test_secret_tempfile_short(self):
1347
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1348
 
            value = b"secret\0xyzzy\nbar"
1349
 
            f.write(value)
1350
 
            f.seek(0)
1351
 
            self.assert_command_from_args(["-s", f.name, "foo"],
1352
 
                                          SetSecretCmd,
1353
 
                                          value_to_set=value)
1354
 
 
1355
 
    def test_approve(self):
1356
 
        self.assert_command_from_args(["--approve", "foo"],
1357
 
                                      ApproveCmd)
1358
 
 
1359
 
    def test_approve_short(self):
1360
 
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1361
 
 
1362
 
    def test_deny(self):
1363
 
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1364
 
 
1365
 
    def test_deny_short(self):
1366
 
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
1367
 
 
1368
 
    def test_dump_json(self):
1369
 
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1370
 
 
1371
 
    def test_is_enabled(self):
1372
 
        self.assert_command_from_args(["--is-enabled", "foo"],
1373
 
                                      IsEnabledCmd)
1374
 
 
1375
 
    def test_is_enabled_short(self):
1376
 
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1377
 
 
1378
 
    def test_deny_before_remove(self):
1379
 
        options = self.parser.parse_args(["--deny", "--remove", "foo"])
1380
 
        check_option_syntax(self.parser, options)
1381
 
        commands = commands_from_options(options)
1382
 
        self.assertEqual(len(commands), 2)
1383
 
        self.assertIsInstance(commands[0], DenyCmd)
1384
 
        self.assertIsInstance(commands[1], RemoveCmd)
1385
 
 
1386
 
    def test_deny_before_remove_reversed(self):
1387
 
        options = self.parser.parse_args(["--remove", "--deny", "--all"])
1388
 
        check_option_syntax(self.parser, options)
1389
 
        commands = commands_from_options(options)
1390
 
        self.assertEqual(len(commands), 2)
1391
 
        self.assertIsInstance(commands[0], DenyCmd)
1392
 
        self.assertIsInstance(commands[1], RemoveCmd)
1393
 
 
1394
 
 
1395
 
class Test_check_option_syntax(unittest.TestCase):
1396
 
    # This mostly corresponds to the definition from has_actions() in
1397
 
    # check_option_syntax()
1398
 
    actions = {
1399
 
        # The actual values set here are not that important, but we do
1400
 
        # at least stick to the correct types, even though they are
1401
 
        # never used
1402
 
        "enable": True,
1403
 
        "disable": True,
1404
 
        "bump_timeout": True,
1405
 
        "start_checker": True,
1406
 
        "stop_checker": True,
1407
 
        "is_enabled": True,
1408
 
        "remove": True,
1409
 
        "checker": "x",
1410
 
        "timeout": datetime.timedelta(),
1411
 
        "extended_timeout": datetime.timedelta(),
1412
 
        "interval": datetime.timedelta(),
1413
 
        "approved_by_default": True,
1414
 
        "approval_delay": datetime.timedelta(),
1415
 
        "approval_duration": datetime.timedelta(),
1416
 
        "host": "x",
1417
 
        "secret": io.BytesIO(b"x"),
1418
 
        "approve": True,
1419
 
        "deny": True,
1420
 
    }
1421
 
 
1422
 
    def setUp(self):
1423
 
        self.parser = argparse.ArgumentParser()
1424
 
        add_command_line_options(self.parser)
1425
 
 
1426
 
    @contextlib.contextmanager
1427
 
    def assertParseError(self):
1428
 
        with self.assertRaises(SystemExit) as e:
1429
 
            with self.temporarily_suppress_stderr():
1430
 
                yield
1431
 
        # Exit code from argparse is guaranteed to be "2".  Reference:
1432
 
        # https://docs.python.org/3/library/argparse.html#exiting-methods
1433
 
        self.assertEqual(e.exception.code, 2)
1434
 
 
1435
 
    @staticmethod
1436
 
    @contextlib.contextmanager
1437
 
    def temporarily_suppress_stderr():
1438
 
        null = os.open(os.path.devnull, os.O_RDWR)
1439
 
        stderrcopy = os.dup(sys.stderr.fileno())
1440
 
        os.dup2(null, sys.stderr.fileno())
1441
 
        os.close(null)
1442
 
        try:
1443
 
            yield
1444
 
        finally:
1445
 
            # restore stderr
1446
 
            os.dup2(stderrcopy, sys.stderr.fileno())
1447
 
            os.close(stderrcopy)
1448
 
 
1449
 
    def check_option_syntax(self, options):
1450
 
        check_option_syntax(self.parser, options)
1451
 
 
1452
 
    def test_actions_requires_client_or_all(self):
1453
 
        for action, value in self.actions.items():
1454
 
            options = self.parser.parse_args()
1455
 
            setattr(options, action, value)
1456
 
            with self.assertParseError():
1457
 
                self.check_option_syntax(options)
1458
 
 
1459
 
    def test_actions_conflicts_with_verbose(self):
1460
 
        for action, value in self.actions.items():
1461
 
            options = self.parser.parse_args()
1462
 
            setattr(options, action, value)
1463
 
            options.verbose = True
1464
 
            with self.assertParseError():
1465
 
                self.check_option_syntax(options)
1466
 
 
1467
 
    def test_dump_json_conflicts_with_verbose(self):
1468
 
        options = self.parser.parse_args()
1469
 
        options.dump_json = True
1470
 
        options.verbose = True
1471
 
        with self.assertParseError():
1472
 
            self.check_option_syntax(options)
1473
 
 
1474
 
    def test_dump_json_conflicts_with_action(self):
1475
 
        for action, value in self.actions.items():
1476
 
            options = self.parser.parse_args()
1477
 
            setattr(options, action, value)
1478
 
            options.dump_json = True
1479
 
            with self.assertParseError():
1480
 
                self.check_option_syntax(options)
1481
 
 
1482
 
    def test_all_can_not_be_alone(self):
1483
 
        options = self.parser.parse_args()
1484
 
        options.all = True
1485
 
        with self.assertParseError():
1486
 
            self.check_option_syntax(options)
1487
 
 
1488
 
    def test_all_is_ok_with_any_action(self):
1489
 
        for action, value in self.actions.items():
1490
 
            options = self.parser.parse_args()
1491
 
            setattr(options, action, value)
1492
 
            options.all = True
1493
 
            self.check_option_syntax(options)
1494
 
 
1495
 
    def test_is_enabled_fails_without_client(self):
1496
 
        options = self.parser.parse_args()
1497
 
        options.is_enabled = True
1498
 
        with self.assertParseError():
1499
 
            self.check_option_syntax(options)
1500
 
 
1501
 
    def test_is_enabled_works_with_one_client(self):
1502
 
        options = self.parser.parse_args()
1503
 
        options.is_enabled = True
1504
 
        options.client = ["foo"]
1505
 
        self.check_option_syntax(options)
1506
 
 
1507
 
    def test_is_enabled_fails_with_two_clients(self):
1508
 
        options = self.parser.parse_args()
1509
 
        options.is_enabled = True
1510
 
        options.client = ["foo", "barbar"]
1511
 
        with self.assertParseError():
1512
 
            self.check_option_syntax(options)
1513
 
 
1514
 
    def test_remove_can_only_be_combined_with_action_deny(self):
1515
 
        for action, value in self.actions.items():
1516
 
            if action in {"remove", "deny"}:
1517
 
                continue
1518
 
            options = self.parser.parse_args()
1519
 
            setattr(options, action, value)
1520
 
            options.all = True
1521
 
            options.remove = True
1522
 
            with self.assertParseError():
1523
 
                self.check_option_syntax(options)
 
1114
    property = "ApprovalDuration"
 
1115
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1116
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1524
1117
 
1525
1118
 
1526
1119