/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-09 18:49:03 UTC
  • Revision ID: teddy@recompile.se-20190309184903-81a0vh49w1156a2o
mandos-ctl: Refactor

* mandos-ctl: Rename "property" to "propname" to not conflict with
              "property" builtin.

Show diffs side-by-side

added added

removed removed

Lines of Context:
44
44
import logging
45
45
import io
46
46
import tempfile
 
47
import contextlib
47
48
 
48
49
import dbus
49
50
 
293
294
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
294
295
                    "Expires", "LastCheckerStatus")
295
296
    def run(self, mandos, clients):
296
 
        print(self.output(clients))
 
297
        print(self.output(clients.values()))
297
298
 
298
299
class PropertyCmd(Command):
299
300
    """Abstract class for Actions for setting one client property"""
300
301
    def run_on_one_client(self, client, properties):
301
302
        """Set the Client's D-Bus property"""
302
 
        client.Set(client_interface, self.property, self.value_to_set,
 
303
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
304
                  client.__dbus_object_path__,
 
305
                  dbus.PROPERTIES_IFACE, client_interface,
 
306
                  self.propname, self.value_to_set
 
307
                  if not isinstance(self.value_to_set, dbus.Boolean)
 
308
                  else bool(self.value_to_set))
 
309
        client.Set(client_interface, self.propname, self.value_to_set,
303
310
                   dbus_interface=dbus.PROPERTIES_IFACE)
304
311
 
305
312
class ValueArgumentMixIn(object):
316
323
    @value_to_set.setter
317
324
    def value_to_set(self, value):
318
325
        """When setting, convert value to a datetime.timedelta"""
319
 
        self._vts = string_to_delta(value).total_seconds() * 1000
 
326
        self._vts = int(round(value.total_seconds() * 1000))
320
327
 
321
328
# Actual (non-abstract) command classes
322
329
 
329
336
        keywords = default_keywords
330
337
        if self.verbose:
331
338
            keywords = self.all_keywords
332
 
        return str(self.TableOfClients(clients.values(), keywords))
 
339
        return str(self.TableOfClients(clients, keywords))
333
340
 
334
341
    class TableOfClients(object):
335
342
        tableheaders = {
426
433
            sys.exit(0)
427
434
        sys.exit(1)
428
435
    def is_enabled(self, client, properties):
429
 
        return bool(properties["Enabled"])
 
436
        log.debug("D-Bus: %s:%s:%s.Get(%r, %r)", busname,
 
437
                  client.__dbus_object_path__,
 
438
                  dbus.PROPERTIES_IFACE, client_interface,
 
439
                  "Enabled")
 
440
        return bool(client.Get(client_interface, "Enabled",
 
441
                               dbus_interface=dbus.PROPERTIES_IFACE))
430
442
 
431
443
class RemoveCmd(Command):
432
444
    def run_on_one_client(self, client, properties):
 
445
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
 
446
                  server_path, server_interface,
 
447
                  str(client.__dbus_object_path__))
433
448
        self.mandos.RemoveClient(client.__dbus_object_path__)
434
449
 
435
450
class ApproveCmd(Command):
436
451
    def run_on_one_client(self, client, properties):
 
452
        log.debug("D-Bus: %s:%s.Approve(True)",
 
453
                  client.__dbus_object_path__, client_interface)
437
454
        client.Approve(dbus.Boolean(True),
438
455
                       dbus_interface=client_interface)
439
456
 
440
457
class DenyCmd(Command):
441
458
    def run_on_one_client(self, client, properties):
 
459
        log.debug("D-Bus: %s:%s.Approve(False)",
 
460
                  client.__dbus_object_path__, client_interface)
442
461
        client.Approve(dbus.Boolean(False),
443
462
                       dbus_interface=client_interface)
444
463
 
445
464
class EnableCmd(PropertyCmd):
446
 
    property = "Enabled"
 
465
    propname = "Enabled"
447
466
    value_to_set = dbus.Boolean(True)
448
467
 
449
468
class DisableCmd(PropertyCmd):
450
 
    property = "Enabled"
 
469
    propname = "Enabled"
451
470
    value_to_set = dbus.Boolean(False)
452
471
 
453
472
class BumpTimeoutCmd(PropertyCmd):
454
 
    property = "LastCheckedOK"
 
