/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-09 16:14:06 UTC
  • merged to: (237.7.594 trunk)
  • merged to: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190309161406-7dfkgkwxukkozwrh
mandos-ctl: Add missing D-Bus debug output

* mandos-ctl (main): Add debug output when connecting to D-Bus client
                     objects.

Show diffs side-by-side

added

removed

Lines of Context:
42
42
import json
43
43
import unittest
44
44
import logging
 
45
import io
 
46
import tempfile
 
47
import contextlib
45
48
 
46
49
import dbus
47
50
 
273
276
# Abstract classes first
274
277
class Command(object):
275
278
    """Abstract class for commands"""
276
 
    def run(self, mandos, clients):
 
279
    def run(self, mandos, mandos_objmgr, clients):
277
280
        """Normal commands should implement run_on_one_client(), but
278
281
        commands which want to operate on all clients at the same time
279
282
        can override this run() method instead."""
290
293
                    "LastApprovalRequest", "ApprovalDelay",
291
294
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
292
295
                    "Expires", "LastCheckerStatus")
293
 
    def run(self, mandos, clients):
294
 
        print(self.output(clients))
 
296
    def run(self, mandos, mandos_objmgr, clients):
 
297
        print(self.output(mandos_objmgr, clients.values()))
295
298
 
296
299
class PropertyCmd(Command):
297
300
    """Abstract class for Actions for setting one client property"""
298
301
    def run_on_one_client(self, client, properties):
299
302
        """Set the Client's D-Bus property"""
 
303
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
 
304
                  client.__dbus_object_path__,
 
305
                  dbus.PROPERTIES_IFACE, client_interface,
 
306
                  self.property, self.value_to_set
 
307
                  if not isinstance(self.value_to_set, dbus.Boolean)
 
308
                  else bool(self.value_to_set))
300
309
        client.Set(client_interface, self.property, self.value_to_set,
301
310
                   dbus_interface=dbus.PROPERTIES_IFACE)
302
311
 
314
323
    @value_to_set.setter
315
324
    def value_to_set(self, value):
316
325
        """When setting, convert value to a datetime.timedelta"""
317
 
        self._vts = string_to_delta(value).total_seconds() * 1000
 
326
        self._vts = int(round(value.total_seconds() * 1000))
318
327
 
319
328
# Actual (non-abstract) command classes
320
329
 
322
331
    def __init__(self, verbose=False):
323
332
        self.verbose = verbose
324
333
 
325
 
    def output(self, clients):
 
334
    def output(self, mandos_objmgr, clients):
326
335
        default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
327
336
        keywords = default_keywords
328
337
        if self.verbose:
329
338
            keywords = self.all_keywords
330
 
        return str(self.TableOfClients(clients.values(), keywords))
 
339
        return str(self.TableOfClients(clients, keywords))
331
340
 
332
341
    class TableOfClients(object):
333
342
        tableheaders = {
406
415
 
407
416
 
408
417
class DumpJSONCmd(PrintCmd):
409
 
    def output(self, clients):
 
418
    def output(self, mandos_objmgr, clients):
410
419
        data = {client["Name"]:
411
420
                {key: self.dbus_boolean_to_bool(client[key])
412
421
                 for key in self.all_keywords}
424
433
            sys.exit(0)
425
434
        sys.exit(1)
426
435
    def is_enabled(self, client, properties):
427
 
        return bool(properties["Enabled"])
 
436
        log.debug("D-Bus: %s:%s:%s.Get(%r, %r)", busname,
 
437
                  client.__dbus_object_path__,
 
438
                  dbus.PROPERTIES_IFACE, client_interface,
 
439
                  "Enabled")
 
440
        return bool(client.Get(client_interface, "Enabled",
 
441
                               dbus_interface=dbus.PROPERTIES_IFACE))
428
442
 
429
443
class RemoveCmd(Command):
430
444
    def run_on_one_client(self, client, properties):
 
445
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
 
446
                  server_path, server_interface,
 
447
                  str(client.__dbus_object_path__))
431
448
        self.mandos.RemoveClient(client.__dbus_object_path__)
432
449
 
433
450
class ApproveCmd(Command):
434
451
    def run_on_one_client(self, client, properties):
 
452
        log.debug("D-Bus: %s:%s.Approve(True)",
 
453
                  client.__dbus_object_path__, client_interface)
435
454
        client.Approve(dbus.Boolean(True),
436
455
                       dbus_interface=client_interface)
437
456
 
438
457
class DenyCmd(Command):
439
458
    def run_on_one_client(self, client, properties):
 
459
        log.debug("D-Bus: %s:%s.Approve(False)",
 
460
                  client.__dbus_object_path__, client_interface)
440
461
        client.Approve(dbus.Boolean(False),
441
462
                       dbus_interface=client_interface)
442
463
 
475
496
    property = "Host"
476
497
 
477
498
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
 
499
    @property
 
500
    def value_to_set(self):
 
501
        return self._vts
 
502
    @value_to_set.setter
 
503
    def value_to_set(self, value):
 
504
        """When setting, read data from supplied file object"""
 
505
        self._vts = value.read()
 
506
        value.close()
478
507
    property = "Secret"
479
508
 
480
509
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
495
524
                             MillisecondsValueArgumentMixIn):
