/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-03 16:20:02 UTC
  • Revision ID: teddy@recompile.se-20190303162002-y0ezr98f71d0jfro
mandos-ctl: Refactor; test PrintTableCmd instead of TableOfClients

* mandos-ctl (Test_TableOfClients): Removed.
  (TestCmd): New abstract class for tests for command classes.
  (TestPrintTableCmd): New.

Show diffs side-by-side

added added

removed removed

Lines of Context:
268
268
    return value
269
269
 
270
270
 
 
271
class TableOfClients(object):
 
272
    tableheaders = {
 
273
        "Name": "Name",
 
274
        "Enabled": "Enabled",
 
275
        "Timeout": "Timeout",
 
276
        "LastCheckedOK": "Last Successful Check",
 
277
        "LastApprovalRequest": "Last Approval Request",
 
278
        "Created": "Created",
 
279
        "Interval": "Interval",
 
280
        "Host": "Host",
 
281
        "Fingerprint": "Fingerprint",
 
282
        "KeyID": "Key ID",
 
283
        "CheckerRunning": "Check Is Running",
 
284
        "LastEnabled": "Last Enabled",
 
285
        "ApprovalPending": "Approval Is Pending",
 
286
        "ApprovedByDefault": "Approved By Default",
 
287
        "ApprovalDelay": "Approval Delay",
 
288
        "ApprovalDuration": "Approval Duration",
 
289
        "Checker": "Checker",
 
290
        "ExtendedTimeout": "Extended Timeout",
 
291
        "Expires": "Expires",
 
292
        "LastCheckerStatus": "Last Checker Status",
 
293
    }
 
294
 
 
295
    def __init__(self, clients, keywords, tableheaders=None):
 
296
        self.clients = clients
 
297
        self.keywords = keywords
 
298
        if tableheaders is not None:
 
299
            self.tableheaders = tableheaders
 
300
 
 
301
    def __str__(self):
 
302
        return "\n".join(self.rows())
 
303
 
 
304
    if sys.version_info.major == 2:
 
305
        __unicode__ = __str__
 
306
        def __str__(self):
 
307
            return str(self).encode(locale.getpreferredencoding())
 
308
 
 
309
    def rows(self):
 
310
        format_string = self.row_formatting_string()
 
311
        rows = [self.header_line(format_string)]
 
312
        rows.extend(self.client_line(client, format_string)
 
313
                    for client in self.clients)
 
314
        return rows
 
315
 
 
316
    def row_formatting_string(self):
 
317
        "Format string used to format table rows"
 
318
        return " ".join("{{{key}:{width}}}".format(
 
319
            width=max(len(self.tableheaders[key]),
 
320
                      *(len(self.string_from_client(client, key))
 
321
                        for client in self.clients)),
 
322
            key=key)
 
323
                        for key in self.keywords)
 
324
 
 
325
    def string_from_client(self, client, key):
 
326
        return self.valuetostring(client[key], key)
 
327
 
 
328
    @staticmethod
 
329
    def valuetostring(value, keyword):
 
330
        if isinstance(value, dbus.Boolean):
 
331
            return "Yes" if value else "No"
 
332
        if keyword in ("Timeout", "Interval", "ApprovalDelay",
 
333
                       "ApprovalDuration", "ExtendedTimeout"):
 
334
            return milliseconds_to_string(value)
 
335
        return str(value)
 
336
 
 
337
    def header_line(self, format_string):
 
338
        return format_string.format(**self.tableheaders)
 
339
 
 
340
    def client_line(self, client, format_string):
 
341
        return format_string.format(
 
342
            **{key: self.string_from_client(client, key)
 
343
               for key in self.keywords})
 
344
 
 
345
 
271
346
## Classes for commands.
272
347
 
273
348
# Abstract classes first
278
353
        commands which want to operate on all clients at the same time
279
354
        can override this run() method instead."""
280
355
        self.mandos = mandos
281
 
        for client, properties in clients.items():
282
 
            self.run_on_one_client(client, properties)
 
356
        for client in clients:
 
357
            self.run_on_one_client(client)
283
358
 
284
359
class PrintCmd(Command):
285
360
    """Abstract class for commands printing client details"""
295
370
 
296
371
class PropertyCmd(Command):
297
372
    """Abstract class for Actions for setting one client property"""
298
 
    def run_on_one_client(self, client, properties):
 
373
    def run_on_one_client(self, client):
299
374
        """Set the Client's D-Bus property"""