473
    propname = "LastCheckedOK"
455
474
    value_to_set = ""
456
475
 
457
476
class StartCheckerCmd(PropertyCmd):
458
 
    property = "CheckerRunning"
 
477
    propname = "CheckerRunning"
459
478
    value_to_set = dbus.Boolean(True)
460
479
 
461
480
class StopCheckerCmd(PropertyCmd):
462
 
    property = "CheckerRunning"
 
481
    propname = "CheckerRunning"
463
482
    value_to_set = dbus.Boolean(False)
464
483
 
465
484
class ApproveByDefaultCmd(PropertyCmd):
466
 
    property = "ApprovedByDefault"
 
485
    propname = "ApprovedByDefault"
467
486
    value_to_set = dbus.Boolean(True)
468
487
 
469
488
class DenyByDefaultCmd(PropertyCmd):
470
 
    property = "ApprovedByDefault"
 
489
    propname = "ApprovedByDefault"
471
490
    value_to_set = dbus.Boolean(False)
472
491
 
473
492
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
 
    property = "Checker"
 
493
    propname = "Checker"
475
494
 
476
495
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
 
    property = "Host"
 
496
    propname = "Host"
478
497
 
479
498
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
 
499
    propname = "Secret"
480
500
    @property
481
501
    def value_to_set(self):
482
502
        return self._vts
485
505
        """When setting, read data from supplied file object"""
486
506
        self._vts = value.read()
487
507
        value.close()
488
 
    property = "Secret"
489
508
 
490
509
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
491
 
    property = "Timeout"
 
510
    propname = "Timeout"
492
511
 
493
512
class SetExtendedTimeoutCmd(PropertyCmd,
494
513
                            MillisecondsValueArgumentMixIn):
495
 
    property = "ExtendedTimeout"
 
514
    propname = "ExtendedTimeout"
496
515
 
497
516
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
 
    property = "Interval"
 
517
    propname = "Interval"
499
518
 
500
519
class SetApprovalDelayCmd(PropertyCmd,
501
520
                          MillisecondsValueArgumentMixIn):
502
 
    property = "ApprovalDelay"
 
521
    propname = "ApprovalDelay"
503
522
 
504
523
class SetApprovalDurationCmd(PropertyCmd,
505
524
                             MillisecondsValueArgumentMixIn):
506
 
    property = "ApprovalDuration"
507
 
 
508
 
def has_actions(options):
509
 
    return any((options.enable,
510
 
                options.disable,
511
 
                options.bump_timeout,
512
 
                options.start_checker,
513
 
                options.stop_checker,
514
 
                options.is_enabled,
515
 
                options.remove,
516
 
                options.checker is not None,
517
 
                options.timeout is not None,
518
 
                options.extended_timeout is not None,
519
 
                options.interval is not None,
520
 
                options.approved_by_default is not None,
521
 
                options.approval_delay is not None,
522
 
                options.approval_duration is not None,
523
 
                options.host is not None,
524
 
                options.secret is not None,
525
 
                options.approve,
526
 
                options.deny))
 
525
    propname = "ApprovalDuration"
527
526
 
528
527
def add_command_line_options(parser):
529
528
    parser.add_argument("--version", action="version",
556
555
                        help="Remove client")
557
556
    parser.add_argument("-c", "--checker",
558
557
                        help="Set checker command for client")