496
525
    property = "ApprovalDuration"
497
526
 
498
 
def has_actions(options):
499
 
    return any((options.enable,
500
 
                options.disable,
501
 
                options.bump_timeout,
502
 
                options.start_checker,
503
 
                options.stop_checker,
504
 
                options.is_enabled,
505
 
                options.remove,
506
 
                options.checker is not None,
507
 
                options.timeout is not None,
508
 
                options.extended_timeout is not None,
509
 
                options.interval is not None,
510
 
                options.approved_by_default is not None,
511
 
                options.approval_delay is not None,
512
 
                options.approval_duration is not None,
513
 
                options.host is not None,
514
 
                options.secret is not None,
515
 
                options.approve,
516
 
                options.deny))
517
 
 
518
527
def add_command_line_options(parser):
519
528
    parser.add_argument("--version", action="version",
520
529
                        version="%(prog)s {}".format(version),
546
555
                        help="Remove client")
547
556
    parser.add_argument("-c", "--checker",
548
557
                        help="Set checker command for client")
549
 
    parser.add_argument("-t", "--timeout",
 
558
    parser.add_argument("-t", "--timeout", type=string_to_delta,
550
559
                        help="Set timeout for client")
551
 
    parser.add_argument("--extended-timeout",
 
560
    parser.add_argument("--extended-timeout", type=string_to_delta,
552
561
                        help="Set extended timeout for client")
553
 
    parser.add_argument("-i", "--interval",
 
562
    parser.add_argument("-i", "--interval", type=string_to_delta,
554
563
                        help="Set checker interval for client")
555
564
    approve_deny_default = parser.add_mutually_exclusive_group()
556
565
    approve_deny_default.add_argument(
561
570
        "--deny-by-default", action="store_false",
562
571
        dest="approved_by_default",
563
572
        help="Set client to be denied by default")
564
 
    parser.add_argument("--approval-delay",
 
573
    parser.add_argument("--approval-delay", type=string_to_delta,
565
574
                        help="Set delay before client approve/deny")
566
 
    parser.add_argument("--approval-duration",
 
575
    parser.add_argument("--approval-duration", type=string_to_delta,
567
576
                        help="Set duration of one client approval")
568
577
    parser.add_argument("-H", "--host", help="Set host for client")
569
578
    parser.add_argument("-s", "--secret",
575
584
        help="Approve any current client request")
576
585
    approve_deny.add_argument("-D", "--deny", action="store_true",
577
586
                              help="Deny any current client request")
 
587
    parser.add_argument("--debug", action="store_true",
 
588
                        help="Debug mode (show D-Bus commands)")
578
589
    parser.add_argument("--check", action="store_true",
579
590
                        help="Run self-test")
580
591
    parser.add_argument("client", nargs="*", help="Client name")
605
616
    if options.is_enabled:
606
617
        commands.append(IsEnabledCmd())
607
618
 
608
 
    if options.remove:
609
 
        commands.append(RemoveCmd())
610
 
 
611
619
    if options.checker is not None:
612
 
        commands.append(SetCheckerCmd())
 
620
        commands.append(SetCheckerCmd(options.checker))
613
621
 
614
622
    if options.timeout is not None:
615
623
        commands.append(SetTimeoutCmd(options.timeout))
619
627
            SetExtendedTimeoutCmd(options.extended_timeout))
620
628
 
621
629
    if options.interval is not None:
622
 
        command.append(SetIntervalCmd(options.interval))
 
630
        commands.append(SetIntervalCmd(options.interval))
623
631
 
624
632
    if options.approved_by_default is not None:
625
633
        if options.approved_by_default:
626
 
            command.append(ApproveByDefaultCmd())
 
634
            commands.append(ApproveByDefaultCmd())
627
635
        else:
628
 
            command.append(DenyByDefaultCmd())
 
636
            commands.append(DenyByDefaultCmd())
629
637
 
630
638
    if options.approval_delay is not None:
631
 
        command.append(SetApprovalDelayCmd(options.approval_delay))
 
639
        commands.append(SetApprovalDelayCmd(options.approval_delay))
632
640
 
633
641
    if options.approval_duration is not None:
634
 
        command.append(
 
642
        commands.append(
635
643
            SetApprovalDurationCmd(options.approval_duration))
636
644
 
637
645
    if options.host is not None:
638
 
        command.append(SetHostCmd(options.host))
 
646
        commands.append(SetHostCmd(options.host))
639
647
 
640
648
    if options.secret is not None:
641
 
        command.append(SetSecretCmd(options.secret))
 
649
        commands.append(SetSecretCmd(options.secret))
642
650
 
643
651
    if options.approve:
644
652
        commands.append(ApproveCmd())
646
654
    if options.deny:
647
655
        commands.append(DenyCmd())
648
656
 
 
657
    if options.remove:
 
658
        commands.append(RemoveCmd())
 
659
 
649
660
    # If no command option has been given, show table of clients,
650
661
    # optionally verbosely
651
662
    if not commands:
654
665
    return commands
655
666
 
656
667
 
657
 
def main():
658
 
    parser = argparse.ArgumentParser()
659
 
 
660
 
    add_command_line_options(parser)
661
 
 
662
 
    options = parser.parse_args()
 
668
def check_option_syntax(parser, options):
 
669
    """Apply additional restrictions on options, not expressible in
 
670
argparse"""
 
671
 
 
672
    def has_actions(options):
 
673
        return any((options.enable,
 
674
                    options.disable,
 
675
                    options.bump_timeout,
 
676
                    options.start_checker,
 
677
                    options.stop_checker,
 
678
                    options.is_enabled,
 
679
                    options.remove,
 
680
                    options.checker is not None,
 
681
                    options.timeout is not None,
 
682
                    options.extended_timeout is not None,
 
683
                    options.interval is not None,
 
684
                    options.approved_by_default is not None,
 
685
                    options.approval_delay is not None,
 
686
                    options.approval_duration is not None,
 
687
                    options.host is not None,
 
688
                    options.secret is not None,
 
689
                    options.approve,
 
690
                    options.deny))
663
691
 
664
692
    if has_actions(options) and not (options.client or options.all):
665
693
        parser.error("Options require clients names or --all.")
672
700
        parser.error("--all requires an action.")
673
701
    if options.is_enabled and len(options.client) > 1:
674
702
        parser.error("--is-enabled requires exactly one client")
 
703
    if options.remove:
 
704
        options.remove = False
 
705
        if has_actions(options) and not options.deny:
 
706
            parser.error("--remove can only be combined with --deny")
 
707
        options.remove = True
 
708
 
 
709
 
 
710
def main():
 
711
    parser = argparse.ArgumentParser()
 
712
 
 
713
    add_command_line_options(parser)
 
714
 
 
715
    options = parser.parse_args()
 
716
 
 
717
    check_option_syntax(parser, options)
675
718
 
676
719
    clientnames = options.client
677
720
 
 
721
    if options.debug:
 
722
        log.setLevel(logging.DEBUG)
 
723
 
678
724
    try:
679
725
        bus = dbus.SystemBus()
 
726
        log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
 
727
                  server_path)
680
728
        mandos_dbus_objc = bus.get_object(busname, server_path)
681
729
    except dbus.exceptions.DBusException:
682
730
        log.critical("Could not connect to Mandos server")
695
743
    dbus_filter = NullFilter()
696
744
    try:
697
745
        dbus_logger.addFilter(dbus_filter)
 
746
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
 
747
                  server_path, dbus.OBJECT_MANAGER_IFACE)
698
748
        mandos_clients = {path: ifs_and_props[client_interface]
699
749
                          for path, ifs_and_props in
700
750
                          mandos_serv_object_manager
712
762
    clients = {}
713
763
 
714
764
    if not clientnames:
715
 
        clients = {bus.get_object(busname, path): properties
 
765
        clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
 
766
                              busname, str(path)) and False) or
 
767
                   bus.get_object(busname, path): properties
716
768
                   for path, properties in mandos_clients.items()}
717
769
    else:
718
770
        for name in clientnames:
719
771
            for path, client in mandos_clients.items():
720
772
                if client["Name"] == name:
 
773
                    log.debug("D-Bus: Connect to: (name=%r, path=%r)",
 
774
                              busname, str(path))
721
775
                    client_objc = bus.get_object(busname, path)
722
776
                    clients[client_objc] = client
723
777
                    break
728
782
    # Run all commands on clients
729
783
    commands = commands_from_options(options)
730
784
    for command in commands:
731
 
        command.run(mandos_serv, clients)
 
785
        command.run(mandos_serv, mandos_serv_object_manager, clients)
732
786
 
733
787
 
734
788
class Test_milliseconds_to_string(unittest.TestCase):
858
912
 
859
913
class TestPrintTableCmd(TestCmd):
860
914
    def test_normal(self):
861
 
        output = PrintTableCmd().output(self.clients)
 
915
        output = PrintTableCmd().output(None, self.clients.values())
862
916
        expected_output = """
863
917
Name   Enabled Timeout  Last Successful Check
864
918
foo    Yes     00:05:00 2019-02-03T00:00:00  
866
920
"""[1:-1]
867
921
        self.assertEqual(output, expected_output)
868
922
    def test_verbose(self):
869
 
        output = PrintTableCmd(verbose=True).output(self.clients)
 
923
        output = PrintTableCmd(verbose=True).output(
 
924
            None, self.clients.values())
870
925
        expected_output = """
871
926
Name   Enabled Timeout  Last Successful Check Created             Interval Host            Key ID                                                           Fingerprint                              Check Is Running Last Enabled        Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker              Extended Timeout Expires             Last Checker Status
872
927
foo    Yes     00:05:00 2019-02-03T00:00:00   2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No               2019-01-03T00:00:00 No                  Yes                                       00:00:00       00:00:01          fping -q -- %(host)s 00:15:00         2019-02-04T00:00:00 0                  
874
929
"""[1:-1]
875
930
        self.assertEqual(output, expected_output)
876
931
    def test_one_client(self):
877
 
        output = PrintTableCmd().output(self.one_client)
 
932
        output = PrintTableCmd().output(None, self.one_client.values())
878
933
        expected_output = """
879
934
Name Enabled Timeout  Last Successful Check
880
935
foo  Yes     00:05:00 2019-02-03T00:00:00  
935
990
        }
936
991
        return super(TestDumpJSONCmd, self).setUp()
937
992
    def test_normal(self):
938
 
        json_data = json.loads(DumpJSONCmd().output(self.clients))
 
993
        json_data = json.loads(DumpJSONCmd().output(None,
 
994
                                                    self.clients))
939
995
        self.assertDictEqual(json_data, self.expected_json)
940
996
    def test_one_client(self):
941
997
        clients = self.one_client
942
 
        json_data = json.loads(DumpJSONCmd().output(clients))
 
998
        json_data = json.loads(DumpJSONCmd().output(None, clients))
943
999
        expected_json = {"foo": self.expected_json["foo"]}
944
1000
        self.assertDictEqual(json_data, expected_json)
945
1001
 
949
1005
                            for client, properties in self.clients.items()))
950
1006
    def test_is_enabled_run_exits_successfully(self):
951
1007
        with self.assertRaises(SystemExit) as e:
952
 
            IsEnabledCmd().run(None, self.one_client)
 
1008
            IsEnabledCmd().run(None, None, self.one_client)
953
1009
        if e.exception.code is not None:
954
1010
            self.assertEqual(e.exception.code, 0)
955
1011
        else:
957
1013
    def test_is_enabled_run_exits_with_failure(self):
958
1014
        self.client.attributes["Enabled"] = dbus.Boolean(False)
959
1015
        with self.assertRaises(SystemExit) as e:
960
 
            IsEnabledCmd().run(None, self.one_client)
 
1016
            IsEnabledCmd().run(None, None, self.one_client)
961
1017
        if isinstance(e.exception.code, int):
962
1018
            self.assertNotEqual(e.exception.code, 0)
963
1019
        else:
972
1028
                self.calls.append(("RemoveClient", (dbus_path,)))
973
1029
        mandos = MockMandos()
974
1030
        super(TestRemoveCmd, self).setUp()
975
 
        RemoveCmd().run(mandos, self.clients)
 
1031
        RemoveCmd().run(mandos, None, self.clients)
976
1032
        self.assertEqual(len(mandos.calls), 2)
977
1033
        for client in self.clients:
978
1034
            self.assertIn(("RemoveClient",
981
1037
 
982
1038
class TestApproveCmd(TestCmd):
983
1039
    def test_approve(self):
984
 
        ApproveCmd().run(None, self.clients)
 
1040
        ApproveCmd().run(None, None, self.clients)
985
1041
        for client in self.clients:
986
1042
            self.assertIn(("Approve", (True, client_interface)),
987
1043
                          client.calls)
988
1044
 
989
1045
class TestDenyCmd(TestCmd):
990
1046
    def test_deny(self):
991
 
        DenyCmd().run(None, self.clients)
 
1047
        DenyCmd().run(None, None, self.clients)
992
1048
        for client in self.clients:
993
1049
            self.assertIn(("Approve", (False, client_interface)),
994
1050
                          client.calls)
998
1054
        for client in self.clients:
999
1055
            client.attributes["Enabled"] = False
1000
1056
 
1001
 
        EnableCmd().run(None, self.clients)
 
1057
        EnableCmd().run(None, None, self.clients)
1002
1058
 
1003
1059
        for client in self.clients:
1004
1060
            self.assertTrue(client.attributes["Enabled"])
1005
1061
 
1006
1062
class TestDisableCmd(TestCmd):
1007
1063
    def test_disable(self):
1008
 
        DisableCmd().run(None, self.clients)
 
1064
        DisableCmd().run(None, None, self.clients)
1009
1065
 
1010
1066
        for client in self.clients:
1011
1067
            self.assertFalse(client.attributes["Enabled"])
1033
1089
                self.assertNotIsInstance(value, Unique)
1034
1090
                self.assertEqual(value, value_to_get)
1035
1091
    def run_command(self, value, clients):
1036
 
        self.command().run(None, clients)
 
1092
        self.command().run(None, None, clients)
1037
1093
 
1038
1094
class TestBumpTimeoutCmd(TestPropertyCmd):
1039
1095
    command = BumpTimeoutCmd
1068
1124
            return
1069
1125
        return super(TestValueArgumentPropertyCmd, self).runTest()
1070
1126
    def run_command(self, value, clients):
1071
 
        self.command(value).run(None, clients)
 
1127
        self.command(value).run(None, None, clients)
1072
1128
 
1073
1129
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1074
1130
    command = SetCheckerCmd
1083
1139
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1084
1140
    command = SetSecretCmd
1085
1141
    property = "Secret"
1086
 
    values_to_set = [b"", b"secret"]
 
1142
    values_to_set = [io.BytesIO(b""),
 
1143
                     io.BytesIO(b"secret\0xyzzy\nbar")]
 
1144
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
1087
1145
 
1088
1146
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1089
1147
    command = SetTimeoutCmd
1090
1148
    property = "Timeout"
1091
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1092
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1149
    values_to_set = [datetime.timedelta(),
 
1150
                     datetime.timedelta(minutes=5),
 
1151
                     datetime.timedelta(seconds=1),
 
1152
                     datetime.timedelta(weeks=1),
 
1153
                     datetime.timedelta(weeks=52)]
 
1154
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1093
1155
 
1094
1156
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1095
1157
    command = SetExtendedTimeoutCmd
1096
1158
    property = "ExtendedTimeout"
1097
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1098
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1159
    values_to_set = [datetime.timedelta(),
 
1160
                     datetime.timedelta(minutes=5),
 
1161
                     datetime.timedelta(seconds=1),
 
1162
                     datetime.timedelta(weeks=1),
 
1163
                     datetime.timedelta(weeks=52)]
 
1164
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1099
1165
 
1100
1166
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1101
1167
    command = SetIntervalCmd
1102
1168
    property = "Interval"
1103
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1169
    values_to_set = [datetime.timedelta(),
 
1170
                     datetime.timedelta(minutes=5),
 
1171
                     datetime.timedelta(seconds=1),
 
1172
                     datetime.timedelta(weeks=1),
 
1173
                     datetime.timedelta(weeks=52)]
 
1174
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1105
1175
 
1106
1176
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1107
1177
    command = SetApprovalDelayCmd
1108
1178
    property = "ApprovalDelay"
1109
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1179
    values_to_set = [datetime.timedelta(),
 
1180
                     datetime.timedelta(minutes=5),
 
1181
                     datetime.timedelta(seconds=1),
 
1182
                     datetime.timedelta(weeks=1),
 
1183
                     datetime.timedelta(weeks=52)]
 
1184
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1111
1185
 
1112
1186
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1113
1187
    command = SetApprovalDurationCmd
1114
1188
    property = "ApprovalDuration"
1115
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
 
1189
    values_to_set = [datetime.timedelta(),
 
1190
                     datetime.timedelta(minutes=5),
 
1191
                     datetime.timedelta(seconds=1),
 
1192
                     datetime.timedelta(weeks=1),
 
1193
                     datetime.timedelta(weeks=52)]
 
1194
    values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1117
1195
 
1118
 
class TestOptions(unittest.TestCase):
 
1196
class Test_command_from_options(unittest.TestCase):
1119
1197
    def setUp(self):
1120
1198
        self.parser = argparse.ArgumentParser()
1121
1199
        add_command_line_options(self.parser)
1123
1201
        """Assert that parsing ARGS should result in an instance of
1124
1202
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1125
1203
        options = self.parser.parse_args(args)
 
1204
        check_option_syntax(self.parser, options)
1126
1205
        commands = commands_from_options(options)
1127
1206
        self.assertEqual(len(commands), 1)
1128
1207
        command = commands[0]
1129
1208
        self.assertIsInstance(command, command_cls)
1130
1209
        for key, value in cmd_attrs.items():
1131
1210
            self.assertEqual(getattr(command, key), value)
1132
 
    def test_default_is_show_table(self):
 
1211
    def test_print_table(self):
1133
1212
        self.assert_command_from_args([], PrintTableCmd,
1134
1213
                                      verbose=False)
1135
 
    def test_show_table_verbose(self):
 
1214
 
 
1215
    def test_print_table_verbose(self):
1136
1216
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1137
1217
                                      verbose=True)
 
1218
 
 
1219
    def test_print_table_verbose_short(self):
 
1220
        self.assert_command_from_args(["-v"], PrintTableCmd,
 
1221
                                      verbose=True)
 
1222
 
1138
1223
    def test_enable(self):
1139
 
        self.assert_command_from_args(["--enable"], EnableCmd)
 
1224
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
 
1225
 
 
1226
    def test_enable_short(self):
 
1227
        self.assert_command_from_args(["-e", "foo"], EnableCmd)
 
1228
 
1140
1229
    def test_disable(self):
1141
 
        self.assert_command_from_args(["--disable"], DisableCmd)
 
1230
        self.assert_command_from_args(["--disable", "foo"],
 
1231
                                      DisableCmd)
 
1232
 
 
1233
    def test_disable_short(self):
 
1234
        self.assert_command_from_args(["-d", "foo"], DisableCmd)
 
1235
 
 
1236
    def test_bump_timeout(self):
 
1237
        self.assert_command_from_args(["--bump-timeout", "foo"],
 
1238
                                      BumpTimeoutCmd)
 
1239
 
 
1240
    def test_bump_timeout_short(self):
 
1241
        self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
 
1242
 
 
1243
    def test_start_checker(self):
 
1244
        self.assert_command_from_args(["--start-checker", "foo"],
 
1245
                                      StartCheckerCmd)
 
1246
 
 
1247
    def test_stop_checker(self):
 
1248
        self.assert_command_from_args(["--stop-checker", "foo"],
 
1249
                                      StopCheckerCmd)
 
1250
 
 
1251
    def test_remove(self):
 
1252
        self.assert_command_from_args(["--remove", "foo"],
 
1253
                                      RemoveCmd)
 
1254
 
 
1255
    def test_remove_short(self):
 
1256
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
 
1257
 
 
1258
    def test_checker(self):
 
1259
        self.assert_command_from_args(["--checker", ":", "foo"],
 
1260
                                      SetCheckerCmd, value_to_set=":")
 
1261
 
 
1262
    def test_checker_empty(self):
 
1263
        self.assert_command_from_args(["--checker", "", "foo"],
 
1264
                                      SetCheckerCmd, value_to_set="")
 
1265
 
 
1266
    def test_checker_short(self):
 
1267
        self.assert_command_from_args(["-c", ":", "foo"],
 
1268
                                      SetCheckerCmd, value_to_set=":")
 
1269
 
 
1270
    def test_timeout(self):
 
1271
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
 
1272
                                      SetTimeoutCmd,
 
1273
                                      value_to_set=300000)
 
