/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-09 23:55:43 UTC
  • Revision ID: teddy@recompile.se-20190309235543-g36rsz4ephch356t
mandos-ctl: Don't call D-Bus to get property for --is-enabled

* mandos-ctl (IsEnabledCmd.is_enabled): Check properties dict, don't
                                        use D-Bus.

Show diffs side-by-side

added added

removed removed

Lines of Context:
44
44
import logging
45
45
import io
46
46
import tempfile
 
47
import contextlib
47
48
 
48
49
import dbus
49
50
 
293
294
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
294
295
                    "Expires", "LastCheckerStatus")
295
296
    def run(self, mandos, clients):
296
 
        print(self.output(clients))
 
297
        print(self.output(clients.values()))
 
298
    def output(self, clients):
 
299
        raise NotImplementedError()
297
300
 
298
301
class PropertyCmd(Command):
299
302
    """Abstract class for Actions for setting one client property"""
300
303
    def run_on_one_client(self, client, properties):
301
304
        """Set the Client's D-Bus property"""
302
 
        client.Set(client_interface, self.property, self.value_to_set,
 
305
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
306
                  client.__dbus_object_path__,
 
307
                  dbus.PROPERTIES_IFACE, client_interface,
 
308
                  self.propname, self.value_to_set
 
309
                  if not isinstance(self.value_to_set, dbus.Boolean)
 
310
                  else bool(self.value_to_set))
 
311
        client.Set(client_interface, self.propname, self.value_to_set,
303
312
                   dbus_interface=dbus.PROPERTIES_IFACE)
 
313
    @property
 
314
    def propname(self):
 
315
        raise NotImplementedError()
304
316
 
305
317
class ValueArgumentMixIn(object):
306
318
    """Mixin class for commands taking a value as argument"""
329
341
        keywords = default_keywords
330
342
        if self.verbose:
331
343
            keywords = self.all_keywords
332
 
        return str(self.TableOfClients(clients.values(), keywords))
 
344
        return str(self.TableOfClients(clients, keywords))
333
345
 
334
346
    class TableOfClients(object):
335
347
        tableheaders = {
426
438
            sys.exit(0)
427
439
        sys.exit(1)
428
440
    def is_enabled(self, client, properties):
429
 
        return bool(properties["Enabled"])
 
441
        return properties["Enabled"]
430
442
 
431
443
class RemoveCmd(Command):
432
444
    def run_on_one_client(self, client, properties):
 
445
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
 
446
                  server_path, server_interface,
 
447
                  str(client.__dbus_object_path__))
433
448
        self.mandos.RemoveClient(client.__dbus_object_path__)
434
449
 
435
450
class ApproveCmd(Command):
436
451
    def run_on_one_client(self, client, properties):
 
452
        log.debug("D-Bus: %s:%s.Approve(True)",
 
453
                  client.__dbus_object_path__, client_interface)
437
454
        client.Approve(dbus.Boolean(True),
438
455
                       dbus_interface=client_interface)
439
456
 
440
457
class DenyCmd(Command):
441
458
    def run_on_one_client(self, client, properties):
 
459
        log.debug("D-Bus: %s:%s.Approve(False)",
 
460
                  client.__dbus_object_path__, client_interface)
442
461
        client.Approve(dbus.Boolean(False),
443
462
                       dbus_interface=client_interface)
444
463
 
445
464
class EnableCmd(PropertyCmd):
446
 
    property = "Enabled"
 
465
    propname = "Enabled"
447
466
    value_to_set = dbus.Boolean(True)
448
467
 
449
468
class DisableCmd(PropertyCmd):
450
 
    property = "Enabled"
 
469
    propname = "Enabled"
451
470
    value_to_set = dbus.Boolean(False)
452
471
 
453
472
class BumpTimeoutCmd(PropertyCmd):
454
 
    property = "LastCheckedOK"
 
473
    propname = "LastCheckedOK"
455
474
    value_to_set = ""
456
475
 
457
476
class StartCheckerCmd(PropertyCmd):
458
 
    property = "CheckerRunning"
 
477
    propname = "CheckerRunning"
459
478
    value_to_set = dbus.Boolean(True)
460
479
 
461
480
class StopCheckerCmd(PropertyCmd):
462
 
    property = "CheckerRunning"
 