559
 
    parser.add_argument("-t", "--timeout",
 
558
    parser.add_argument("-t", "--timeout", type=string_to_delta,
560
559
                        help="Set timeout for client")
561
 
    parser.add_argument("--extended-timeout",
 
560
    parser.add_argument("--extended-timeout", type=string_to_delta,
562
561
                        help="Set extended timeout for client")
563
 
    parser.add_argument("-i", "--interval",
 
562
    parser.add_argument("-i", "--interval", type=string_to_delta,
564
563
                        help="Set checker interval for client")
565
564
    approve_deny_default = parser.add_mutually_exclusive_group()
566
565
    approve_deny_default.add_argument(
571
570
        "--deny-by-default", action="store_false",
572
571
        dest="approved_by_default",
573
572
        help="Set client to be denied by default")
574
 
    parser.add_argument("--approval-delay",
 
573
    parser.add_argument("--approval-delay", type=string_to_delta,
575
574
                        help="Set delay before client approve/deny")
576
 
    parser.add_argument("--approval-duration",
 
575
    parser.add_argument("--approval-duration", type=string_to_delta,
577
576
                        help="Set duration of one client approval")
578
577
    parser.add_argument("-H", "--host", help="Set host for client")
579
578
    parser.add_argument("-s", "--secret",
585
584
        help="Approve any current client request")
586
585
    approve_deny.add_argument("-D", "--deny", action="store_true",
587
586
                              help="Deny any current client request")
 
587
    parser.add_argument("--debug", action="store_true",
 
588
                        help="Debug mode (show D-Bus commands)")
588
589
    parser.add_argument("--check", action="store_true",
589
590
                        help="Run self-test")
590
591
    parser.add_argument("client", nargs="*", help="Client name")
615
616
    if options.is_enabled:
616
617
        commands.append(IsEnabledCmd())
617
618
 
618
 
    if options.remove:
619
 
        commands.append(RemoveCmd())
620
 
 
621
619
    if options.checker is not None:
622
620
        commands.append(SetCheckerCmd(options.checker))
623
621
 
656
654
    if options.deny:
657
655
        commands.append(DenyCmd())
658
656
 
 
657
    if options.remove:
 
658
        commands.append(RemoveCmd())
 
659
 
659
660
    # If no command option has been given, show table of clients,
660
661
    # optionally verbosely
661
662
    if not commands:
664
665
    return commands
665
666
 
666
667
 
667
 
def main():
668
 
    parser = argparse.ArgumentParser()
669
 
 
670
 
    add_command_line_options(parser)
671
 
 
672
 
    options = parser.parse_args()
 
668
def check_option_syntax(parser, options):
 
669
    """Apply additional restrictions on options, not expressible in
 
670
argparse"""
 
671
 
 
672
    def has_actions(options):
 
673
        return any((options.enable,
 
674
                    options.disable,
 
675
                    options.bump_timeout,
 
676
                    options.start_checker,
 
677
                    options.stop_checker,
 
678
                    options.is_enabled,
 
679
                    options.remove,
 
680
                    options.checker is not None,
 
681
                    options.timeout is not None,
 
682
                    options.extended_timeout is not None,
 
683
                    options.interval is not None,
 
684
                    options.approved_by_default is not None,
 
685
                    options.approval_delay is not None,
 
686
                    options.approval_duration is not None,
 
687
                    options.host is not None,
 
688
                    options.secret is not None,
 
689
                    options.approve,
 
690
                    options.deny))
673
691
 
674
692
    if has_actions(options) and not (options.client or options.all):
675
693
        parser.error("Options require clients names or --all.")
682
700
        parser.error("--all requires an action.")
683
701
    if options.is_enabled and len(options.client) > 1:
684
702
        parser.error("--is-enabled requires exactly one client")
 
703
    if options.remove:
 
704
        options.remove = False
 
705
        if has_actions(options) and not options.deny:
 
706
            parser.error("--remove can only be combined with --deny")
 
707
        options.remove = True
 
708
 
 
709
 
 
710
def main():
 
711
    parser = argparse.ArgumentParser()
 
712
 
 
713
    add_command_line_options(parser)
 
714
 
 
715
    options = parser.parse_args()
 
716
 
 
717
    check_option_syntax(parser, options)
685
718
 
686
719
    clientnames = options.client
687
720
 
 
721
    if options.debug:
 
722
        log.setLevel(logging.DEBUG)
 
723
 
688
724
    try:
689
725
        bus = dbus.SystemBus()
 
726
        log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
 
727
                  server_path)
690
728
        mandos_dbus_objc = bus.get_object(busname, server_path)
691
729
    except dbus.exceptions.DBusException:
692
730
        log.critical("Could not connect to Mandos server")
705
743
    dbus_filter = NullFilter()
706
744
    try:
707
745
        dbus_logger.addFilter(dbus_filter)
 
746
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
 
747
                  server_path, dbus.OBJECT_MANAGER_IFACE)
708
748
        mandos_clients = {path: ifs_and_props[client_interface]
709
749
                          for path, ifs_and_props in
710
750
                          mandos_serv_object_manager
722
762
    clients = {}
723
763
 
724
764
    if not clientnames:
725
 
        clients = {bus.get_object(busname, path): properties
 
765
        clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
 
766
                              busname, str(path)) and False) or
 
767
                   bus.get_object(busname, path): properties
726
768
                   for path, properties in mandos_clients.items()}
727
769
    else:
728
770
        for name in clientnames:
729
771
            for path, client in mandos_clients.items():
730
772
                if client["Name"] == name:
 
773
                    log.debug("D-Bus: Connect to: (name=%r, path=%r)",
 
774
                              busname, str(path))
731
775
                    client_objc = bus.get_object(busname, path)
732
776
                    clients[client_objc] = client
733
777
                    break
797
841
                self.attributes = attributes
798
842
                self.attributes["Name"] = name
799
843
                self.calls = []
800
 
            def Set(self, interface, property, value, dbus_interface):
801
 
                testcase.assertEqual(interface, client_interface)
802
 
                testcase.assertEqual(dbus_interface,
803
 
                                     dbus.PROPERTIES_IFACE)
804
 
                self.attributes[property] = value
805
 
            def Get(self, interface, property, dbus_interface):
806
 
                testcase.assertEqual(interface, client_interface)
807
 
                testcase.assertEqual(dbus_interface,
808
 
                                     dbus.PROPERTIES_IFACE)
809
 
                return self.attributes[property]
 
844
            def Set(self, interface, propname, value, dbus_interface):
 
845
                testcase.assertEqual(interface, client_interface)
 
846
                testcase.assertEqual(dbus_interface,
 
847
                                     dbus.PROPERTIES_IFACE)
 
848
                self.attributes[propname] = value
 
849
            def Get(self, interface, propname, dbus_interface):
 
850
                testcase.assertEqual(interface, client_interface)
 
851
                testcase.assertEqual(dbus_interface,
 
852
                                     dbus.PROPERTIES_IFACE)
 
853
                return self.attributes[propname]
810
854
            def Approve(self, approve, dbus_interface):
811
855
                testcase.assertEqual(dbus_interface, client_interface)
812
856
                self.calls.append(("Approve", (approve,
868
912
 
869
913
class TestPrintTableCmd(TestCmd):
870
914
    def test_normal(self):
871
 
        output = PrintTableCmd().output(self.clients)
 
915
        output = PrintTableCmd().output(self.clients.values())
872
916
        expected_output = """
873
917
Name   Enabled Timeout  Last Successful Check
874
918
foo    Yes     00:05:00 2019-02-03T00:00:00  
876
920
"""[1:-1]
877
921
        self.assertEqual(output, expected_output)
878
922
    def test_verbose(self):
879
 
        output = PrintTableCmd(verbose=True).output(self.clients)
 
923
        output = PrintTableCmd(verbose=True).output(
 
924
            self.clients.values())
880
925
        expected_output = """
881
926
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
882
927
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
884
929
"""[1:-1]
885
930
        self.assertEqual(output, expected_output)
886
931
    def test_one_client(self):
887
 
        output = PrintTableCmd().output(self.one_client)
 
932
        output = PrintTableCmd().output(self.one_client.values())
888
933
        expected_output = """
889
934
Name Enabled Timeout  Last Successful Check
890
935
foo  Yes     00:05:00 2019-02-03T00:00:00  
1034
1079
        for value_to_set, value_to_get in zip(self.values_to_set,
1035
1080
                                              values_to_get):
1036
1081
            for client in self.clients:
1037
 
                old_value = client.attributes[self.property]
 
1082
                old_value = client.attributes[self.propname]
1038
1083
                self.assertNotIsInstance(old_value, Unique)
1039
 
                client.attributes[self.property] = Unique()
 
1084
                client.attributes[self.propname] = Unique()
1040
1085
            self.run_command(value_to_set, self.clients)
1041
1086
            for client in self.clients:
1042
 
                value = client.attributes[self.property]
 
1087
                value = client.attributes[self.propname]
1043
1088
                self.assertNotIsInstance(value, Unique)
1044
1089
                self.assertEqual(value, value_to_get)
1045
1090
    def run_command(self, value, clients):
1047
1092
 
1048
1093
class TestBumpTimeoutCmd(TestPropertyCmd):
1049
1094
    command = BumpTimeoutCmd
1050
 
    property = "LastCheckedOK"
 
1095
    propname = "LastCheckedOK"
1051
1096
    values_to_set = [""]
1052
1097
 
1053
1098
class TestStartCheckerCmd(TestPropertyCmd):
1054
1099
    command = StartCheckerCmd
1055
 
    property = "CheckerRunning"
 
1100
    propname = "CheckerRunning"
1056
1101
    values_to_set = [dbus.Boolean(True)]
1057
1102
 
1058
1103
class TestStopCheckerCmd(TestPropertyCmd):
1059
1104
    command = StopCheckerCmd
1060
 
    property = "CheckerRunning"
 
1105
    propname = "CheckerRunning"
1061
1106
    values_to_set = [dbus.Boolean(False)]
1062
1107
 
1063
1108
class TestApproveByDefaultCmd(TestPropertyCmd):
1064
1109
    command = ApproveByDefaultCmd
1065
 
    property = "ApprovedByDefault"
 
1110
    propname = "ApprovedByDefault"
1066
1111
    values_to_set = [dbus.Boolean(True)]
1067
1112
 
1068
1113
class TestDenyByDefaultCmd(TestPropertyCmd):
1069
1114
    command = DenyByDefaultCmd
1070
 
    property = "ApprovedByDefault"
 
1115
    propname = "ApprovedByDefault"
1071
1116
    values_to_set = [dbus.Boolean(False)]
1072
1117
 
1073
1118
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1082
1127
 
1083
1128
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1084
1129
    command = SetCheckerCmd
1085
 
    property = "Checker"
 
1130
    propname = "Checker"
1086
1131
    values_to_set = ["", ":", "fping -q -- %s"]
1087
1132
 
1088
1133
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1089
1134
    command = SetHostCmd
1090
 
    property = "Host"
 
1135
    propname = "Host"
1091
1136
    values_to_set = ["192.0.2.3", "foo.example.org"]
1092
1137
 
1093
1138
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1094
1139
    command = SetSecretCmd
1095
 
    property = "Secret"
1096
 
    values_to_set = [open("/dev/null", "rb"),
 
1140
    propname = "Secret"
 
1141
    values_to_set = [io.BytesIO(b""),
1097
1142
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1098
1143
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
1099
1144
 
1100
1145
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1101
1146
    command = SetTimeoutCmd
1102
 
    property = "Timeout"
1103
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1147
    propname = "Timeout"
 
1148
    values_to_set = [datetime.timedelta(),
 
1149
                     datetime.timedelta(minutes=5),
 
1150
                     datetime.timedelta(seconds=1),
 
1151
                     datetime.timedelta(weeks=1),
 
1152
                     datetime.timedelta(weeks=52)]
 
1153
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1105
1154
 
1106
1155
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1107
1156
    command = SetExtendedTimeoutCmd
1108
 
    property = "ExtendedTimeout"
1109
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1157
    propname = "ExtendedTimeout"
 
1158
    values_to_set = [datetime.timedelta(),
 
1159
                     datetime.timedelta(minutes=5),
 
1160
                     datetime.timedelta(seconds=1),
 
1161
                     datetime.timedelta(weeks=1),
 
1162
                     datetime.timedelta(weeks=52)]
 
1163
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1111
1164
 
1112
1165
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1113
1166
    command = SetIntervalCmd
1114
 
    property = "Interval"
1115
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1167
    propname = "Interval"
 
1168
    values_to_set = [datetime.timedelta(),
 
1169
                     datetime.timedelta(minutes=5),
 
1170
                     datetime.timedelta(seconds=1),
 
1171
                     datetime.timedelta(weeks=1),
 
1172
                     datetime.timedelta(weeks=52)]
 
1173
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1117
1174
 
1118
1175
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1119
1176
    command = SetApprovalDelayCmd
1120
 
    property = "ApprovalDelay"
1121
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1122
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1177
    propname = "ApprovalDelay"
 
1178
    values_to_set = [datetime.timedelta(),
 
1179
                     datetime.timedelta(minutes=5),
 
1180
                     datetime.timedelta(seconds=1),
 
1181
                     datetime.timedelta(weeks=1),
 
1182
                     datetime.timedelta(weeks=52)]
 
1183
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1123
1184
 
1124
1185
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1125
1186
    command = SetApprovalDurationCmd
1126
 
    property = "ApprovalDuration"
1127
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1128
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1187
    propname = "ApprovalDuration"
 
1188
    values_to_set = [datetime.timedelta(),
 
1189
                     datetime.timedelta(minutes=5),
 
1190
                     datetime.timedelta(seconds=1),
 
1191
                     datetime.timedelta(weeks=1),
 
1192
                     datetime.timedelta(weeks=52)]
 
1193
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1129
1194
 
1130
1195
class Test_command_from_options(unittest.TestCase):
1131
1196
    def setUp(self):
1135
1200
        """Assert that parsing ARGS should result in an instance of
1136
1201
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1137
1202
        options = self.parser.parse_args(args)
 
1203
        check_option_syntax(self.parser, options)
1138
1204
        commands = commands_from_options(options)
1139
1205
        self.assertEqual(len(commands), 1)
1140
1206
        command = commands[0]
1149
1215
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1150
1216
                                      verbose=True)
1151
1217
 
 
1218
    def test_print_table_verbose_short(self):
 
1219
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1220
                                      verbose=True)
 
1221
 
1152
1222
    def test_enable(self):
1153
1223
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1154
1224
 
 
1225
    def test_enable_short(self):
 
1226
        self.assert_command_from_args(["-e", "foo"], EnableCmd)
 
1227
 
1155
1228
    def test_disable(self):
1156
1229
        self.assert_command_from_args(["--disable", "foo"],
1157
1230
                                      DisableCmd)
1158
1231
 
 
1232
    def test_disable_short(self):
 
1233
        self.assert_command_from_args(["-d", "foo"], DisableCmd)
 
1234
 
1159
1235
    def test_bump_timeout(self):
1160
1236
        self.assert_command_from_args(["--bump-timeout", "foo"],
1161
1237
                                      BumpTimeoutCmd)
1162
1238
 
 
1239
    def test_bump_timeout_short(self):
 
1240
        self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
 
1241
 
1163
1242
    def test_start_checker(self):
1164
1243
        self.assert_command_from_args(["--start-checker", "foo"],
1165
1244
                                      StartCheckerCmd)
1172
1251
        self.assert_command_from_args(["--remove", "foo"],
1173
1252
                                      RemoveCmd)
1174
1253
 
 
1254
    def test_remove_short(self):
 
1255
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
 
1256
 
1175
1257
    def test_checker(self):
1176
1258
        self.assert_command_from_args(["--checker", ":", "foo"],
1177
1259
                                      SetCheckerCmd, value_to_set=":")
1178
1260
 
 
1261
    def test_checker_empty(self):
 
1262
        self.assert_command_from_args(["--checker", "", "foo"],
 
1263
                                      SetCheckerCmd, value_to_set="")
 
1264
 
 
1265
    def test_checker_short(self):
 
1266
        self.assert_command_from_args(["-c", ":", "foo"],
 
1267
                                      SetCheckerCmd, value_to_set=":")
 
1268
 
1179
1269
    def test_timeout(self):
1180
1270
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1181
1271
                                      SetTimeoutCmd,
1182
1272
                                      value_to_set=300000)
1183
1273
 
 
1274
    def test_timeout_short(self):
 
1275
        self.assert_command_from_args(["-t", "PT5M", "foo"],
 
1276
                                      SetTimeoutCmd,
 
1277
                                      value_to_set=300000)
 
1278
 
1184
1279
    def test_extended_timeout(self):
1185
1280
        self.assert_command_from_args(["--extended-timeout", "PT15M",
1186
1281
                                       "foo"],
1192
1287
                                      SetIntervalCmd,
1193
1288
                                      value_to_set=120000)
1194
1289
 
 
1290
    def test_interval_short(self):
 
1291
        self.assert_command_from_args(["-i", "PT2M", "foo"],
 
1292
                                      SetIntervalCmd,
 
1293
                                      value_to_set=120000)
 
1294
 
1195
1295
    def test_approve_by_default(self):
1196
1296
        self.assert_command_from_args(["--approve-by-default", "foo"],
1197
1297
                                      ApproveByDefaultCmd)
1215
1315
                                       "foo"], SetHostCmd,
1216
1316
                                      value_to_set="foo.example.org")