1274
 
 
1275
    def test_timeout_short(self):
 
1276
        self.assert_command_from_args(["-t", "PT5M", "foo"],
 
1277
                                      SetTimeoutCmd,
 
1278
                                      value_to_set=300000)
 
1279
 
 
1280
    def test_extended_timeout(self):
 
1281
        self.assert_command_from_args(["--extended-timeout", "PT15M",
 
1282
                                       "foo"],
 
1283
                                      SetExtendedTimeoutCmd,
 
1284
                                      value_to_set=900000)
 
1285
 
 
1286
    def test_interval(self):
 
1287
        self.assert_command_from_args(["--interval", "PT2M", "foo"],
 
1288
                                      SetIntervalCmd,
 
1289
                                      value_to_set=120000)
 
1290
 
 
1291
    def test_interval_short(self):
 
1292
        self.assert_command_from_args(["-i", "PT2M", "foo"],
 
1293
                                      SetIntervalCmd,
 
1294
                                      value_to_set=120000)
 
1295
 
 
1296
    def test_approve_by_default(self):
 
1297
        self.assert_command_from_args(["--approve-by-default", "foo"],
 
1298
                                      ApproveByDefaultCmd)
 
1299
 
 
1300
    def test_deny_by_default(self):
 
1301
        self.assert_command_from_args(["--deny-by-default", "foo"],
 
1302
                                      DenyByDefaultCmd)
 