481
    propname = "CheckerRunning"
463
482
    value_to_set = dbus.Boolean(False)
464
483
 
465
484
class ApproveByDefaultCmd(PropertyCmd):
466
 
    property = "ApprovedByDefault"
 
485
    propname = "ApprovedByDefault"
467
486
    value_to_set = dbus.Boolean(True)
468
487
 
469
488
class DenyByDefaultCmd(PropertyCmd):
470
 
    property = "ApprovedByDefault"
 
489
    propname = "ApprovedByDefault"
471
490
    value_to_set = dbus.Boolean(False)
472
491
 
473
492
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
 
    property = "Checker"
 
493
    propname = "Checker"
475
494
 
476
495
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
 
    property = "Host"
 
496
    propname = "Host"
478
497
 
479
498
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
 
499
    propname = "Secret"
480
500
    @property
481
501
    def value_to_set(self):
482
502
        return self._vts
485
505
        """When setting, read data from supplied file object"""
486
506
        self._vts = value.read()
487
507
        value.close()
488
 
    property = "Secret"
489
508
 
490
509
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
491
 
    property = "Timeout"
 
510
    propname = "Timeout"
492
511
 
493
512
class SetExtendedTimeoutCmd(PropertyCmd,
494
513
                            MillisecondsValueArgumentMixIn):
495
 
    property = "ExtendedTimeout"
 
514
    propname = "ExtendedTimeout"
496
515
 
497
516
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
 
    property = "Interval"
 
517
    propname = "Interval"
499
518
 
500
519
class SetApprovalDelayCmd(PropertyCmd,
501
520
                          MillisecondsValueArgumentMixIn):
502
 
    property = "ApprovalDelay"
 
521
    propname = "ApprovalDelay"
503
522
 
504
523
class SetApprovalDurationCmd(PropertyCmd,
505
524
                             MillisecondsValueArgumentMixIn):
506
 
    property = "ApprovalDuration"
 
525
    propname = "ApprovalDuration"
507
526
 
508
527
def add_command_line_options(parser):
509
528
    parser.add_argument("--version", action="version",
565
584
        help="Approve any current client request")
566
585
    approve_deny.add_argument("-D", "--deny", action="store_true",
567
586
                              help="Deny any current client request")
 
587
    parser.add_argument("--debug", action="store_true",
 
588
                        help="Debug mode (show D-Bus commands)")
568
589
    parser.add_argument("--check", action="store_true",
569
590
                        help="Run self-test")
570
591
    parser.add_argument("client", nargs="*", help="Client name")
595
616
    if options.is_enabled:
596
617
        commands.append(IsEnabledCmd())
597
618
 
598
 
    if options.remove:
599
 
        commands.append(RemoveCmd())
600
 
 
601
619
    if options.checker is not None:
602
620
        commands.append(SetCheckerCmd(options.checker))
603
621
 
636
654
    if options.deny:
637
655
        commands.append(DenyCmd())
638
656
 
 
657
    if options.remove:
 
658
        commands.append(RemoveCmd())
 
659
 
639
660
    # If no command option has been given, show table of clients,
640
661
    # optionally verbosely
641
662
    if not commands:
644
665
    return commands
645
666
 
646
667
 
647
 
def main():
648
 
    parser = argparse.ArgumentParser()
649
 
 
650
 
    add_command_line_options(parser)
651
 
 
652
 
    options = parser.parse_args()
 
668
def check_option_syntax(parser, options):
 
669
    """Apply additional restrictions on options, not expressible in
 
670
argparse"""
653
671
 
654
672
    def has_actions(options):