1217
1317
 
 
1318
    def test_host_short(self):
 
1319
        self.assert_command_from_args(["-H", "foo.example.org",
 
1320
                                       "foo"], SetHostCmd,
 
1321
                                      value_to_set="foo.example.org")
 
1322
 
1218
1323
    def test_secret_devnull(self):
1219
1324
        self.assert_command_from_args(["--secret", os.path.devnull,
1220
1325
                                       "foo"], SetSecretCmd,
1229
1334
                                           "foo"], SetSecretCmd,
1230
1335
                                          value_to_set=value)
1231
1336
 
 
1337
    def test_secret_devnull_short(self):
 
1338
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1339
                                      SetSecretCmd, value_to_set=b"")
 
1340
 
 
1341
    def test_secret_tempfile_short(self):
 
1342
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1343
            value = b"secret\0xyzzy\nbar"
 
1344
            f.write(value)
 
1345
            f.seek(0)
 
1346
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1347
                                          SetSecretCmd,
 
1348
                                          value_to_set=value)
 
1349
 
1232
1350
    def test_approve(self):
1233
1351
        self.assert_command_from_args(["--approve", "foo"],
1234
1352
                                      ApproveCmd)
1235
1353
 
 
1354
    def test_approve_short(self):
 
1355
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1356
 