300
375
        client.Set(client_interface, self.property, self.value_to_set,
301
376
                   dbus_interface=dbus.PROPERTIES_IFACE)
321
396
class PrintTableCmd(PrintCmd):
322
397
    def __init__(self, verbose=False):
323
398
        self.verbose = verbose
324
 
 
325
399
    def output(self, clients):
326
 
        default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
327
 
        keywords = default_keywords
328
400
        if self.verbose:
329
401
            keywords = self.all_keywords
330
 
        return str(self.TableOfClients(clients.values(), keywords))
331
 
 
332
 
    class TableOfClients(object):
333
 
        tableheaders = {
334
 
            "Name": "Name",
335
 
            "Enabled": "Enabled",
336
 
            "Timeout": "Timeout",
337
 
            "LastCheckedOK": "Last Successful Check",
338
 
            "LastApprovalRequest": "Last Approval Request",
339
 
            "Created": "Created",
340
 
            "Interval": "Interval",
341
 
            "Host": "Host",
342
 
            "Fingerprint": "Fingerprint",
343
 
            "KeyID": "Key ID",
344
 
            "CheckerRunning": "Check Is Running",
345
 
            "LastEnabled": "Last Enabled",
346
 
            "ApprovalPending": "Approval Is Pending",
347
 
            "ApprovedByDefault": "Approved By Default",
348
 
            "ApprovalDelay": "Approval Delay",
349
 
            "ApprovalDuration": "Approval Duration",
350
 
            "Checker": "Checker",
351
 
            "ExtendedTimeout": "Extended Timeout",
352
 
            "Expires": "Expires",
353
 
            "LastCheckerStatus": "Last Checker Status",
354
 
        }
355
 
 
356
 
        def __init__(self, clients, keywords, tableheaders=None):
357
 
            self.clients = clients
358
 
            self.keywords = keywords
359
 
            if tableheaders is not None:
360
 
                self.tableheaders = tableheaders
361
 
 
362
 
        def __str__(self):
363
 
            return "\n".join(self.rows())
364
 
 
365
 
        if sys.version_info.major == 2:
366
 
            __unicode__ = __str__
367
 
            def __str__(self):
368
 
                return str(self).encode(locale.getpreferredencoding())
369
 
 
370
 
        def rows(self):
371
 
            format_string = self.row_formatting_string()
372
 
            rows = [self.header_line(format_string)]
373
 
            rows.extend(self.client_line(client, format_string)
374
 
                        for client in self.clients)
375
 
            return rows
376
 
 
377
 
        def row_formatting_string(self):
378
 
            "Format string used to format table rows"
379
 
            return " ".join("{{{key}:{width}}}".format(
380
 
                width=max(len(self.tableheaders[key]),
381
 
                          *(len(self.string_from_client(client, key))
382
 
                            for client in self.clients)),
383
 
                key=key)
384
 
                            for key in self.keywords)
385
 
 
386
 
        def string_from_client(self, client, key):
387
 
            return self.valuetostring(client[key], key)
388
 
 
389
 
        @staticmethod
390
 
        def valuetostring(value, keyword):
391
 
            if isinstance(value, dbus.Boolean):
392
 
                return "Yes" if value else "No"
393
 
            if keyword in ("Timeout", "Interval", "ApprovalDelay",
394
 
                           "ApprovalDuration", "ExtendedTimeout"):
395
 
                return milliseconds_to_string(value)
396
 
            return str(value)
397
 
 
398
 
        def header_line(self, format_string):
399
 
            return format_string.format(**self.tableheaders)
400
 
 
401
 
        def client_line(self, client, format_string):
402
 
            return format_string.format(
403
 
                **{key: self.string_from_client(client, key)
404
 
                   for key in self.keywords})
405
 
 
406
 
 
 
402
        else:
 
403
            keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
 
404
        return str(TableOfClients(clients.values(), keywords))
407
405
 
408
406
class DumpJSONCmd(PrintCmd):
409
407
    def output(self, clients):
419
417
        return value
420
418
 
421
419
class IsEnabledCmd(Command):
422
 
    def run_on_one_client(self, client, properties):
423
 
        if self.is_enabled(client, properties):
 
420
    def run_on_one_client(self, client):
 