655
673
        return any((options.enable,
682
700
        parser.error("--all requires an action.")
683
701
    if options.is_enabled and len(options.client) > 1:
684
702
        parser.error("--is-enabled requires exactly one client")
 
703
    if options.remove:
 
704
        options.remove = False
 
705
        if has_actions(options) and not options.deny:
 
706
            parser.error("--remove can only be combined with --deny")
 
707
        options.remove = True
 
708
 
 
709
 
 
710
def main():
 
711
    parser = argparse.ArgumentParser()
 
712
 
 
713
    add_command_line_options(parser)
 
714
 
 
715
    options = parser.parse_args()
 
716
 
 
717
    check_option_syntax(parser, options)
685
718
 
686
719
    clientnames = options.client
687
720
 
 
721
    if options.debug:
 
722
        log.setLevel(logging.DEBUG)
 
723
 
688
724
    try:
689
725
        bus = dbus.SystemBus()
 
726
        log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
 
727
                  server_path)
690
728
        mandos_dbus_objc = bus.get_object(busname, server_path)
691
729
    except dbus.exceptions.DBusException:
692
730
        log.critical("Could not connect to Mandos server")
705
743
    dbus_filter = NullFilter()
706
744
    try:
707
745
        dbus_logger.addFilter(dbus_filter)
 
746
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
 
747
                  server_path, dbus.OBJECT_MANAGER_IFACE)
708
748
        mandos_clients = {path: ifs_and_props[client_interface]
709
749
                          for path, ifs_and_props in
710
750
                          mandos_serv_object_manager
722
762
    clients = {}
723
763
 
724
764
    if not clientnames:
725
 
        clients = {bus.get_object(busname, path): properties
 
765
        clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
 
766
                              busname, str(path)) and False) or
 
767
                   bus.get_object(busname, path): properties
726
768
                   for path, properties in mandos_clients.items()}
727
769
    else:
728
770
        for name in clientnames:
729
771
            for path, client in mandos_clients.items():
730
772
                if client["Name"] == name:
 
773
                    log.debug("D-Bus: Connect to: (name=%r, path=%r)",
 
774
                              busname, str(path))
731
775
                    client_objc = bus.get_object(busname, path)
732
776
                    clients[client_objc] = client
733
777
                    break
797
841
                self.attributes = attributes
798
842
                self.attributes["Name"] = name
799
843
                self.calls = []
800
 
            def Set(self, interface, property, value, dbus_interface):
801
 
                testcase.assertEqual(interface, client_interface)
802
 
                testcase.assertEqual(dbus_interface,
803
 
                                     dbus.PROPERTIES_IFACE)
804
 
                self.attributes[property] = value
805
 
            def Get(self, interface, property, dbus_interface):
806
 
                testcase.assertEqual(interface, client_interface)
807
 
                testcase.assertEqual(dbus_interface,
808
 
                                     dbus.PROPERTIES_IFACE)
809
 
                return self.attributes[property]
 
844
            def Set(self, interface, propname, value, dbus_interface):
 
845
                testcase.assertEqual(interface, client_interface)
 
846
                testcase.assertEqual(dbus_interface,
 
847
                                     dbus.PROPERTIES_IFACE)
 
848
                self.attributes[propname] = value
 
849
            def Get(self, interface, propname, dbus_interface):
 
850
                testcase.assertEqual(interface, client_interface)
 
851
                testcase.assertEqual(dbus_interface,
 
852
                                     dbus.PROPERTIES_IFACE)
 
853
                return self.attributes[propname]
810
854
            def Approve(self, approve, dbus_interface):
811
855
                testcase.assertEqual(dbus_interface, client_interface)