1236
1357
    def test_deny(self):
1237
1358
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1238
1359
 
 
1360
    def test_deny_short(self):
 
1361
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1362
 
1239
1363
    def test_dump_json(self):
1240
1364
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1241
1365
 
1243
1367
        self.assert_command_from_args(["--is-enabled", "foo"],
1244
1368
                                      IsEnabledCmd)
1245
1369
 
 
1370
    def test_is_enabled_short(self):
 
1371
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1372
 
 
1373
    def test_deny_before_remove(self):
 
1374
        options = self.parser.parse_args(["--deny", "--remove", "foo"])
 
1375
        check_option_syntax(self.parser, options)
 
1376
        commands = commands_from_options(options)
 
1377
        self.assertEqual(len(commands), 2)
 
1378
        self.assertIsInstance(commands[0], DenyCmd)
 
1379
        self.assertIsInstance(commands[1], RemoveCmd)
 
1380
 
 
1381
    def test_deny_before_remove_reversed(self):
 
1382
        options = self.parser.parse_args(["--remove", "--deny", "--all"])
 
1383
        check_option_syntax(self.parser, options)
 
1384
        commands = commands_from_options(options)
 
1385
        self.assertEqual(len(commands), 2)
 
1386
        self.assertIsInstance(commands[0], DenyCmd)
 
