/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-06 19:02:46 UTC
  • Revision ID: teddy@recompile.se-20190306190246-5mdmymvq62qpba3y
mandos-ctl: Refactor test

* mandos-ctl (commands_from_args): Removed.
  (TestOptions.assert_command_from_args): New.

Show diffs side-by-side

added added

removed removed

Lines of Context:
42
42
import json
43
43
import unittest
44
44
import logging
45
 
import io
46
 
import tempfile
47
 
import contextlib
48
45
 
49
46
import dbus
50
47
 
294
291
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
295
292
                    "Expires", "LastCheckerStatus")
296
293
    def run(self, mandos, clients):
297
 
        print(self.output(clients.values()))
 
294
        print(self.output(clients))
298
295
 
299
296
class PropertyCmd(Command):
300
297
    """Abstract class for Actions for setting one client property"""
301
298
    def run_on_one_client(self, client, properties):
302
299
        """Set the Client's D-Bus property"""
303
 
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
304
 
                  client.__dbus_object_path__,
305
 
                  dbus.PROPERTIES_IFACE, client_interface,
306
 
                  self.propname, self.value_to_set
307
 
                  if not isinstance(self.value_to_set, dbus.Boolean)
308
 
                  else bool(self.value_to_set))
309
 
        client.Set(client_interface, self.propname, self.value_to_set,
 
300
        client.Set(client_interface, self.property, self.value_to_set,
310
301
                   dbus_interface=dbus.PROPERTIES_IFACE)
311
302
 
312
303
class ValueArgumentMixIn(object):
323
314
    @value_to_set.setter
324
315
    def value_to_set(self, value):
325
316
        """When setting, convert value to a datetime.timedelta"""
326
 
        self._vts = int(round(value.total_seconds() * 1000))
 
317
        self._vts = string_to_delta(value).total_seconds() * 1000
327
318
 
328
319
# Actual (non-abstract) command classes
329
320
 
336
327
        keywords = default_keywords
337
328
        if self.verbose:
338
329
            keywords = self.all_keywords
339
 
        return str(self.TableOfClients(clients, keywords))
 
330
        return str(self.TableOfClients(clients.values(), keywords))
340
331
 
341
332
    class TableOfClients(object):
342
333
        tableheaders = {
433
424
            sys.exit(0)
434
425
        sys.exit(1)
435
426
    def is_enabled(self, client, properties):
436
 
        log.debug("D-Bus: %s:%s:%s.Get(%r, %r)", busname,
437
 
                  client.__dbus_object_path__,
438
 
                  dbus.PROPERTIES_IFACE, client_interface,
439
 
                  "Enabled")
440
 
        return bool(client.Get(client_interface, "Enabled",
441
 
                               dbus_interface=dbus.PROPERTIES_IFACE))
 
427
        return bool(properties["Enabled"])
442
428
 
443
429
class RemoveCmd(Command):
444
430
    def run_on_one_client(self, client, properties):
445
 
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
446
 
                  server_path, server_interface,
447
 
                  str(client.__dbus_object_path__))
448
431
        self.mandos.RemoveClient(client.__dbus_object_path__)
449
432
 
450
433
class ApproveCmd(Command):
451
434
    def run_on_one_client(self, client, properties):
452
 
        log.debug("D-Bus: %s:%s.Approve(True)",
453
 
                  client.__dbus_object_path__, client_interface)
454
435
        client.Approve(dbus.Boolean(True),
455
436
                       dbus_interface=client_interface)
456
437
 
457
438
class DenyCmd(Command):
458
439
    def run_on_one_client(self, client, properties):
459
 
        log.debug("D-Bus: %s:%s.Approve(False)",
460
 
                  client.__dbus_object_path__, client_interface)
461
440
        client.Approve(dbus.Boolean(False),
462
441
                       dbus_interface=client_interface)
463
442
 
464
443
class EnableCmd(PropertyCmd):
465
 
    propname = "Enabled"
 
444
    property = "Enabled"
466
445
    value_to_set = dbus.Boolean(True)
467
446
 
468
447
class DisableCmd(PropertyCmd):
469
 
    propname = "Enabled"
 
448
    property = "Enabled"
470
449
    value_to_set = dbus.Boolean(False)
471
450
 
472
451
class BumpTimeoutCmd(PropertyCmd):
473
 
    propname = "LastCheckedOK"
 
452
    property = "LastCheckedOK"
474
453
    value_to_set = ""
475
454
 
476
455
class StartCheckerCmd(PropertyCmd):
477
 
    propname = "CheckerRunning"
 
456
    property = "CheckerRunning"
478
457
    value_to_set = dbus.Boolean(True)
479
458
 
480
459
class StopCheckerCmd(PropertyCmd):
481
 
    propname = "CheckerRunning"
 
460
    property = "CheckerRunning"
482
461
    value_to_set = dbus.Boolean(False)
483
462
 
484
463
class ApproveByDefaultCmd(PropertyCmd):
485
 
    propname = "ApprovedByDefault"
 
464
    property = "ApprovedByDefault"
486
465
    value_to_set = dbus.Boolean(True)
487
466
 
488
467
class DenyByDefaultCmd(PropertyCmd):
489
 
    propname = "ApprovedByDefault"
 
468
    property = "ApprovedByDefault"
490
469
    value_to_set = dbus.Boolean(False)
491
470
 
492
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
493
 
    propname = "Checker"
 
472
    property = "Checker"
494
473
 
495
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
496
 
    propname = "Host"
 
475
    property = "Host"
497
476
 
498
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
499
 
    propname = "Secret"
500
 
    @property
501
 
    def value_to_set(self):
502
 
        return self._vts
503
 
    @value_to_set.setter
504
 
    def value_to_set(self, value):
505
 
        """When setting, read data from supplied file object"""
506
 
        self._vts = value.read()
507
 
        value.close()
 
478
    property = "Secret"
508
479
 
509
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
510
 
    propname = "Timeout"
 
481
    property = "Timeout"
511
482
 
512
483
class SetExtendedTimeoutCmd(PropertyCmd,
513
484
                            MillisecondsValueArgumentMixIn):
514
 
    propname = "ExtendedTimeout"
 
485
    property = "ExtendedTimeout"
515
486
 
516
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
517
 
    propname = "Interval"
 
488
    property = "Interval"
518
489
 
519
490
class SetApprovalDelayCmd(PropertyCmd,
520
491
                          MillisecondsValueArgumentMixIn):
521
 
    propname = "ApprovalDelay"
 
492
    property = "ApprovalDelay"
522
493
 
523
494
class SetApprovalDurationCmd(PropertyCmd,
524
495
                             MillisecondsValueArgumentMixIn):
525
 
    propname = "ApprovalDuration"
 
496
    property = "ApprovalDuration"
 
497
 
 
498
def has_actions(options):
 
499
    return any((options.enable,
 
500
                options.disable,
 
501
                options.bump_timeout,
 
502
                options.start_checker,
 
503
                options.stop_checker,
 
504
                options.is_enabled,
 
505
                options.remove,
 
506
                options.checker is not None,
 
507
                options.timeout is not None,
 
508
                options.extended_timeout is not None,
 
509
                options.interval is not None,
 
510
                options.approved_by_default is not None,
 
511
                options.approval_delay is not None,
 
512
                options.approval_duration is not None,
 
513
                options.host is not None,
 
514
                options.secret is not None,
 
515
                options.approve,
 
516
                options.deny))
526
517
 
527
518
def add_command_line_options(parser):
528
519
    parser.add_argument("--version", action="version",
555
546
                        help="Remove client")
556
547
    parser.add_argument("-c", "--checker",
557
548
                        help="Set checker command for client")
558
 
    parser.add_argument("-t", "--timeout", type=string_to_delta,
 
549
    parser.add_argument("-t", "--timeout",
559
550
                        help="Set timeout for client")
560
 
    parser.add_argument("--extended-timeout", type=string_to_delta,
 
551
    parser.add_argument("--extended-timeout",
561
552
                        help="Set extended timeout for client")
562
 
    parser.add_argument("-i", "--interval", type=string_to_delta,
 
553
    parser.add_argument("-i", "--interval",
563
554
                        help="Set checker interval for client")
564
555
    approve_deny_default = parser.add_mutually_exclusive_group()
565
556
    approve_deny_default.add_argument(
570
561
        "--deny-by-default", action="store_false",
571
562
        dest="approved_by_default",
572
563
        help="Set client to be denied by default")
573
 
    parser.add_argument("--approval-delay", type=string_to_delta,
 
564
    parser.add_argument("--approval-delay",
574
565
                        help="Set delay before client approve/deny")
575
 
    parser.add_argument("--approval-duration", type=string_to_delta,
 
566
    parser.add_argument("--approval-duration",
576
567
                        help="Set duration of one client approval")
577
568
    parser.add_argument("-H", "--host", help="Set host for client")
578
569
    parser.add_argument("-s", "--secret",
584
575
        help="Approve any current client request")
585
576
    approve_deny.add_argument("-D", "--deny", action="store_true",
586
577
                              help="Deny any current client request")
587
 
    parser.add_argument("--debug", action="store_true",
588
 
                        help="Debug mode (show D-Bus commands)")
589
578
    parser.add_argument("--check", action="store_true",
590
579
                        help="Run self-test")
591
580
    parser.add_argument("client", nargs="*", help="Client name")
616
605
    if options.is_enabled:
617
606
        commands.append(IsEnabledCmd())
618
607
 
 
608
    if options.remove:
 
609
        commands.append(RemoveCmd())
 
610
 
619
611
    if options.checker is not None:
620
 
        commands.append(SetCheckerCmd(options.checker))
 
612
        commands.append(SetCheckerCmd())
621
613
 
622
614
    if options.timeout is not None:
623
615
        commands.append(SetTimeoutCmd(options.timeout))
627
619
            SetExtendedTimeoutCmd(options.extended_timeout))
628
620
 
629
621
    if options.interval is not None:
630
 
        commands.append(SetIntervalCmd(options.interval))
 
622
        command.append(SetIntervalCmd(options.interval))
631
623
 
632
624
    if options.approved_by_default is not None:
633
625
        if options.approved_by_default:
634
 
            commands.append(ApproveByDefaultCmd())
 
626
            command.append(ApproveByDefaultCmd())
635
627
        else:
636
 
            commands.append(DenyByDefaultCmd())
 
628
            command.append(DenyByDefaultCmd())
637
629
 
638
630
    if options.approval_delay is not None:
639
 
        commands.append(SetApprovalDelayCmd(options.approval_delay))
 
631
        command.append(SetApprovalDelayCmd(options.approval_delay))
640
632
 
641
633
    if options.approval_duration is not None:
642
 
        commands.append(
 
634
        command.append(
643
635
            SetApprovalDurationCmd(options.approval_duration))
644
636
 
645
637
    if options.host is not None:
646
 
        commands.append(SetHostCmd(options.host))
 
638
        command.append(SetHostCmd(options.host))
647
639
 
648
640
    if options.secret is not None:
649
 
        commands.append(SetSecretCmd(options.secret))
 
641
        command.append(SetSecretCmd(options.secret))
650
642
 
651
643
    if options.approve:
652
644
        commands.append(ApproveCmd())
654
646
    if options.deny:
655
647
        commands.append(DenyCmd())
656
648
 
657
 
    if options.remove:
658
 
        commands.append(RemoveCmd())
659
 
 
660
649
    # If no command option has been given, show table of clients,
661
650
    # optionally verbosely
662
651
    if not commands:
665
654
    return commands
666
655
 
667
656
 
668
 
def check_option_syntax(parser, options):
669
 
    """Apply additional restrictions on options, not expressible in
670
 
argparse"""
671
 
 
672
 
    def has_actions(options):
673
 
        return any((options.enable,
674
 
                    options.disable,
675
 
                    options.bump_timeout,
676
 
                    options.start_checker,
677
 
                    options.stop_checker,
678
 
                    options.is_enabled,
679
 
                    options.remove,
680
 
                    options.checker is not None,
681
 
                    options.timeout is not None,
682
 
                    options.extended_timeout is not None,
683
 
                    options.interval is not None,
684
 
                    options.approved_by_default is not None,
685
 
                    options.approval_delay is not None,
686
 
                    options.approval_duration is not None,
687
 
                    options.host is not None,
688
 
                    options.secret is not None,
689
 
                    options.approve,
690
 
                    options.deny))
 
657
def main():
 
658
    parser = argparse.ArgumentParser()
 
659
 
 
660
    add_command_line_options(parser)
 
661
 
 
662
    options = parser.parse_args()
691
663
 
692
664
    if has_actions(options) and not (options.client or options.all):
693
665
        parser.error("Options require clients names or --all.")
700
672
        parser.error("--all requires an action.")
701
673
    if options.is_enabled and len(options.client) > 1:
702
674
        parser.error("--is-enabled requires exactly one client")
703
 
    if options.remove:
704
 
        options.remove = False
705
 
        if has_actions(options) and not options.deny:
706
 
            parser.error("--remove can only be combined with --deny")
707
 
        options.remove = True
708
 
 
709
 
 
710
 
def main():
711
 
    parser = argparse.ArgumentParser()
712
 
 
713
 
    add_command_line_options(parser)
714
 
 
715
 
    options = parser.parse_args()
716
 
 
717
 
    check_option_syntax(parser, options)
718
675
 
719
676
    clientnames = options.client
720
677
 
721
 
    if options.debug:
722
 
        log.setLevel(logging.DEBUG)
723
 
 
724
678
    try:
725
679
        bus = dbus.SystemBus()
726
 
        log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
727
 
                  server_path)
728
680
        mandos_dbus_objc = bus.get_object(busname, server_path)
729
681
    except dbus.exceptions.DBusException:
730
682
        log.critical("Could not connect to Mandos server")
743
695
    dbus_filter = NullFilter()
744
696
    try:
745
697
        dbus_logger.addFilter(dbus_filter)
746
 
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
747
 
                  server_path, dbus.OBJECT_MANAGER_IFACE)
748
698
        mandos_clients = {path: ifs_and_props[client_interface]
749
699
                          for path, ifs_and_props in
750
700
                          mandos_serv_object_manager
762
712
    clients = {}
763
713
 
764
714
    if not clientnames:
765
 
        clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
766
 
                              busname, str(path)) and False) or
767
 
                   bus.get_object(busname, path): properties
 
715
        clients = {bus.get_object(busname, path): properties
768
716
                   for path, properties in mandos_clients.items()}
769
717
    else:
770
718
        for name in clientnames:
771
719
            for path, client in mandos_clients.items():
772
720
                if client["Name"] == name:
773
 
                    log.debug("D-Bus: Connect to: (name=%r, path=%r)",
774
 
                              busname, str(path))
775
721
                    client_objc = bus.get_object(busname, path)
776
722
                    clients[client_objc] = client
777
723
                    break
841
787
                self.attributes = attributes
842
788
                self.attributes["Name"] = name
843
789
                self.calls = []
844
 
            def Set(self, interface, propname, value, dbus_interface):
845
 
                testcase.assertEqual(interface, client_interface)
846
 
                testcase.assertEqual(dbus_interface,
847
 
                                     dbus.PROPERTIES_IFACE)
848
 
                self.attributes[propname] = value
849
 
            def Get(self, interface, propname, dbus_interface):
850
 
                testcase.assertEqual(interface, client_interface)
851
 
                testcase.assertEqual(dbus_interface,
852
 
                                     dbus.PROPERTIES_IFACE)
853
 
                return self.attributes[propname]
 
790
            def Set(self, interface, property, value, dbus_interface):
 
791
                testcase.assertEqual(interface, client_interface)
 
792
                testcase.assertEqual(dbus_interface,
 
793
                                     dbus.PROPERTIES_IFACE)
 
794
                self.attributes[property] = value
 
795
            def Get(self, interface, property, dbus_interface):
 
796
                testcase.assertEqual(interface, client_interface)
 
797
                testcase.assertEqual(dbus_interface,
 
798
                                     dbus.PROPERTIES_IFACE)
 
799
                return self.attributes[property]
854
800
            def Approve(self, approve, dbus_interface):
855
801
                testcase.assertEqual(dbus_interface, client_interface)
856
802
                self.calls.append(("Approve", (approve,
912
858
 
913
859
class TestPrintTableCmd(TestCmd):
914
860
    def test_normal(self):
915
 
        output = PrintTableCmd().output(self.clients.values())
 
861
        output = PrintTableCmd().output(self.clients)
916
862
        expected_output = """
917
863
Name   Enabled Timeout  Last Successful Check
918
864
foo    Yes     00:05:00 2019-02-03T00:00:00  
920
866
"""[1:-1]
921
867
        self.assertEqual(output, expected_output)
922
868
    def test_verbose(self):
923
 
        output = PrintTableCmd(verbose=True).output(
924
 
            self.clients.values())
 
869
        output = PrintTableCmd(verbose=True).output(self.clients)
925
870
        expected_output = """
926
871
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
927
872
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
929
874
"""[1:-1]
930
875
        self.assertEqual(output, expected_output)
931
876
    def test_one_client(self):
932
 
        output = PrintTableCmd().output(self.one_client.values())
 
877
        output = PrintTableCmd().output(self.one_client)
933
878
        expected_output = """
934
879
Name Enabled Timeout  Last Successful Check
935
880
foo  Yes     00:05:00 2019-02-03T00:00:00  
1079
1024
        for value_to_set, value_to_get in zip(self.values_to_set,
1080
1025
                                              values_to_get):
1081
1026
            for client in self.clients:
1082
 
                old_value = client.attributes[self.propname]
 
1027
                old_value = client.attributes[self.property]
1083
1028
                self.assertNotIsInstance(old_value, Unique)
1084
 
                client.attributes[self.propname] = Unique()
 
1029
                client.attributes[self.property] = Unique()
1085
1030
            self.run_command(value_to_set, self.clients)
1086
1031
            for client in self.clients:
1087
 
                value = client.attributes[self.propname]
 
1032
                value = client.attributes[self.property]
1088
1033
                self.assertNotIsInstance(value, Unique)
1089
1034
                self.assertEqual(value, value_to_get)
1090
1035
    def run_command(self, value, clients):
1092
1037
 
1093
1038
class TestBumpTimeoutCmd(TestPropertyCmd):
1094
1039
    command = BumpTimeoutCmd
1095
 
    propname = "LastCheckedOK"
 
1040
    property = "LastCheckedOK"
1096
1041
    values_to_set = [""]
1097
1042
 
1098
1043
class TestStartCheckerCmd(TestPropertyCmd):
1099
1044
    command = StartCheckerCmd
1100
 
    propname = "CheckerRunning"
 
1045
    property = "CheckerRunning"
1101
1046
    values_to_set = [dbus.Boolean(True)]
1102
1047
 
1103
1048
class TestStopCheckerCmd(TestPropertyCmd):
1104
1049
    command = StopCheckerCmd
1105
 
    propname = "CheckerRunning"
 
1050
    property = "CheckerRunning"
1106
1051
    values_to_set = [dbus.Boolean(False)]
1107
1052
 
1108
1053
class TestApproveByDefaultCmd(TestPropertyCmd):
1109
1054
    command = ApproveByDefaultCmd
1110
 
    propname = "ApprovedByDefault"
 
1055
    property = "ApprovedByDefault"
1111
1056
    values_to_set = [dbus.Boolean(True)]
1112
1057
 
1113
1058
class TestDenyByDefaultCmd(TestPropertyCmd):
1114
1059
    command = DenyByDefaultCmd
1115
 
    propname = "ApprovedByDefault"
 
1060
    property = "ApprovedByDefault"
1116
1061
    values_to_set = [dbus.Boolean(False)]
1117
1062
 
1118
1063
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1127
1072
 
1128
1073
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1129
1074
    command = SetCheckerCmd
1130
 
    propname = "Checker"
 
1075
    property = "Checker"
1131
1076
    values_to_set = ["", ":", "fping -q -- %s"]
1132
1077
 
1133
1078
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1134
1079
    command = SetHostCmd
1135
 
    propname = "Host"
 
1080
    property = "Host"
1136
1081
    values_to_set = ["192.0.2.3", "foo.example.org"]
1137
1082
 
1138
1083
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1139
1084
    command = SetSecretCmd
1140
 
    propname = "Secret"
1141
 
    values_to_set = [io.BytesIO(b""),
1142
 
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1143
 
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
 
1085
    property = "Secret"
 
1086
    values_to_set = [b"", b"secret"]
1144
1087
 
1145
1088
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1146
1089
    command = SetTimeoutCmd
1147
 
    propname = "Timeout"
1148
 
    values_to_set = [datetime.timedelta(),
1149
 
                     datetime.timedelta(minutes=5),
1150
 
                     datetime.timedelta(seconds=1),
1151
 
                     datetime.timedelta(weeks=1),
1152
 
                     datetime.timedelta(weeks=52)]
1153
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1090
    property = "Timeout"
 
1091
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1092
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1154
1093
 
1155
1094
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1156
1095
    command = SetExtendedTimeoutCmd
1157
 
    propname = "ExtendedTimeout"
1158
 
    values_to_set = [datetime.timedelta(),
1159
 
                     datetime.timedelta(minutes=5),
1160
 
                     datetime.timedelta(seconds=1),
1161
 
                     datetime.timedelta(weeks=1),
1162
 
                     datetime.timedelta(weeks=52)]
1163
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1096
    property = "ExtendedTimeout"
 
1097
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1098
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1164
1099
 
1165
1100
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1166
1101
    command = SetIntervalCmd
1167
 
    propname = "Interval"
1168
 
    values_to_set = [datetime.timedelta(),
1169
 
                     datetime.timedelta(minutes=5),
1170
 
                     datetime.timedelta(seconds=1),
1171
 
                     datetime.timedelta(weeks=1),
1172
 
                     datetime.timedelta(weeks=52)]
1173
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1102
    property = "Interval"
 
1103
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1104
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1174
1105
 
1175
1106
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1176
1107
    command = SetApprovalDelayCmd
1177
 
    propname = "ApprovalDelay"
1178
 
    values_to_set = [datetime.timedelta(),
1179
 
                     datetime.timedelta(minutes=5),
1180
 
                     datetime.timedelta(seconds=1),
1181
 
                     datetime.timedelta(weeks=1),
1182
 
                     datetime.timedelta(weeks=52)]
1183
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1108
    property = "ApprovalDelay"
 
1109
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1110
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1184
1111
 
1185
1112
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1186
1113
    command = SetApprovalDurationCmd
1187
 
    propname = "ApprovalDuration"
1188
 
    values_to_set = [datetime.timedelta(),
1189
 
                     datetime.timedelta(minutes=5),
1190
 
                     datetime.timedelta(seconds=1),
1191
 
                     datetime.timedelta(weeks=1),
1192
 
                     datetime.timedelta(weeks=52)]
1193
 
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
 
1114
    property = "ApprovalDuration"
 
1115
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
 
1116
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1194
1117
 
1195
 
class Test_command_from_options(unittest.TestCase):
 
1118
class TestOptions(unittest.TestCase):
1196
1119
    def setUp(self):
1197
1120
        self.parser = argparse.ArgumentParser()
1198
1121
        add_command_line_options(self.parser)
1200
1123
        """Assert that parsing ARGS should result in an instance of
1201
1124
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1202
1125
        options = self.parser.parse_args(args)
1203
 
        check_option_syntax(self.parser, options)
1204
1126
        commands = commands_from_options(options)
1205
1127
        self.assertEqual(len(commands), 1)
1206
1128
        command = commands[0]
1207
1129
        self.assertIsInstance(command, command_cls)
1208
1130
        for key, value in cmd_attrs.items():
1209
1131
            self.assertEqual(getattr(command, key), value)
1210
 
    def test_print_table(self):
 
1132
    def test_default_is_show_table(self):
1211
1133
        self.assert_command_from_args([], PrintTableCmd,
1212
1134
                                      verbose=False)
1213
 
 
1214
 
    def test_print_table_verbose(self):
 
1135
    def test_show_table_verbose(self):
1215
1136
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1216
1137
                                      verbose=True)
1217
 
 
1218
 
    def test_print_table_verbose_short(self):
1219
 
        self.assert_command_from_args(["-v"], PrintTableCmd,
1220
 
                                      verbose=True)
1221
 
 
1222
1138
    def test_enable(self):
1223
 
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1224
 
 
1225
 
    def test_enable_short(self):
1226
 
        self.assert_command_from_args(["-e", "foo"], EnableCmd)
1227
 
 
 
1139
        self.assert_command_from_args(["--enable"], EnableCmd)
1228
1140
    def test_disable(self):
1229
 
        self.assert_command_from_args(["--disable", "foo"],
1230
 
                                      DisableCmd)
1231
 
 
1232
 
    def test_disable_short(self):
1233
 
        self.assert_command_from_args(["-d", "foo"], DisableCmd)
1234
 
 
1235
 
    def test_bump_timeout(self):
1236
 
        self.assert_command_from_args(["--bump-timeout", "foo"],
1237
 
                                      BumpTimeoutCmd)
1238
 
 
1239
 
    def test_bump_timeout_short(self):
1240
 
        self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1241
 
 
1242
 
    def test_start_checker(self):
1243
 
        self.assert_command_from_args(["--start-checker", "foo"],
1244
 
                                      StartCheckerCmd)
1245
 
 
1246
 
    def test_stop_checker(self):
1247
 
        self.assert_command_from_args(["--stop-checker", "foo"],
1248
 
                                      StopCheckerCmd)
1249
 
 
1250
 
    def test_remove(self):
1251
 
        self.assert_command_from_args(["--remove", "foo"],
1252
 
                                      RemoveCmd)
1253
 
 
1254
 
    def test_remove_short(self):
1255
 
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1256
 
 
1257
 
    def test_checker(self):
1258
 
        self.assert_command_from_args(["--checker", ":", "foo"],
1259
 
                                      SetCheckerCmd, value_to_set=":")
1260
 
 
1261
 
    def test_checker_empty(self):
1262
 
        self.assert_command_from_args(["--checker", "", "foo"],
1263
 
                                      SetCheckerCmd, value_to_set="")
1264
 
 
1265
 
    def test_checker_short(self):
1266
 
        self.assert_command_from_args(["-c", ":", "foo"],
1267
 
                                      SetCheckerCmd, value_to_set=":")
1268
 
 
1269
 
    def test_timeout(self):
1270
 
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1271
 
                                      SetTimeoutCmd,
1272
 
                                      value_to_set=300000)
1273
 
 
1274
 
    def test_timeout_short(self):
1275
 
        self.assert_command_from_args(["-t", "PT5M", "foo"],
1276
 
                                      SetTimeoutCmd,
1277
 
                                      value_to_set=300000)
1278
 
 
1279
 
    def test_extended_timeout(self):
1280
 
        self.assert_command_from_args(["--extended-timeout", "PT15M",
1281
 
                                       "foo"],
1282
 
                                      SetExtendedTimeoutCmd,
1283
 
                                      value_to_set=900000)
1284
 
 
1285
 
    def test_interval(self):
1286
 
        self.assert_command_from_args(["--interval", "PT2M", "foo"],
1287
 
                                      SetIntervalCmd,
1288
 
                                      value_to_set=120000)
1289
 
 
1290
 
    def test_interval_short(self):
1291
 
        self.assert_command_from_args(["-i", "PT2M", "foo"],
1292
 
                                      SetIntervalCmd,
1293
 
                                      value_to_set=120000)
1294
 
 
1295
 
    def test_approve_by_default(self):
1296
 
        self.assert_command_from_args(["--approve-by-default", "foo"],
1297
 
                                      ApproveByDefaultCmd)
1298
 
 
1299
 
    def test_deny_by_default(self):
1300
 
        self.assert_command_from_args(["--deny-by-default", "foo"],
1301
 
                                      DenyByDefaultCmd)
1302
 
 
1303
 
    def test_approval_delay(self):
1304
 
        self.assert_command_from_args(["--approval-delay", "PT30S",
1305
 
                                       "foo"], SetApprovalDelayCmd,
1306
 
                                      value_to_set=30000)
1307
 
 
1308
 
    def test_approval_duration(self):
1309
 
        self.assert_command_from_args(["--approval-duration", "PT1S",
1310
 
                                       "foo"], SetApprovalDurationCmd,
1311
 
                                      value_to_set=1000)
1312
 
 
1313
 
    def test_host(self):
1314
 
        self.assert_command_from_args(["--host", "foo.example.org",
1315
 
                                       "foo"], SetHostCmd,
1316
 
                                      value_to_set="foo.example.org")
1317
 
 
1318
 
    def test_host_short(self):
1319
 
        self.assert_command_from_args(["-H", "foo.example.org",
1320
 
                                       "foo"], SetHostCmd,
1321
 
                                      value_to_set="foo.example.org")
1322
 
 
1323
 
    def test_secret_devnull(self):
1324
 
        self.assert_command_from_args(["--secret", os.path.devnull,
1325
 
                                       "foo"], SetSecretCmd,
1326
 
                                      value_to_set=b"")
1327
 
 
1328
 
    def test_secret_tempfile(self):
1329
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1330
 
            value = b"secret\0xyzzy\nbar"
1331
 
            f.write(value)
1332
 
            f.seek(0)
1333
 
            self.assert_command_from_args(["--secret", f.name,
1334
 
                                           "foo"], SetSecretCmd,
1335
 
                                          value_to_set=value)
1336
 
 
1337
 
    def test_secret_devnull_short(self):
1338
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1339
 
                                      SetSecretCmd, value_to_set=b"")
1340
 
 
1341
 
    def test_secret_tempfile_short(self):
1342
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1343
 
            value = b"secret\0xyzzy\nbar"
1344
 
            f.write(value)
1345
 
            f.seek(0)
1346
 
            self.assert_command_from_args(["-s", f.name, "foo"],
1347
 
                                          SetSecretCmd,
1348
 
                                          value_to_set=value)
1349
 
 
1350
 
    def test_approve(self):
1351
 
        self.assert_command_from_args(["--approve", "foo"],
1352
 
                                      ApproveCmd)
1353
 
 
1354
 
    def test_approve_short(self):
1355
 
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1356
 
 
1357
 
    def test_deny(self):
1358
 
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1359
 
 
1360
 
    def test_deny_short(self):
1361
 
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
1362
 
 
1363
 
    def test_dump_json(self):
1364
 
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1365
 
 
1366
 
    def test_is_enabled(self):
1367
 
        self.assert_command_from_args(["--is-enabled", "foo"],
1368
 
                                      IsEnabledCmd)
1369
 
 
1370
 
    def test_is_enabled_short(self):
1371
 
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1372
 
 
1373
 
    def test_deny_before_remove(self):
1374
 
        options = self.parser.parse_args(["--deny", "--remove", "foo"])
1375
 
        check_option_syntax(self.parser, options)
1376
 
        commands = commands_from_options(options)
1377
 
        self.assertEqual(len(commands), 2)
1378
 
        self.assertIsInstance(commands[0], DenyCmd)
1379
 
        self.assertIsInstance(commands[1], RemoveCmd)
1380
 
 
1381
 
    def test_deny_before_remove_reversed(self):
1382
 
        options = self.parser.parse_args(["--remove", "--deny", "--all"])
1383
 
        check_option_syntax(self.parser, options)
1384
 
        commands = commands_from_options(options)
1385
 
        self.assertEqual(len(commands), 2)
1386
 
        self.assertIsInstance(commands[0], DenyCmd)
1387
 
        self.assertIsInstance(commands[1], RemoveCmd)
1388
 
 
1389
 
 
1390
 
class Test_check_option_syntax(unittest.TestCase):
    """Exercise check_option_syntax() with hand-built option objects.

    Each test starts from the options produced by an empty command
    line, sets individual attributes on that namespace, and then
    checks that check_option_syntax() either returns normally or
    makes the process exit with code 2 (see assertParseError()).
    """

    # This mostly corresponds to the definition from has_actions() in
    # check_option_syntax()
    actions = {
        # The actual values set here are not that important, but we do
        # at least stick to the correct types, even though they are
        # never used
        "enable": True,
        "disable": True,
        "bump_timeout": True,
        "start_checker": True,
        "stop_checker": True,
        "is_enabled": True,
        "remove": True,
        "checker": "x",
        "timeout": datetime.timedelta(),
        "extended_timeout": datetime.timedelta(),
        "interval": datetime.timedelta(),
        "approved_by_default": True,
        "approval_delay": datetime.timedelta(),
        "approval_duration": datetime.timedelta(),
        "host": "x",
        "secret": io.BytesIO(b"x"),
        "approve": True,
        "deny": True,
    }

    def setUp(self):
        # Build a fresh parser for every test
        self.parser = argparse.ArgumentParser()
        add_command_line_options(self.parser)

    def default_options(self):
        """Return the options resulting from an empty command line.

        The empty list is passed explicitly: parse_args() without an
        argument list parses sys.argv[1:], which would make the
        tests depend on how the test runner itself was started.
        """
        return self.parser.parse_args([])

    @contextlib.contextmanager
    def assertParseError(self):
        """Context manager asserting that the body exits with code 2.

        The body must raise SystemExit; stderr is suppressed while
        it runs, and the exit code is checked afterwards.
        """
        with self.assertRaises(SystemExit) as e:
            with self.temporarily_suppress_stderr():
                yield
        # Exit code from argparse is guaranteed to be "2".  Reference:
        # https://docs.python.org/3/library/argparse.html#exiting-methods
        self.assertEqual(e.exception.code, 2)

    @staticmethod
    @contextlib.contextmanager
    def temporarily_suppress_stderr():
        """Context manager sending stderr to the null device.

        The redirection is done on the file descriptor behind
        sys.stderr, which is restored when the body finishes,
        whether normally or by an exception.  sys.stderr is flushed
        on both sides of the switch so that buffered text ends up
        where it was meant to go; without that, a block-buffered
        sys.stderr could emit the suppressed text after the
        descriptor has been restored.
        """
        stderr_fd = sys.stderr.fileno()
        # Anything buffered so far belongs to the real stderr
        sys.stderr.flush()
        stderrcopy = os.dup(stderr_fd)
        try:
            null = os.open(os.path.devnull, os.O_RDWR)
            try:
                os.dup2(null, stderr_fd)
            finally:
                # Not needed once duplicated; also avoids leaking the
                # descriptor if dup2() fails
                os.close(null)
            try:
                yield
            finally:
                try:
                    # Discard what was written while suppressed
                    sys.stderr.flush()
                finally:
                    # restore stderr
                    os.dup2(stderrcopy, stderr_fd)
        finally:
            os.close(stderrcopy)

    def check_option_syntax(self, options):
        # Convenience wrapper supplying this test's parser to the
        # module-level function of the same name
        check_option_syntax(self.parser, options)

    def test_actions_requires_client_or_all(self):
        # Neither a client nor "all" is set, so every action must be
        # rejected
        for action, value in self.actions.items():
            options = self.default_options()
            setattr(options, action, value)
            with self.assertParseError():
                self.check_option_syntax(options)

    def test_actions_conflicts_with_verbose(self):
        for action, value in self.actions.items():
            options = self.default_options()
            setattr(options, action, value)
            options.verbose = True
            with self.assertParseError():
                self.check_option_syntax(options)

    def test_dump_json_conflicts_with_verbose(self):
        options = self.default_options()
        options.dump_json = True
        options.verbose = True
        with self.assertParseError():
            self.check_option_syntax(options)

    def test_dump_json_conflicts_with_action(self):
        for action, value in self.actions.items():
            options = self.default_options()
            setattr(options, action, value)
            options.dump_json = True
            with self.assertParseError():
                self.check_option_syntax(options)

    def test_all_can_not_be_alone(self):
        options = self.default_options()
        options.all = True
        with self.assertParseError():
            self.check_option_syntax(options)

    def test_all_is_ok_with_any_action(self):
        # No assertParseError() here: the check must simply return
        for action, value in self.actions.items():
            options = self.default_options()
            setattr(options, action, value)
            options.all = True
            self.check_option_syntax(options)

    def test_is_enabled_fails_without_client(self):
        options = self.default_options()
        options.is_enabled = True
        with self.assertParseError():
            self.check_option_syntax(options)

    def test_is_enabled_works_with_one_client(self):
        options = self.default_options()
        options.is_enabled = True
        options.client = ["foo"]
        self.check_option_syntax(options)

    def test_is_enabled_fails_with_two_clients(self):
        options = self.default_options()
        options.is_enabled = True
        options.client = ["foo", "barbar"]
        with self.assertParseError():
            self.check_option_syntax(options)

    def test_remove_can_only_be_combined_with_action_deny(self):
        for action, value in self.actions.items():
            # "remove" is the option under test, and "deny" is the
            # one action allowed together with it
            if action in {"remove", "deny"}:
                continue
            options = self.default_options()
            setattr(options, action, value)
            options.all = True
            options.remove = True
            with self.assertParseError():
                self.check_option_syntax(options)
 
1141
        self.assert_command_from_args(["--disable"], DisableCmd)
1519
1142
 
1520
1143
 
1521
1144