812
856
                self.calls.append(("Approve", (approve,
868
912
 
869
913
class TestPrintTableCmd(TestCmd):
870
914
    def test_normal(self):
871
 
        output = PrintTableCmd().output(self.clients)
 
915
        output = PrintTableCmd().output(self.clients.values())
872
916
        expected_output = """
873
917
Name   Enabled Timeout  Last Successful Check
874
918
foo    Yes     00:05:00 2019-02-03T00:00:00  
876
920
"""[1:-1]
877
921
        self.assertEqual(output, expected_output)
878
922
    def test_verbose(self):
879
 
        output = PrintTableCmd(verbose=True).output(self.clients)
 
923
        output = PrintTableCmd(verbose=True).output(
 
924
            self.clients.values())
880
925
        expected_output = """
881
926
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
882
927
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
884
929
"""[1:-1]
885
930
        self.assertEqual(output, expected_output)
886
931
    def test_one_client(self):
887
 
        output = PrintTableCmd().output(self.one_client)
 
932
        output = PrintTableCmd().output(self.one_client.values())
888
933
        expected_output = """
889
934
Name Enabled Timeout  Last Successful Check
890
935
foo  Yes     00:05:00 2019-02-03T00:00:00  
1034
1079
        for value_to_set, value_to_get in zip(self.values_to_set,
1035
1080
                                              values_to_get):
1036
1081
            for client in self.clients:
1037
 
                old_value = client.attributes[self.property]
 
1082
                old_value = client.attributes[self.propname]
1038
1083
                self.assertNotIsInstance(old_value, Unique)
1039
 
                client.attributes[self.property] = Unique()
 
1084
                client.attributes[self.propname] = Unique()
1040
1085
            self.run_command(value_to_set, self.clients)
1041
1086
            for client in self.clients:
1042
 
                value = client.attributes[self.property]
 
1087
                value = client.attributes[self.propname]
1043
1088
                self.assertNotIsInstance(value, Unique)
1044
1089
                self.assertEqual(value, value_to_get)
1045
1090
    def run_command(self, value, clients):
1047
1092
 
1048
1093
class TestBumpTimeoutCmd(TestPropertyCmd):
1049
1094
    command = BumpTimeoutCmd
1050
 
    property = "LastCheckedOK"
 
1095
    propname = "LastCheckedOK"
1051
1096
    values_to_set = [""]
1052
1097
 
1053
1098
class TestStartCheckerCmd(TestPropertyCmd):
1054
1099
    command = StartCheckerCmd
1055
 
    property = "CheckerRunning"
 
1100
    propname = "CheckerRunning"
1056
1101
    values_to_set = [dbus.Boolean(True)]
1057
1102
 
1058
1103
class TestStopCheckerCmd(TestPropertyCmd):
1059
1104
    command = StopCheckerCmd
1060
 
    property = "CheckerRunning"
 
1105
    propname = "CheckerRunning"
1061
1106
    values_to_set = [dbus.Boolean(False)]
1062
1107
 
1063
1108
class TestApproveByDefaultCmd(TestPropertyCmd):
1064
1109
    command = ApproveByDefaultCmd
1065
 
    property = "ApprovedByDefault"
 
1110
    propname = "ApprovedByDefault"
1066
1111
    values_to_set = [dbus.Boolean(True)]
1067
1112
 
1068
1113
class TestDenyByDefaultCmd(TestPropertyCmd):
1069
1114
    command = DenyByDefaultCmd
1070
 
    property = "ApprovedByDefault"
 
1115
    propname = "ApprovedByDefault"
1071
1116
    values_to_set = [dbus.Boolean(False)]
1072
1117
 
1073
1118
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1082
1127
 
1083
1128
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1084
1129
    command = SetCheckerCmd
1085
 
    property = "Checker"
 
1130
    propname = "Checker"
1086
1131
    values_to_set = ["", ":", "fping -q -- %s"]
1087
1132
 
1088
1133
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1089
1134
    command = SetHostCmd
1090
 
    property = "Host"
 
1135
    propname = "Host"
1091
1136
    values_to_set = ["192.0.2.3", "foo.example.org"]
1092
1137
 
1093
1138
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1094
1139
    command = SetSecretCmd
1095
 
    property = "Secret"
1096
 
    values_to_set = [open("/dev/null", "rb"),
 
1140
    propname = "Secret"
 
1141
    values_to_set = [io.BytesIO(b""),
1097
1142
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1098
1143
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
1099
1144
 
1100
1145
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1101
1146
    command = SetTimeoutCmd
1102
 
    property = "Timeout"
 
1147
    propname = "Timeout"
1103
1148
    values_to_set = [datetime.timedelta(),
1104
1149
                     datetime.timedelta(minutes=5),
1105
1150
                     datetime.timedelta(seconds=1),
1109
1154
 
1110
1155
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1111
1156
    command = SetExtendedTimeoutCmd
1112
 
    property = "ExtendedTimeout"
 
1157
    propname = "ExtendedTimeout"
1113
1158
    values_to_set = [datetime.timedelta(),
1114
1159
                     datetime.timedelta(minutes=5),
1115
1160
                     datetime.timedelta(seconds=1),
1119
1164
 
1120
1165
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1121
1166
    command = SetIntervalCmd
1122
 
    property = "Interval"
 
1167
    propname = "Interval"
1123
1168
    values_to_set = [datetime.timedelta(),
1124
1169
                     datetime.timedelta(minutes=5),
1125
1170
                     datetime.timedelta(seconds=1),
1129
1174
 
1130
1175
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1131
1176
    command = SetApprovalDelayCmd
1132
 
    property = "ApprovalDelay"
 
1177
    propname = "ApprovalDelay"
1133
1178
    values_to_set = [datetime.timedelta(),
1134
1179
                     datetime.timedelta(minutes=5),
1135
1180
                     datetime.timedelta(seconds=1),
1139
1184
 
1140
1185
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1141
1186
    command = SetApprovalDurationCmd
1142
 
    property = "ApprovalDuration"
 
1187
    propname = "ApprovalDuration"
1143
1188
    values_to_set = [datetime.timedelta(),
1144
1189
                     datetime.timedelta(minutes=5),
1145
1190
                     datetime.timedelta(seconds=1),
1155
1200
        """Assert that parsing ARGS should result in an instance of
1156
1201
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1157
1202
        options = self.parser.parse_args(args)
 
1203
        check_option_syntax(self.parser, options)
1158
1204
        commands = commands_from_options(options)
1159
1205
        self.assertEqual(len(commands), 1)
1160
1206
        command = commands[0]
1169
1215
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1170
1216
                                      verbose=True)
1171
1217
 
 
1218
    def test_print_table_verbose_short(self):
 
1219
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1220
                                      verbose=True)
 
1221
 
1172
1222
    def test_enable(self):
1173
1223
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1174
1224
 
 
1225
    def test_enable_short(self):
 
1226
        self.assert_command_from_args(["-e", "foo"], EnableCmd)
 
1227
 
1175
1228
    def test_disable(self):
1176
1229
        self.assert_command_from_args(["--disable", "foo"],
1177
1230
                                      DisableCmd)
1178
1231
 
 
1232
    def test_disable_short(self):
 
1233
        self.assert_command_from_args(["-d", "foo"], DisableCmd)
 
1234
 
1179
1235
    def test_bump_timeout(self):
1180
1236
        self.assert_command_from_args(["--bump-timeout", "foo"],
1181
1237
                                      BumpTimeoutCmd)
1182
1238
 
 
1239
    def test_bump_timeout_short(self):
 
1240
        self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
 
1241
 
1183
1242
    def test_start_checker(self):
1184
1243
        self.assert_command_from_args(["--start-checker", "foo"],
1185
1244
                                      StartCheckerCmd)
1192
1251
        self.assert_command_from_args(["--remove", "foo"],
1193
1252
                                      RemoveCmd)
1194
1253
 
 
1254
    def test_remove_short(self):
 
1255
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
 
1256
 
1195
1257
    def test_checker(self):
1196
1258
        self.assert_command_from_args(["--checker", ":", "foo"],
1197
1259
                                      SetCheckerCmd, value_to_set=":")
1200
1262
        self.assert_command_from_args(["--checker", "", "foo"],
1201
1263
                                      SetCheckerCmd, value_to_set="")
1202
1264
 
 
1265
    def test_checker_short(self):
 
1266
        self.assert_command_from_args(["-c", ":", "foo"],
 
1267
                                      SetCheckerCmd, value_to_set=":")
 
1268
 
1203
1269
    def test_timeout(self):
1204
1270
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1205
1271
                                      SetTimeoutCmd,
1206
1272
                                      value_to_set=300000)