1303
 
 
1304
    def test_approval_delay(self):
 
1305
        self.assert_command_from_args(["--approval-delay", "PT30S",
 
1306
                                       "foo"], SetApprovalDelayCmd,
 
1307
                                      value_to_set=30000)
 
1308
 
 
1309
    def test_approval_duration(self):
 
1310
        self.assert_command_from_args(["--approval-duration", "PT1S",
 
1311
                                       "foo"], SetApprovalDurationCmd,
 
1312
                                      value_to_set=1000)
 
1313
 
 
1314
    def test_host(self):
 
1315
        self.assert_command_from_args(["--host", "foo.example.org",
 
1316
                                       "foo"], SetHostCmd,
 
1317
                                      value_to_set="foo.example.org")
 
1318
 
 
1319
    def test_host_short(self):
 
1320
        self.assert_command_from_args(["-H", "foo.example.org",
 
1321
                                       "foo"], SetHostCmd,
 
1322
                                      value_to_set="foo.example.org")
 
1323
 
 
1324
    def test_secret_devnull(self):
 
1325
        self.assert_command_from_args(["--secret", os.path.devnull,
 
1326
                                       "foo"], SetSecretCmd,
 
1327
                                      value_to_set=b"")
 
1328
 
 
1329
    def test_secret_tempfile(self):
 
