/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-02-10 10:58:42 UTC
  • mfrom: (237.4.106 release)
  • Revision ID: teddy@recompile.se-20190210105842-l7htgtxjr23p4heb
Merge from release branch

Show diffs side-by-side

added

removed

Lines of Context:
1
1
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
 
2
# -*- mode: python; coding: utf-8 -*-
3
3
#
4
4
# Mandos Monitor - Control and monitor the Mandos server
5
5
#
40
40
import os
41
41
import collections
42
42
import json
43
 
import unittest
44
 
import logging
45
43
 
46
44
import dbus
47
45
 
48
 
# Show warnings by default
49
 
if not sys.warnoptions:
50
 
    import warnings
51
 
    warnings.simplefilter("default")
52
 
 
53
 
log = logging.getLogger(sys.argv[0])
54
 
logging.basicConfig(level="INFO", # Show info level messages
55
 
                    format="%(message)s") # Show basic log messages
56
 
 
57
 
logging.captureWarnings(True)   # Show warnings via the logging system
58
 
 
59
46
if sys.version_info.major == 2:
60
47
    str = unicode
61
48
 
89
76
server_path = "/"
90
77
server_interface = domain + ".Mandos"
91
78
client_interface = domain + ".Mandos.Client"
92
 
version = "1.8.3"
 
79
version = "1.8.2"
93
80
 
94
81
 
95
82
try:
116
103
    datetime.timedelta(0, 60)
117
104
    >>> rfc3339_duration_to_delta("PT60M")
118
105
    datetime.timedelta(0, 3600)
119
 
    >>> rfc3339_duration_to_delta("P60M")
120
 
    datetime.timedelta(1680)
121
106
    >>> rfc3339_duration_to_delta("PT24H")
122
107
    datetime.timedelta(1)
123
108
    >>> rfc3339_duration_to_delta("P1W")
126
111
    datetime.timedelta(0, 330)
127
112
    >>> rfc3339_duration_to_delta("P1DT3M20S")
128
113
    datetime.timedelta(1, 200)
129
 
    >>> # Can not be empty:
130
 
    >>> rfc3339_duration_to_delta("")
131
 
    Traceback (most recent call last):
132
 
    ...
133
 
    ValueError: Invalid RFC 3339 duration: u''
134
 
    >>> # Must start with "P":
135
 
    >>> rfc3339_duration_to_delta("1D")
136
 
    Traceback (most recent call last):
137
 
    ...
138
 
    ValueError: Invalid RFC 3339 duration: u'1D'
139
 
    >>> # Must use correct order
140
 
    >>> rfc3339_duration_to_delta("PT1S2M")
141
 
    Traceback (most recent call last):
142
 
    ...
143
 
    ValueError: Invalid RFC 3339 duration: u'PT1S2M'
144
 
    >>> # Time needs time marker
145
 
    >>> rfc3339_duration_to_delta("P1H2S")
146
 
    Traceback (most recent call last):
147
 
    ...
148
 
    ValueError: Invalid RFC 3339 duration: u'P1H2S'
149
 
    >>> # Weeks can not be combined with anything else
150
 
    >>> rfc3339_duration_to_delta("P1D2W")
151
 
    Traceback (most recent call last):
152
 
    ...
153
 
    ValueError: Invalid RFC 3339 duration: u'P1D2W'
154
 
    >>> rfc3339_duration_to_delta("P2W2H")
155
 
    Traceback (most recent call last):
156
 
    ...
157
 
    ValueError: Invalid RFC 3339 duration: u'P2W2H'
158
114
    """
159
115
 
160
116
    # Parsing an RFC 3339 duration with regular expressions is not
280
236
 
281
237
def print_clients(clients, keywords):
282
238
    def valuetostring(value, keyword):
283
 
        if isinstance(value, dbus.Boolean):
 
239
        if type(value) is dbus.Boolean:
284
240
            return "Yes" if value else "No"
285
241
        if keyword in ("Timeout", "Interval", "ApprovalDelay",
286
242
                       "ApprovalDuration", "ExtendedTimeout"):
389
345
    if options.all and not has_actions(options):
390
346
        parser.error("--all requires an action.")
391
347
 
 
348
    if options.check:
 
349
        import doctest
 
350
        fail_count, test_count = doctest.testmod()
 
351
        sys.exit(os.EX_OK if fail_count == 0 else 1)
 
352
 
392
353
    try:
393
354
        bus = dbus.SystemBus()
394
355
        mandos_dbus_objc = bus.get_object(busname, server_path)
395
356
    except dbus.exceptions.DBusException:
396
 
        log.critical("Could not connect to Mandos server")
 
357
        print("Could not connect to Mandos server", file=sys.stderr)
397
358
        sys.exit(1)
398
359
 
399
360
    mandos_serv = dbus.Interface(mandos_dbus_objc,
418
379
            os.dup2(stderrcopy, sys.stderr.fileno())
419
380
            os.close(stderrcopy)
420
381
    except dbus.exceptions.DBusException as e:
421
 
        log.critical("Failed to access Mandos server through D-Bus:"
422
 
                     "\n%s", e)
 
382
        print("Access denied: "
 
383
              "Accessing mandos server through D-Bus: {}".format(e),
 
384
              file=sys.stderr)
423
385
        sys.exit(1)
424
386
 
425
387
    # Compile dict of (clients: properties) to process
436
398
                    clients[client_objc] = client
437
399
                    break
438
400
            else:
439
 
                log.critical("Client not found on server: %r", name)
 
401
                print("Client not found on server: {!r}"
 
402
                      .format(name), file=sys.stderr)
440
403
                sys.exit(1)
441
404
 
442
405
    if not has_actions(options) and clients:
530
493
                client.Approve(dbus.Boolean(False),
531
494
                               dbus_interface=client_interface)
532
495
 
533
 
 
534
 
class Test_milliseconds_to_string(unittest.TestCase):
535
 
    def test_all(self):
536
 
        self.assertEqual(milliseconds_to_string(93785000),
537
 
                         "1T02:03:05")
538
 
    def test_no_days(self):
539
 
        self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
540
 
    def test_all_zero(self):
541
 
        self.assertEqual(milliseconds_to_string(0), "00:00:00")
542
 
    def test_no_fractional_seconds(self):
543
 
        self.assertEqual(milliseconds_to_string(400), "00:00:00")
544
 
        self.assertEqual(milliseconds_to_string(900), "00:00:00")
545
 
        self.assertEqual(milliseconds_to_string(1900), "00:00:01")
546
 
 
547
 
 
548
 
def should_only_run_tests():
549
 
    parser = argparse.ArgumentParser(add_help=False)
550
 
    parser.add_argument("--check", action='store_true')
551
 
    args, unknown_args = parser.parse_known_args()
552
 
    run_tests = args.check
553
 
    if run_tests:
554
 
        # Remove --check argument from sys.argv
555
 
        sys.argv[1:] = unknown_args
556
 
    return run_tests
557
 
 
558
 
# Add all tests from doctest strings
559
 
def load_tests(loader, tests, none):
560
 
    import doctest
561
 
    tests.addTests(doctest.DocTestSuite())
562
 
    return tests
563
496
 
564
497
if __name__ == "__main__":
565
 
    if should_only_run_tests():
566
 
        # Call using ./tdd-python-script --check [--verbose]
567
 
        unittest.main()
568
 
    else:
569
 
        main()
 
498
    main()