/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-02-28 20:57:45 UTC
  • Revision ID: teddy@recompile.se-20190228205745-zl3bd1yoz0q343lm
mandos-ctl: Use isinstance(x, X) instead of type(x) is X

* mandos-ctl (print_clients): Don't use type(value) and check for
                              dbus.Boolean instances exactly; use
                              isinstance() to be more generic.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8 -*-
 
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
3
3
#
4
4
# Mandos Control - Control and monitor the Mandos server
5
5
#
6
 
# Copyright © 2008-2018 Teddy Hogeborn
7
 
# Copyright © 2008-2018 Björn Påhlsson
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
40
40
import os
41
41
import collections
42
42
import json
 
43
import unittest
 
44
import logging
43
45
 
44
46
import dbus
45
47
 
 
48
# Show warnings by default
 
49
if not sys.warnoptions:
 
50
    import warnings
 
51
    warnings.simplefilter("default")
 
52
 
 
53
log = logging.getLogger(sys.argv[0])
 
54
logging.basicConfig(level="INFO", # Show info level messages
 
55
                    format="%(message)s") # Show basic log messages
 
56
 
 
57
logging.captureWarnings(True)   # Show warnings via the logging system
 
58
 
46
59
if sys.version_info.major == 2:
47
60
    str = unicode
48
61
 
58
71
    "Interval": "Interval",
59
72
    "Host": "Host",
60
73
    "Fingerprint": "Fingerprint",
 
74
    "KeyID": "Key ID",
61
75
    "CheckerRunning": "Check Is Running",
62
76
    "LastEnabled": "Last Enabled",
63
77
    "ApprovalPending": "Approval Is Pending",
75
89
server_path = "/"
76
90
server_interface = domain + ".Mandos"
77
91
client_interface = domain + ".Mandos.Client"
78
 
version = "1.7.16"
 
92
version = "1.8.3"
79
93
 
80
94
 
81
95
try:
235
249
 
236
250
def print_clients(clients, keywords):
237
251
    def valuetostring(value, keyword):
238
 
        if type(value) is dbus.Boolean:
 
252
        if isinstance(value, dbus.Boolean):
239
253
            return "Yes" if value else "No"
240
254
        if keyword in ("Timeout", "Interval", "ApprovalDelay",
241
255
                       "ApprovalDuration", "ExtendedTimeout"):
344
358
    if options.all and not has_actions(options):
345
359
        parser.error("--all requires an action.")
346
360
 
347
 
    if options.check:
348
 
        import doctest
349
 
        fail_count, test_count = doctest.testmod()
350
 
        sys.exit(os.EX_OK if fail_count == 0 else 1)
351
 
 
352
361
    try:
353
362
        bus = dbus.SystemBus()
354
363
        mandos_dbus_objc = bus.get_object(busname, server_path)
355
364
    except dbus.exceptions.DBusException:
356
 
        print("Could not connect to Mandos server", file=sys.stderr)
 
365
        log.critical("Could not connect to Mandos server")
357
366
        sys.exit(1)
358
367
 
359
368
    mandos_serv = dbus.Interface(mandos_dbus_objc,
378
387
            os.dup2(stderrcopy, sys.stderr.fileno())
379
388
            os.close(stderrcopy)
380
389
    except dbus.exceptions.DBusException as e:
381
 
        print("Access denied: "
382
 
              "Accessing mandos server through D-Bus: {}".format(e),
383
 
              file=sys.stderr)
 
390
        log.critical("Failed to access Mandos server through D-Bus:"
 
391
                     "\n%s", e)
384
392
        sys.exit(1)
385
393
 
386
394
    # Compile dict of (clients: properties) to process
397
405
                    clients[client_objc] = client
398
406
                    break
399
407
            else:
400
 
                print("Client not found on server: {!r}"
401
 
                      .format(name), file=sys.stderr)
 
408
                log.critical("Client not found on server: %r", name)
402
409
                sys.exit(1)
403
410
 
404
411
    if not has_actions(options) and clients:
405
412
        if options.verbose or options.dump_json:
406
413
            keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
407
 
                        "Created", "Interval", "Host", "Fingerprint",
408
 
                        "CheckerRunning", "LastEnabled",
409
 
                        "ApprovalPending", "ApprovedByDefault",
410
 
                        "LastApprovalRequest", "ApprovalDelay",
411
 
                        "ApprovalDuration", "Checker",
412
 
                        "ExtendedTimeout", "Expires",
 
414
                        "Created", "Interval", "Host", "KeyID",
 
415
                        "Fingerprint", "CheckerRunning",
 
416
                        "LastEnabled", "ApprovalPending",
 
417
                        "ApprovedByDefault", "LastApprovalRequest",
 
418
                        "ApprovalDelay", "ApprovalDuration",
 
419
                        "Checker", "ExtendedTimeout", "Expires",
413
420
                        "LastCheckerStatus")
414
421
        else:
415
422
            keywords = defaultkeywords
492
499
                client.Approve(dbus.Boolean(False),
493
500
                               dbus_interface=client_interface)
494
501
 
 
502
 
 
503
class Test_milliseconds_to_string(unittest.TestCase):
 
504
    def test_all(self):
 
505
        self.assertEqual(milliseconds_to_string(93785000),
 
506
                         "1T02:03:05")
 
507
    def test_no_days(self):
 
508
        self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
 
509
    def test_all_zero(self):
 
510
        self.assertEqual(milliseconds_to_string(0), "00:00:00")
 
511
    def test_no_fractional_seconds(self):
 
512
        self.assertEqual(milliseconds_to_string(400), "00:00:00")
 
513
        self.assertEqual(milliseconds_to_string(900), "00:00:00")
 
514
        self.assertEqual(milliseconds_to_string(1900), "00:00:01")
 
515
 
 
516
 
 
517
def should_only_run_tests():
 
518
    parser = argparse.ArgumentParser(add_help=False)
 
519
    parser.add_argument("--check", action='store_true')
 
520
    args, unknown_args = parser.parse_known_args()
 
521
    run_tests = args.check
 
522
    if run_tests:
 
523
        # Remove --check argument from sys.argv
 
524
        sys.argv[1:] = unknown_args
 
525
    return run_tests
 
526
 
 
527
# Add all tests from doctest strings
 
528
def load_tests(loader, tests, none):
 
529
    import doctest
 
530
    tests.addTests(doctest.DocTestSuite())
 
531
    return tests
495
532
 
496
533
if __name__ == "__main__":
497
 
    main()
 
534
    if should_only_run_tests():
 
535
        # Call using ./mandos-ctl --check [--verbose]
 
536
        unittest.main()
 
537
    else:
 
538
        main()