421
        if self.is_enabled(client):
424
422
            sys.exit(0)
425
423
        sys.exit(1)
426
 
    def is_enabled(self, client, properties):
427
 
        return bool(properties["Enabled"])
 
424
    def is_enabled(self, client):
 
425
        return client.Get(client_interface, "Enabled",
 
426
                          dbus_interface=dbus.PROPERTIES_IFACE)
428
427
 
429
428
class RemoveCmd(Command):
430
 
    def run_on_one_client(self, client, properties):
 
429
    def run_on_one_client(self, client):
431
430
        self.mandos.RemoveClient(client.__dbus_object_path__)
432
431
 
433
432
class ApproveCmd(Command):
434
 
    def run_on_one_client(self, client, properties):
 
433
    def run_on_one_client(self, client):
435
434
        client.Approve(dbus.Boolean(True),
436
435
                       dbus_interface=client_interface)
437
436
 
438
437
class DenyCmd(Command):
439
 
    def run_on_one_client(self, client, properties):
 
438
    def run_on_one_client(self, client):
440
439
        client.Approve(dbus.Boolean(False),
441
440
                       dbus_interface=client_interface)
442
441
 
515
514
                options.approve,
516
515
                options.deny))
517
516
 
518
 
def add_command_line_options(parser):
 
517
 
 
518
def commands_and_clients_from_options(args=None):
 
519
    if args is None:
 
520
        args=sys.argv[1:]
 
521
    parser = argparse.ArgumentParser()
519
522
    parser.add_argument("--version", action="version",
520
523
                        version="%(prog)s {}".format(version),
521
524
                        help="show version number and exit")
578
581
    parser.add_argument("--check", action="store_true",
579
582
                        help="Run self-test")
580
583
    parser.add_argument("client", nargs="*", help="Client name")
581
 
 
582
 
 
583
 
def commands_from_options(options):
 
584
    options = parser.parse_args(args=args)
 
585
 
 
586
    if has_actions(options) and not (options.client or options.all):
 
587
        parser.error("Options require clients names or --all.")
 
588
    if options.verbose and has_actions(options):
 
589
        parser.error("--verbose can only be used alone.")
 
590
    if options.dump_json and (options.verbose
 
591
                              or has_actions(options)):
 
592
        parser.error("--dump-json can only be used alone.")
 
593
    if options.all and not has_actions(options):
 
594
        parser.error("--all requires an action.")
 
595
    if options.is_enabled and len(options.client) > 1:
 
596
            parser.error("--is-enabled requires exactly one client")
584
597
 
585
598
    commands = []
586
599
 
594
607
        commands.append(DisableCmd())
595
608
 
596
609
    if options.bump_timeout:
597
 
        commands.append(BumpTimeoutCmd())
 
610
        commands.append(BumpTimeoutCmd(options.bump_timeout))
598
611
 
599
612
    if options.start_checker:
600
613
        commands.append(StartCheckerCmd())
651
664
    if not commands:
652
665
        commands.append(PrintTableCmd(verbose=options.verbose))
653
666
 
654
 
    return commands
 
667
    return commands, options.client
655
668
 
656
669
 
657
670
def main():
658
 
    parser = argparse.ArgumentParser()
659
 
 
660
 
    add_command_line_options(parser)
661
 
 
662
 
    options = parser.parse_args()
663
 
 
664
 
    if has_actions(options) and not (options.client or options.all):
665
 
        parser.error("Options require clients names or --all.")
666
 
    if options.verbose and has_actions(options):
667
 
        parser.error("--verbose can only be used alone.")
668
 
    if options.dump_json and (options.verbose
669
 
                              or has_actions(options)):
670
 
        parser.error("--dump-json can only be used alone.")
671
 
    if options.all and not has_actions(options):
672
 
        parser.error("--all requires an action.")
673
 
    if options.is_enabled and len(options.client) > 1:
674
 
        parser.error("--is-enabled requires exactly one client")
675
 
 
676
 
    clientnames = options.client
 
671
    commands, clientnames = commands_and_clients_from_options()
677
672
 
678
673
    try:
679
674
        bus = dbus.SystemBus()
693
688
        def filter(self, record):
694
689
            return False
695
690
    dbus_filter = NullFilter()
 
691
    dbus_logger.addFilter(dbus_filter)
696
692
    try:
697
 
        dbus_logger.addFilter(dbus_filter)
698
 
        mandos_clients = {path: ifs_and_props[client_interface]
699
 
                          for path, ifs_and_props in
700
 
                          mandos_serv_object_manager
701
 
                          .GetManagedObjects().items()
702
 
                          if client_interface in ifs_and_props}
 
693
        try:
 
694
            mandos_clients = {path: ifs_and_props[client_interface]
 
695
                              for path, ifs_and_props in
 
696
                              mandos_serv_object_manager
 
697
                              .GetManagedObjects().items()
 
698
                              if client_interface in ifs_and_props}
 
699
        finally:
 
700
            # restore dbus logger
 
701
            dbus_logger.removeFilter(dbus_filter)
703
702
    except dbus.exceptions.DBusException as e:
704
703
        log.critical("Failed to access Mandos server through D-Bus:"
705
704
                     "\n%s", e)
706
705
        sys.exit(1)
707
 
    finally:
708
 
        # restore dbus logger
709
 
        dbus_logger.removeFilter(dbus_filter)
710
706
 
711
707
    # Compile dict of (clients: properties) to process
712
708
    clients = {}
726
722
                sys.exit(1)
727
723
 
728
724
    # Run all commands on clients
729
 
    commands = commands_from_options(options)
730
725
    for command in commands:
731
726
        command.run(mandos_serv, clients)
732
727
 
746
741
 
747
742
class Test_string_to_delta(unittest.TestCase):
748
743
    def test_handles_basic_rfc3339(self):
749
 
        self.assertEqual(string_to_delta("PT0S"),
750
 
                         datetime.timedelta())
751
 
        self.assertEqual(string_to_delta("P0D"),
752
 
                         datetime.timedelta())
753
 
        self.assertEqual(string_to_delta("PT1S"),
754
 
                         datetime.timedelta(0, 1))
755
744
        self.assertEqual(string_to_delta("PT2H"),
756
745
                         datetime.timedelta(0, 7200))
757
746
    def test_falls_back_to_pre_1_6_1_with_warning(self):
786
775
                self.__dbus_object_path__ = "objpath_{}".format(name)
787
776
                self.attributes = attributes
788
777
                self.attributes["Name"] = name
789
 
                self.calls = []
790
 
            def Set(self, interface, property, value, dbus_interface):
 
778
            def Set(interface, property, value,
 
779
                    properties_interface):
791
780
                testcase.assertEqual(interface, client_interface)