1330
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1331
            value = b"secret\0xyzzy\nbar"
 
1332
            f.write(value)
 
1333
            f.seek(0)
 
1334
            self.assert_command_from_args(["--secret", f.name,
 
1335
                                           "foo"], SetSecretCmd,
 
1336
                                          value_to_set=value)
 
1337
 
 
1338
    def test_secret_devnull_short(self):
 
1339
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
 
1340
                                      SetSecretCmd, value_to_set=b"")
 
1341
 
 
1342
    def test_secret_tempfile_short(self):
 
1343
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
 
1344
            value = b"secret\0xyzzy\nbar"
 
1345
            f.write(value)
 
1346
            f.seek(0)
 
1347
            self.assert_command_from_args(["-s", f.name, "foo"],
 
1348
                                          SetSecretCmd,
 
1349
                                          value_to_set=value)
 
1350
 
 
1351
    def test_approve(self):
 
1352
        self.assert_command_from_args(["--approve", "foo"],
 
1353
                                      ApproveCmd)
 
1354
 
 
1355
    def test_approve_short(self):
 
1356
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
 
1357
 
 
1358
    def test_deny(self):
 
1359
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
 
1360
 
 
1361
    def test_deny_short(self):
 
1362
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
 
1363
 
 
1364
    def test_dump_json(self):
 