1387
        self.assertIsInstance(commands[1], RemoveCmd)
 
1388
 
 
1389
 
 
1390
class Test_check_option_syntax(unittest.TestCase):
 
1391
    # This mostly corresponds to the definition from has_actions() in
 
1392
    # check_option_syntax()
 
1393
    actions = {
 
1394
        # The actual values set here are not that important, but we do
 
1395
        # at least stick to the correct types, even though they are
 
1396
        # never used
 
1397
        "enable": True,
 
1398
        "disable": True,
 
1399
        "bump_timeout": True,
 
1400
        "start_checker": True,
 
1401
        "stop_checker": True,
 
1402
        "is_enabled": True,
 
1403
        "remove": True,
 
1404
        "checker": "x",
 
1405
        "timeout": datetime.timedelta(),
 
1406
        "extended_timeout": datetime.timedelta(),
 
1407
        "interval": datetime.timedelta(),
 
1408
        "approved_by_default": True,
 
1409
        "approval_delay": datetime.timedelta(),
 
1410
        "approval_duration": datetime.timedelta(),
 
1411
        "host": "x",
 
1412
        "secret": io.BytesIO(b"x"),
 
1413
        "approve": True,
 
1414
        "deny": True,
 
1415
    }
 
1416
 
 
1417
    def setUp(self):
 