1207
1273
 
 
1274
    def test_timeout_short(self):
 
1275
        self.assert_command_from_args(["-t", "PT5M", "foo"],
 
1276
                                      SetTimeoutCmd,
 
1277
                                      value_to_set=300000)
 
1278
 
1208
1279
    def test_extended_timeout(self):
1209
1280
        self.assert_command_from_args(["--extended-timeout", "PT15M",
1210
1281
                                       "foo"],
1216
1287
                                      SetIntervalCmd,
1217
1288
                                      value_to_set=120000)
1218
1289
 
 
1290
    def test_interval_short(self):
 
1291
        self.assert_command_from_args(["-i", "PT2M", "foo"],
 
1292
                                      SetIntervalCmd,
 
1293
                                      value_to_set=120000)
 
1294
 
1219
1295
    def test_approve_by_default(self):
1220
1296
        self.assert_command_from_args(["--approve-by-default", "foo"],
1221
1297
                                      ApproveByDefaultCmd)
1239
1315
                                       "foo"], SetHostCmd,
1240
1316
                                      value_to_set="foo.example.org")
1241
1317
 
 
1318
    def test_host_short(self):
 
1319
        self.assert_command_from_args(["-H", "foo.example.org",
 
1320
                                       "foo"], SetHostCmd,
 
1321
                                      value_to_set="foo.example.org")
 