1365
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
 
1366
 
 
1367
    def test_is_enabled(self):
 
1368
        self.assert_command_from_args(["--is-enabled", "foo"],
 
1369
                                      IsEnabledCmd)
 
1370
 
 
1371
    def test_is_enabled_short(self):
 
1372
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
 
1373
 
 
1374
    def test_deny_before_remove(self):
 
1375
        options = self.parser.parse_args(["--deny", "--remove", "foo"])
 
1376
        check_option_syntax(self.parser, options)
 
1377
        commands = commands_from_options(options)
 
1378
        self.assertEqual(len(commands), 2)
 
1379
        self.assertIsInstance(commands[0], DenyCmd)
 
1380
        self.assertIsInstance(commands[1], RemoveCmd)
 
1381
 
 
1382
    def test_deny_before_remove_reversed(self):
 
1383
        options = self.parser.parse_args(["--remove", "--deny", "--all"])
 
1384
        check_option_syntax(self.parser, options)
 
1385
        commands = commands_from_options(options)
 
1386
        self.assertEqual(len(commands), 2)
 
1387
        self.assertIsInstance(commands[0], DenyCmd)
 
1388
        self.assertIsInstance(commands[1], RemoveCmd)
 
1389
 
 
1390
 
 
1391
class Test_check_option_syntax(unittest.TestCase):
 