792
 
                testcase.assertEqual(dbus_interface,
 
781
                testcase.assertEqual(properties_interface,
793
782
                                     dbus.PROPERTIES_IFACE)
794
783
                self.attributes[property] = value
795
 
            def Get(self, interface, property, dbus_interface):
 
784
            def Get(interface, property, properties_interface):
796
785
                testcase.assertEqual(interface, client_interface)
797
 
                testcase.assertEqual(dbus_interface,
 
786
                testcase.assertEqual(properties_interface,
798
787
                                     dbus.PROPERTIES_IFACE)
799
788
                return self.attributes[property]
800
 
            def Approve(self, approve, dbus_interface):
801
 
                testcase.assertEqual(dbus_interface, client_interface)
802
 
                self.calls.append(("Approve", (approve,
803
 
                                               dbus_interface)))
804
 
        self.client = MockClient(
805
 
            "foo",
806
 
            KeyID=("92ed150794387c03ce684574b1139a65"
807
 
                   "94a34f895daaaf09fd8ea90a27cddb12"),
808
 
            Secret=b"secret",
809
 
            Host="foo.example.org",
810
 
            Enabled=dbus.Boolean(True),
811
 
            Timeout=300000,
812
 
            LastCheckedOK="2019-02-03T00:00:00",
813
 
            Created="2019-01-02T00:00:00",
814
 
            Interval=120000,
815
 
            Fingerprint=("778827225BA7DE539C5A"
816
 
                         "7CFA59CFF7CDBD9A5920"),
817
 
            CheckerRunning=dbus.Boolean(False),
818
 
            LastEnabled="2019-01-03T00:00:00",
819
 
            ApprovalPending=dbus.Boolean(False),
820
 
            ApprovedByDefault=dbus.Boolean(True),
821
 
            LastApprovalRequest="",
822
 
            ApprovalDelay=0,
823
 
            ApprovalDuration=1000,
824
 
            Checker="fping -q -- %(host)s",
825
 
            ExtendedTimeout=900000,
826
 
            Expires="2019-02-04T00:00:00",
827
 
            LastCheckerStatus=0)
828
 
        self.other_client = MockClient(
829
 
            "barbar",
830
 
            KeyID=("0558568eedd67d622f5c83b35a115f79"
831
 
                   "6ab612cff5ad227247e46c2b020f441c"),
832
 
            Secret=b"secretbar",
833
 
            Host="192.0.2.3",
834
 
            Enabled=dbus.Boolean(True),
835
 
            Timeout=300000,
836
 
            LastCheckedOK="2019-02-04T00:00:00",
837
 
            Created="2019-01-03T00:00:00",
838
 
            Interval=120000,
839
 
            Fingerprint=("3E393AEAEFB84C7E89E2"
840
 
                         "F547B3A107558FCA3A27"),
841
 
            CheckerRunning=dbus.Boolean(True),
842
 
            LastEnabled="2019-01-04T00:00:00",
843
 
            ApprovalPending=dbus.Boolean(False),
844
 
            ApprovedByDefault=dbus.Boolean(False),
845
 
            LastApprovalRequest="2019-01-03T00:00:00",
846
 
            ApprovalDelay=30000,
847
 
            ApprovalDuration=1000,
848
 
            Checker=":",
849
 
            ExtendedTimeout=900000,
850
 
            Expires="2019-02-05T00:00:00",
851
 
            LastCheckerStatus=-2)
852
 
        self.clients =  collections.OrderedDict(
853
 
            [
854
 
                (self.client, self.client.attributes),
855
 
                (self.other_client, self.other_client.attributes),
 
789
            def __getitem__(self, key):
 
790
                return self.attributes[key]
 
791
        self.clients = collections.OrderedDict([
 
792
            ("foo",
 
793
             MockClient(
 
794
                 "foo",
 
795
                 KeyID=("92ed150794387c03ce684574b1139a65"
 
796
                        "94a34f895daaaf09fd8ea90a27cddb12"),
 
797
                 Secret=b"secret",
 
798
                 Host="foo.example.org",
 
799
                 Enabled=dbus.Boolean(True),
 
800
                 Timeout=300000,
 
801
                 LastCheckedOK="2019-02-03T00:00:00",
 
802
                 Created="2019-01-02T00:00:00",
 
803
                 Interval=120000,
 
804
                 Fingerprint=("778827225BA7DE539C5A"
 
805
                              "7CFA59CFF7CDBD9A5920"),
 
806
                 CheckerRunning=dbus.Boolean(False),
 
807
                 LastEnabled="2019-01-03T00:00:00",
 
808
                 ApprovalPending=dbus.Boolean(False),
 
809
                 ApprovedByDefault=dbus.Boolean(True),
 
810
                 LastApprovalRequest="",
 
811
                 ApprovalDelay=0,
 
812
                 ApprovalDuration=1000,
 
813
                 Checker="fping -q -- %(host)s",
 
814
                 ExtendedTimeout=900000,
 
815
                 Expires="2019-02-04T00:00:00",
 
816
                 LastCheckerStatus=0)),
 
817
            ("barbar",
 
818
             MockClient(
 
819
                 "barbar",
 
820
                 KeyID=("0558568eedd67d622f5c83b35a115f79"
 
821
                        "6ab612cff5ad227247e46c2b020f441c"),
 
822
                 Secret=b"secretbar",
 
823
                 Host="192.0.2.3",
 
824
                 Enabled=dbus.Boolean(True),
 
825
                 Timeout=300000,
 
826
                 LastCheckedOK="2019-02-04T00:00:00",
 
827
                 Created="2019-01-03T00:00:00",
 
828
                 Interval=120000,
 
829
                 Fingerprint=("3E393AEAEFB84C7E89E2"
 
830
                              "F547B3A107558FCA3A27"),
 
831
                 CheckerRunning=dbus.Boolean(True),
 
832
                 LastEnabled="2019-01-04T00:00:00",
 
833
                 ApprovalPending=dbus.Boolean(False),
 
834
                 ApprovedByDefault=dbus.Boolean(False),
 
835
                 LastApprovalRequest="2019-01-03T00:00:00",
 
836
                 ApprovalDelay=30000,
 
837
                 ApprovalDuration=1000,
 
838
                 Checker=":",
 
839
                 ExtendedTimeout=900000,
 
840
                 Expires="2019-02-05T00:00:00",
 
841
                 LastCheckerStatus=-2)),
856
842
            ])
857
 
        self.one_client = {self.client: self.client.attributes}
858
843
 
859
844
class TestPrintTableCmd(TestCmd):
860
845
    def test_normal(self):
874
859
"""[1:-1]
875
860
        self.assertEqual(output, expected_output)
876
861
    def test_one_client(self):
877
 
        output = PrintTableCmd().output(self.one_client)
 
862
        output = PrintTableCmd().output({"foo": self.clients["foo"]})
878
863
        expected_output = """
879
864
Name Enabled Timeout  Last Successful Check
880
865
foo  Yes     00:05:00 2019-02-03T00:00:00  
881
866
"""[1:-1]
882
867
        self.assertEqual(output, expected_output)
883
868
 
884
 
class TestDumpJSONCmd(TestCmd):
885
 
    def setUp(self):
886
 
        self.expected_json = {
887
 
            "foo": {
888
 
                "Name": "foo",
889
 
                "KeyID": ("92ed150794387c03ce684574b1139a65"
890
 
                          "94a34f895daaaf09fd8ea90a27cddb12"),
891
 
                "Host": "foo.example.org",
892
 
                "Enabled": True,
893
 
                "Timeout": 300000,
894
 
                "LastCheckedOK": "2019-02-03T00:00:00",
895
 
                "Created": "2019-01-02T00:00:00",
896
 
                "Interval": 120000,
897
 
                "Fingerprint": ("778827225BA7DE539C5A"
898
 
                                "7CFA59CFF7CDBD9A5920"),
899
 
                "CheckerRunning": False,
900
 
                "LastEnabled": "2019-01-03T00:00:00",
901
 
                "ApprovalPending": False,
902
 
                "ApprovedByDefault": True,
903
 
                "LastApprovalRequest": "",
904
 
                "ApprovalDelay": 0,
905
 
                "ApprovalDuration": 1000,
906
 
                "Checker": "fping -q -- %(host)s",
907
 
                "ExtendedTimeout": 900000,
908
 
                "Expires": "2019-02-04T00:00:00",
909
 
                "LastCheckerStatus": 0,
910
 
            },
911
 
            "barbar": {
912
 
                "Name": "barbar",
913
 
                "KeyID": ("0558568eedd67d622f5c83b35a115f79"
914
 
                          "6ab612cff5ad227247e46c2b020f441c"),
915
 
                "Host": "192.0.2.3",
916
 
                "Enabled": True,
917
 
                "Timeout": 300000,
918
 
                "LastCheckedOK": "2019-02-04T00:00:00",
919
 
                "Created": "2019-01-03T00:00:00",
920
 
                "Interval": 120000,
921
 
                "Fingerprint": ("3E393AEAEFB84C7E89E2"
922
 
                                "F547B3A107558FCA3A27"),
923
 
                "CheckerRunning": True,
924
 
                "LastEnabled": "2019-01-04T00:00:00",
925
 
                "ApprovalPending": False,
926
 
                "ApprovedByDefault": False,
927
 
                "LastApprovalRequest": "2019-01-03T00:00:00",
928
 
                "ApprovalDelay": 30000,
929
 
                "ApprovalDuration": 1000,
930
 
                "Checker": ":",
931
 
                "ExtendedTimeout": 900000,
932
 
                "Expires": "2019-02-05T00:00:00",
933
 
                "LastCheckerStatus": -2,
934
 
            },
935
 
        }
936
 
        return super(TestDumpJSONCmd, self).setUp()
937
 
    def test_normal(self):
938
 
        json_data = json.loads(DumpJSONCmd().output(self.clients))
939
 
        self.assertDictEqual(json_data, self.expected_json)
940
 
    def test_one_client(self):
941
 
        clients = self.one_client
942
 
        json_data = json.loads(DumpJSONCmd().output(clients))
943
 
        expected_json = {"foo": self.expected_json["foo"]}
944
 
        self.assertDictEqual(json_data, expected_json)
945
 
 
946
 
class TestIsEnabledCmd(TestCmd):
947
 
    def test_is_enabled(self):
948
 
        self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
949
 
                            for client, properties in self.clients.items()))
950
 
    def test_is_enabled_run_exits_successfully(self):
951
 
        with self.assertRaises(SystemExit) as e:
952
 
            IsEnabledCmd().run(None, self.one_client)
953
 
        if e.exception.code is not None:
954
 
            self.assertEqual(e.exception.code, 0)
955
 
        else:
956
 
            self.assertIsNone(e.exception.code)
957
 
    def test_is_enabled_run_exits_with_failure(self):
958
 
        self.client.attributes["Enabled"] = dbus.Boolean(False)
959
 
        with self.assertRaises(SystemExit) as e:
960
 
            IsEnabledCmd().run(None, self.one_client)
961
 
        if isinstance(e.exception.code, int):
962
 
            self.assertNotEqual(e.exception.code, 0)
963
 
        else:
964
 
            self.assertIsNotNone(e.exception.code)
965
 
 
966
 
class TestRemoveCmd(TestCmd):
967
 
    def test_remove(self):
968
 
        class MockMandos(object):
969
 
            def __init__(self):
970
 
                self.calls = []
971
 
            def RemoveClient(self, dbus_path):
972
 
                self.calls.append(("RemoveClient", (dbus_path,)))
973
 
        mandos = MockMandos()
974
 
        super(TestRemoveCmd, self).setUp()
975
 
        RemoveCmd().run(mandos, self.clients)
976
 
        self.assertEqual(len(mandos.calls), 2)
977
 
        for client in self.clients:
978
 
            self.assertIn(("RemoveClient",
979
 
                           (client.__dbus_object_path__,)),
980
 
                          mandos.calls)
981
 
 
982
 
class TestApproveCmd(TestCmd):
983
 
    def test_approve(self):
984
 
        ApproveCmd().run(None, self.clients)
985
 
        for client in self.clients:
986
 
            self.assertIn(("Approve", (True, client_interface)),
987
 
                          client.calls)
988
 
 
989
 
class TestDenyCmd(TestCmd):
990
 
    def test_deny(self):
991
 
        DenyCmd().run(None, self.clients)
992
 
        for client in self.clients:
993
 
            self.assertIn(("Approve", (False, client_interface)),
994
 
                          client.calls)
995
 
 
996
 
class TestEnableCmd(TestCmd):
997
 
    def test_enable(self):
998
 
        for client in self.clients:
999
 
            client.attributes["Enabled"] = False
1000
 
 
1001
 
        EnableCmd().run(None, self.clients)
1002
 
 
1003
 
        for client in self.clients:
1004
 
            self.assertTrue(client.attributes["Enabled"])
1005
 
 
1006
 
class TestDisableCmd(TestCmd):
1007
 
    def test_disable(self):
1008
 
        DisableCmd().run(None, self.clients)
1009
 
 
1010
 
        for client in self.clients:
1011
 
            self.assertFalse(client.attributes["Enabled"])
1012
 
 
1013
 
class Unique(object):
1014
 
    """Class for objects which exist only to be unique objects, since
1015
 
unittest.mock.sentinel only exists in Python 3.3"""
1016
 
 
1017
 
class TestPropertyCmd(TestCmd):
1018
 
    """Abstract class for tests of PropertyCmd classes"""
1019
 
    def runTest(self):
1020
 
        if not hasattr(self, "command"):
1021
 
            return
1022
 
        values_to_get = getattr(self, "values_to_get",
1023
 
                                self.values_to_set)
1024
 
        for value_to_set, value_to_get in zip(self.values_to_set,
1025
 
                                              values_to_get):
1026
 
            for client in self.clients:
1027
 
                old_value = client.attributes[self.property]
1028
 
                self.assertNotIsInstance(old_value, Unique)
1029
 
                client.attributes[self.property] = Unique()
1030
 
            self.run_command(value_to_set, self.clients)
1031
 
            for client in self.clients:
1032
 
                value = client.attributes[self.property]
1033
 
                self.assertNotIsInstance(value, Unique)
1034
 
                self.assertEqual(value, value_to_get)
1035
 
    def run_command(self, value, clients):
1036
 
        self.command().run(None, clients)
1037
 
 
1038
 
class TestBumpTimeoutCmd(TestPropertyCmd):
1039
 
    command = BumpTimeoutCmd
1040
 
    property = "LastCheckedOK"
1041
 
    values_to_set = [""]
1042
 
 
1043
 
class TestStartCheckerCmd(TestPropertyCmd):
1044
 
    command = StartCheckerCmd
1045
 
    property = "CheckerRunning"
1046
 
    values_to_set = [dbus.Boolean(True)]
1047
 
 
1048
 
class TestStopCheckerCmd(TestPropertyCmd):
1049
 
    command = StopCheckerCmd
1050
 
    property = "CheckerRunning"
1051
 
    values_to_set = [dbus.Boolean(False)]
1052
 
 
1053
 
class TestApproveByDefaultCmd(TestPropertyCmd):
1054
 
    command = ApproveByDefaultCmd
1055
 
    property = "ApprovedByDefault"
1056
 
    values_to_set = [dbus.Boolean(True)]
1057
 
 
1058
 
class TestDenyByDefaultCmd(TestPropertyCmd):
1059
 
    command = DenyByDefaultCmd
1060
 
    property = "ApprovedByDefault"
1061
 
    values_to_set = [dbus.Boolean(False)]
1062
 
 
1063
 
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1064
 
    """Abstract class for tests of PropertyCmd classes using the
1065
 
ValueArgumentMixIn"""
1066
 
    def runTest(self):
1067
 
        if type(self) is TestValueArgumentPropertyCmd:
1068
 
            return
1069
 
        return super(TestValueArgumentPropertyCmd, self).runTest()
1070
 
    def run_command(self, value, clients):
1071
 
        self.command(value).run(None, clients)
1072
 
 
1073
 
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1074
 
    command = SetCheckerCmd
1075
 
    property = "Checker"
1076
 
    values_to_set = ["", ":", "fping -q -- %s"]
1077
 
 
1078
 
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1079
 
    command = SetHostCmd
1080
 
    property = "Host"
1081
 
    values_to_set = ["192.0.2.3", "foo.example.org"]
1082
 
 
1083
 
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1084
 
    command = SetSecretCmd
1085
 
    property = "Secret"
1086
 
    values_to_set = [b"", b"secret"]
1087
 
 
1088
 
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1089
 
    command = SetTimeoutCmd
1090
 
    property = "Timeout"
1091
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1092
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1093
 
 
1094
 
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1095
 
    command = SetExtendedTimeoutCmd
1096
 
    property = "ExtendedTimeout"
1097
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1098
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1099
 
 
1100
 
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1101
 
    command = SetIntervalCmd
1102
 
    property = "Interval"
1103
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1105
 
 
1106
 
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1107
 
    command = SetApprovalDelayCmd
1108
 
    property = "ApprovalDelay"
1109
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1111
 
 
1112
 
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1113
 
    command = SetApprovalDurationCmd
1114
 
    property = "ApprovalDuration"
1115
 
    values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
 
    values_to_get = [0, 300000, 1000, 120000, 31449600000]
1117
 
 
1118
 
class TestOptions(unittest.TestCase):
1119
 
    def setUp(self):
1120
 
        self.parser = argparse.ArgumentParser()
1121
 
        add_command_line_options(self.parser)
1122
 
    def commands_from_args(self, args):
1123
 
        self.options = self.parser.parse_args(args)
1124
 
        return commands_from_options(self.options)
1125
 
    def test_default_is_show_table(self):
1126
 
        commands = self.commands_from_args([])
1127
 
        self.assertEqual(len(commands), 1)
1128
 
        command = commands[0]
1129
 
        self.assertIsInstance(command, PrintTableCmd)
1130
 
        self.assertEqual(command.verbose, False)
1131
 
    def test_show_table_verbose(self):
1132
 
        commands = self.commands_from_args(["--verbose"])
1133
 
        self.assertEqual(len(commands), 1)
1134
 
        command = commands[0]
1135
 
        self.assertIsInstance(command, PrintTableCmd)
1136
 
        self.assertEqual(command.verbose, True)
1137
 
    def test_enable(self):
1138
 
        commands = self.commands_from_args(["--enable", "foo"])
1139
 
        self.assertEqual(len(commands), 1)
1140
 
        command = commands[0]
1141
 
        self.assertIsInstance(command, EnableCmd)
1142
 
        self.assertEqual(self.options.client, ["foo"])
1143
 
 
1144
869
 
1145
870
 
1146
871
def should_only_run_tests():