1418
        self.parser = argparse.ArgumentParser()
 
1419
        add_command_line_options(self.parser)
 
1420
 
 
1421
    @contextlib.contextmanager
 
1422
    def assertParseError(self):
 
1423
        with self.assertRaises(SystemExit) as e:
 
1424
            with self.temporarily_suppress_stderr():
 
1425
                yield
 
1426
        # Exit code from argparse is guaranteed to be "2".  Reference:
 
1427
        # https://docs.python.org/3/library/argparse.html#exiting-methods
 
1428
        self.assertEqual(e.exception.code, 2)
 
1429
 
 
1430
    @staticmethod
 
1431
    @contextlib.contextmanager
 
1432
    def temporarily_suppress_stderr():
 
1433
        null = os.open(os.path.devnull, os.O_RDWR)
 
1434
        stderrcopy = os.dup(sys.stderr.fileno())
 
1435
        os.dup2(null, sys.stderr.fileno())
 
1436
        os.close(null)
 
1437
        try:
 
1438
            yield
 
1439
        finally:
 
1440
            # restore stderr
 
1441
            os.dup2(stderrcopy, sys.stderr.fileno())
 
1442
            os.close(stderrcopy)
 
1443
 
 
1444
    def check_option_syntax(self, options):
 
1445
        check_option_syntax(self.parser, options)
 