1322
 
1242
1323
    def test_secret_devnull(self):
1243
1324
        self.assert_command_from_args(["--secret", os.path.devnull,
1244
1325
                                       "foo"], SetSecretCmd,
1253
1334
                                           "foo"], SetSecretCmd,
1254
1335
                                          value_to_set=value)
1255
1336
 
 
1337
    def test_secret_devnull_short(self):
 
1338
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1339
                                      SetSecretCmd, value_to_set=b"")
 
1340
 
 
1341
    def test_secret_tempfile_short(self):
 
1342
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1343
            value = b"secret\0xyzzy\nbar"
 
1344
            f.write(value)
 
1345
            f.seek(0)
 
1346
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1347
                                          SetSecretCmd,
 
1348
                                          value_to_set=value)
 
1349
 
1256
1350
    def test_approve(self):
1257
1351
        self.assert_command_from_args(["--approve", "foo"],
1258
1352
                                      ApproveCmd)
1259
1353
 
 
1354
    def test_approve_short(self):
 
1355
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1356
 
1260
1357
    def test_deny(self):
1261
1358
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1262
1359
 
 
1360
    def test_deny_short(self):
 
1361
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1362
 
1263
1363
    def test_dump_json(self):
1264
1364
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1265
1365
 
1267
1367
        self.assert_command_from_args(["--is-enabled", "foo"],
1268
1368
                                      IsEnabledCmd)
1269
1369
 
 
1370
    def test_is_enabled_short(self):
 
1371
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1372
 
 
1373
    def test_deny_before_remove(self):
 
1374
        options = self.parser.parse_args(["--deny", "--remove", "foo"])
 
1375
        check_option_syntax(self.parser, options)
 
1376
        commands = commands_from_options(options)
 
1377
        self.assertEqual(len(commands), 2)
 
1378
        self.assertIsInstance(commands[0], DenyCmd)
 
1379
        self.assertIsInstance(commands[1], RemoveCmd)
 
1380
 
 
1381
    def test_deny_before_remove_reversed(self):
 
1382
        options = self.parser.parse_args(["--remove", "--deny", "--all"])
 
1383
        check_option_syntax(self.parser, options)
 
1384
        commands = commands_from_options(options)
 
1385
        self.assertEqual(len(commands), 2)
 
1386
        self.assertIsInstance(commands[0], DenyCmd)
 
1387
        self.assertIsInstance(commands[1], RemoveCmd)
 
1388
 
 
1389
 
 
1390
class Test_check_option_syntax(unittest.TestCase):
 
1391
    # This mostly corresponds to the definition from has_actions() in
 
1392
    # check_option_syntax()
 
1393
    actions = {
 
1394
        # The actual values set here are not that important, but we do
 
1395
        # at least stick to the correct types, even though they are
 
1396
        # never used
 
1397
        "enable": True,
 
1398
        "disable": True,
 
1399
        "bump_timeout": True,
 
1400
        "start_checker": True,
 
1401
        "stop_checker": True,
 
1402
        "is_enabled": True,
 
1403
        "remove": True,
 
1404
        "checker": "x",
 
1405
        "timeout": datetime.timedelta(),
 
1406
        "extended_timeout": datetime.timedelta(),
 
1407
        "interval": datetime.timedelta(),
 
1408
        "approved_by_default": True,
 
1409
        "approval_delay": datetime.timedelta(),
 
1410
        "approval_duration": datetime.timedelta(),
 
1411
        "host": "x",
 
1412
        "secret": io.BytesIO(b"x"),
 
1413
        "approve": True,
 
1414
        "deny": True,
 
1415
    }
 
1416
 
 
1417
    def setUp(self):
 
1418
        self.parser = argparse.ArgumentParser()
 
1419
        add_command_line_options(self.parser)
 
1420
 
 
1421
    @contextlib.contextmanager
 
1422
    def assertParseError(self):
 
1423
        with self.assertRaises(SystemExit) as e:
 