1392
    # This mostly corresponds to the definition from has_actions() in
 
1393
    # check_option_syntax()
 
1394
    actions = {
 
1395
        # The actual values set here are not that important, but we do
 
1396
        # at least stick to the correct types, even though they are
 
1397
        # never used
 
1398
        "enable": True,
 
1399
        "disable": True,
 
1400
        "bump_timeout": True,
 
1401
        "start_checker": True,
 
1402
        "stop_checker": True,
 
1403
        "is_enabled": True,
 
1404
        "remove": True,
 
1405
        "checker": "x",
 
1406
        "timeout": datetime.timedelta(),
 
1407
        "extended_timeout": datetime.timedelta(),
 
1408
        "interval": datetime.timedelta(),
 
1409
        "approved_by_default": True,
 
1410
        "approval_delay": datetime.timedelta(),
 
1411
        "approval_duration": datetime.timedelta(),
 
1412
        "host": "x",
 
1413
        "secret": io.BytesIO(b"x"),
 
1414
        "approve": True,
 
1415
        "deny": True,
 
1416
    }
 
1417
 
 
1418
    def setUp(self):
 
1419
        self.parser = argparse.ArgumentParser()
 
1420
        add_command_line_options(self.parser)
 
1421
 
 
1422
    @contextlib.contextmanager
 