1446
 
 
1447
    def test_actions_requires_client_or_all(self):
 
1448
        for action, value in self.actions.items():
 
1449
            options = self.parser.parse_args()
 
1450
            setattr(options, action, value)
 
1451
            with self.assertParseError():
 
1452
                self.check_option_syntax(options)
 
1453
 
 
1454
    def test_actions_conflicts_with_verbose(self):
 
1455
        for action, value in self.actions.items():
 
1456
            options = self.parser.parse_args()
 
1457
            setattr(options, action, value)
 
1458
            options.verbose = True
 
1459
            with self.assertParseError():
 
1460
                self.check_option_syntax(options)
 
1461
 
 
1462
    def test_dump_json_conflicts_with_verbose(self):
 
1463
        options = self.parser.parse_args()
 
1464
        options.dump_json = True
 
1465
        options.verbose = True
 
1466
        with self.assertParseError():
 
1467
            self.check_option_syntax(options)
 
1468
 
 
1469
    def test_dump_json_conflicts_with_action(self):
 
1470
        for action, value in self.actions.items():
 
1471
            options = self.parser.parse_args()
 
1472
            setattr(options, action, value)
 
1473
            options.dump_json = True
 
1474
            with self.assertParseError():
 
1475
                self.check_option_syntax(options)
 
1476
 
 
1477
    def test_all_can_not_be_alone(self):
 
1478
        options = self.parser.parse_args()
 
1479
        options.all = True
 
1480
        with self.assertParseError():
 
1481
            self.check_option_syntax(options)
 
1482
 
 
1483
    def test_all_is_ok_with_any_action(self):
 
1484
        for action, value in self.actions.items():
 
1485
            options = self.parser.parse_args()
 
1486
            setattr(options, action, value)
 
1487
            options.all = True
 
1488
            self.check_option_syntax(options)
 
1489
 
 
1490
    def test_is_enabled_fails_without_client(self):
 
1491
        options = self.parser.parse_args()
 
1492
        options.is_enabled = True
 
1493
        with self.assertParseError():
 
1494
            self.check_option_syntax(options)
 
1495
 
 
1496
    def test_is_enabled_works_with_one_client(self):
 
1497
        options = self.parser.parse_args()
 
1498
        options.is_enabled = True
 
1499
        options.client = ["foo"]
 
1500
        self.check_option_syntax(options)
 
1501
 
 
1502
    def test_is_enabled_fails_with_two_clients(self):
 
1503
        options = self.parser.parse_args()
 
1504
        options.is_enabled = True
 
1505
        options.client = ["foo", "barbar"]
 
1506
        with self.assertParseError():
 
1507
            self.check_option_syntax(options)
 
1508
 
 
1509
    def test_remove_can_only_be_combined_with_action_deny(self):
 
1510
        for action, value in self.actions.items():
 
1511
            if action in {"remove", "deny"}:
 
1512
                continue
 
1513
            options = self.parser.parse_args()
 
1514
            setattr(options, action, value)
 
1515
            options.all = True
 
1516
            options.remove = True
 
1517
            with self.assertParseError():
 
1518
                self.check_option_syntax(options)
 
1519
 
1246
1520
 
1247
1521
 
1248
1522
def should_only_run_tests():