1424
            with self.temporarily_suppress_stderr():
 
1425
                yield
 
1426
        # Exit code from argparse is guaranteed to be "2".  Reference:
 
1427
        # https://docs.python.org/3/library/argparse.html#exiting-methods
 
1428
        self.assertEqual(e.exception.code, 2)
 
1429
 
 
1430
    @staticmethod
 
1431
    @contextlib.contextmanager
 
1432
    def temporarily_suppress_stderr():
 
1433
        null = os.open(os.path.devnull, os.O_RDWR)
 
1434
        stderrcopy = os.dup(sys.stderr.fileno())
 
1435
        os.dup2(null, sys.stderr.fileno())
 
1436
        os.close(null)
 
1437
        try:
 
1438
            yield
 
1439
        finally:
 
1440
            # restore stderr
 
1441
            os.dup2(stderrcopy, sys.stderr.fileno())
 
1442
            os.close(stderrcopy)
 
1443
 
 
1444
    def check_option_syntax(self, options):
 
1445
        check_option_syntax(self.parser, options)
 
1446
 
 
1447
    def test_actions_requires_client_or_all(self):
 
1448
        for action, value in self.actions.items():
 
1449
            options = self.parser.parse_args()
 
1450
            setattr(options, action, value)
 
1451
            with self.assertParseError():
 
1452
                self.check_option_syntax(options)
 
1453
 
 
1454
    def test_actions_conflicts_with_verbose(self):
 
1455
        for action, value in self.actions.items():
 
1456
            options = self.parser.parse_args()
 
1457
            setattr(options, action, value)
 
1458
            options.verbose = True
 
1459
            with self.assertParseError():
 
1460
                self.check_option_syntax(options)
 
1461
 
 
1462
    def test_dump_json_conflicts_with_verbose(self):
 
1463
        options = self.parser.parse_args()
 
1464
        options.dump_json = True
 
1465
        options.verbose = True
 
1466
        with self.assertParseError():
 
1467
            self.check_option_syntax(options)
 
1468
 
 
1469
    def test_dump_json_conflicts_with_action(self):
 
1470
        for action, value in self.actions.items():
 
1471
            options = self.parser.parse_args()
 
1472
            setattr(options, action, value)
 
1473
            options.dump_json = True
 
1474
            with self.assertParseError():
 
1475
                self.check_option_syntax(options)
 
1476
 
 
1477
    def test_all_can_not_be_alone(self):
 
1478
        options = self.parser.parse_args()
 
1479
        options.all = True
 
1480
        with self.assertParseError():
 
1481
            self.check_option_syntax(options)
 
1482
 
 
1483
    def test_all_is_ok_with_any_action(self):
 
1484
        for action, value in self.actions.items():
 
1485
            options = self.parser.parse_args()
 
1486
            setattr(options, action, value)
 
1487
            options.all = True
 
1488
            self.check_option_syntax(options)
 
1489
 
 
1490
    def test_is_enabled_fails_without_client(self):
 
1491
        options = self.parser.parse_args()
 
1492
        options.is_enabled = True
 
1493
        with self.assertParseError():
 
1494
            self.check_option_syntax(options)
 
1495
 
 
1496
    def test_is_enabled_works_with_one_client(self):
 
1497
        options = self.parser.parse_args()
 
1498
        options.is_enabled = True
 
1499
        options.client = ["foo"]
 
1500
        self.check_option_syntax(options)
 
1501
 
 
1502
    def test_is_enabled_fails_with_two_clients(self):
 
1503
        options = self.parser.parse_args()
 
1504
        options.is_enabled = True
 
1505
        options.client = ["foo", "barbar"]
 
1506
        with self.assertParseError():
 
1507
            self.check_option_syntax(options)
 
1508
 
 
1509
    def test_remove_can_only_be_combined_with_action_deny(self):
 
1510
        for action, value in self.actions.items():
 
1511
            if action in {"remove", "deny"}:
 
1512
                continue
 
1513
            options = self.parser.parse_args()
 
1514
            setattr(options, action, value)
 
1515
            options.all = True
 
1516
            options.remove = True
 
1517
            with self.assertParseError():
 
1518
                self.check_option_syntax(options)
 
1519
 
1270
1520
 
1271
1521
 
1272
1522
def should_only_run_tests():