1423
    def assertParseError(self):
 
1424
        with self.assertRaises(SystemExit) as e:
 
1425
            with self.temporarily_suppress_stderr():
 
1426
                yield
 
1427
        # Exit code from argparse is guaranteed to be "2".  Reference:
 
1428
        # https://docs.python.org/3/library/argparse.html#exiting-methods
 
1429
        self.assertEqual(e.exception.code, 2)
 
1430
 
 
1431
    @staticmethod
 
1432
    @contextlib.contextmanager
 
1433
    def temporarily_suppress_stderr():
 
1434
        null = os.open(os.path.devnull, os.O_RDWR)
 
1435
        stderrcopy = os.dup(sys.stderr.fileno())
 
1436
        os.dup2(null, sys.stderr.fileno())
 
1437
        os.close(null)
 
1438
        try:
 
1439
            yield
 
1440
        finally:
 
1441
            # restore stderr
 
1442
            os.dup2(stderrcopy, sys.stderr.fileno())
 
1443
            os.close(stderrcopy)
 
1444
 
 
1445
    def check_option_syntax(self, options):
 
1446
        check_option_syntax(self.parser, options)
 
1447
 
 
1448
    def test_actions_requires_client_or_all(self):
 
1449
        for action, value in self.actions.items():
 
1450
            options = self.parser.parse_args()
 
1451
            setattr(options, action, value)
 
1452
            with self.assertParseError():
 
1453
                self.check_option_syntax(options)
 
1454
 
 
1455
    def test_actions_conflicts_with_verbose(self):
 
1456
        for action, value in self.actions.items():
 
1457
            options = self.parser.parse_args()
 
1458
            setattr(options, action, value)
 
1459
            options.verbose = True
 
1460
            with self.assertParseError():
 
1461
                self.check_option_syntax(options)
 
1462
 
 
1463
    def test_dump_json_conflicts_with_verbose(self):
 
1464
        options = self.parser.parse_args()
 
1465
        options.dump_json = True
 
1466
        options.verbose = True
 
1467
        with self.assertParseError():
 
1468
            self.check_option_syntax(options)
 
1469
 
 
1470
    def test_dump_json_conflicts_with_action(self):
 
1471
        for action, value in self.actions.items():
 
1472
            options = self.parser.parse_args()
 
1473
            setattr(options, action, value)
 
1474
            options.dump_json = True
 
1475
            with self.assertParseError():
 
1476
                self.check_option_syntax(options)
 
1477
 
 
1478
    def test_all_can_not_be_alone(self):
 
1479
        options = self.parser.parse_args()
 
1480
        options.all = True
 
1481
        with self.assertParseError():
 
1482
            self.check_option_syntax(options)
 
1483
 
 
1484
    def test_all_is_ok_with_any_action(self):
 
1485
        for action, value in self.actions.items():
 
1486
            options = self.parser.parse_args()
 
1487
            setattr(options, action, value)
 
1488
            options.all = True
 
1489
            self.check_option_syntax(options)
 
1490
 
 
1491
    def test_is_enabled_fails_without_client(self):
 
1492
        options = self.parser.parse_args()
 
1493
        options.is_enabled = True
 
1494
        with self.assertParseError():
 
1495
            self.check_option_syntax(options)
 
1496
 
 
1497
    def test_is_enabled_works_with_one_client(self):
 
1498
        options = self.parser.parse_args()
 
1499
        options.is_enabled = True
 
1500
        options.client = ["foo"]
 
1501
        self.check_option_syntax(options)
 
1502
 
 
1503
    def test_is_enabled_fails_with_two_clients(self):
 
1504
        options = self.parser.parse_args()
 
1505
        options.is_enabled = True
 
1506
        options.client = ["foo", "barbar"]
 
1507
        with self.assertParseError():
 
1508
            self.check_option_syntax(options)
 
1509
 
 
1510
    def test_remove_can_only_be_combined_with_action_deny(self):
 
1511
        for action, value in self.actions.items():
 
1512
            if action in {"remove", "deny"}:
 
1513
                continue
 
1514
            options = self.parser.parse_args()
 
1515
            setattr(options, action, value)
 
1516
            options.all = True
 
1517
            options.remove = True
 
1518
            with self.assertParseError():
 
1519
                self.check_option_syntax(options)
1142
1520
 
1143
1521
 
1144
1522