/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-02-28 21:07:58 UTC
  • Revision ID: teddy@recompile.se-20190228210758-zt2bfeer3v66porp
mandos-ctl (rfc3339_duration_to_delta): Improve tests

* mandos-ctl (rfc3339_duration_to_delta): Add test for month-only
                                          value, and tests that
                                          invalid values raises
                                          ValueError.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8 -*-
 
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
3
3
#
4
4
# Mandos Monitor - Control and monitor the Mandos server
5
5
#
6
 
# Copyright © 2008-2018 Teddy Hogeborn
7
 
# Copyright © 2008-2018 Björn Påhlsson
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
40
40
import os
41
41
import collections
42
42
import json
 
43
import unittest
 
44
import logging
43
45
 
44
46
import dbus
45
47
 
 
48
# Show warnings by default
 
49
if not sys.warnoptions:
 
50
    import warnings
 
51
    warnings.simplefilter("default")
 
52
 
 
53
log = logging.getLogger(sys.argv[0])
 
54
logging.basicConfig(level="INFO", # Show info level messages
 
55
                    format="%(message)s") # Show basic log messages
 
56
 
 
57
logging.captureWarnings(True)   # Show warnings via the logging system
 
58
 
46
59
if sys.version_info.major == 2:
47
60
    str = unicode
48
61
 
58
71
    "Interval": "Interval",
59
72
    "Host": "Host",
60
73
    "Fingerprint": "Fingerprint",
 
74
    "KeyID": "Key ID",
61
75
    "CheckerRunning": "Check Is Running",
62
76
    "LastEnabled": "Last Enabled",
63
77
    "ApprovalPending": "Approval Is Pending",
75
89
server_path = "/"
76
90
server_interface = domain + ".Mandos"
77
91
client_interface = domain + ".Mandos.Client"
78
 
version = "1.7.18"
 
92
version = "1.8.3"
79
93
 
80
94
 
81
95
try:
102
116
    datetime.timedelta(0, 60)
103
117
    >>> rfc3339_duration_to_delta("PT60M")
104
118
    datetime.timedelta(0, 3600)
 
119
    >>> rfc3339_duration_to_delta("P60M")
 
120
    datetime.timedelta(1680)
105
121
    >>> rfc3339_duration_to_delta("PT24H")
106
122
    datetime.timedelta(1)
107
123
    >>> rfc3339_duration_to_delta("P1W")
110
126
    datetime.timedelta(0, 330)
111
127
    >>> rfc3339_duration_to_delta("P1DT3M20S")
112
128
    datetime.timedelta(1, 200)
 
129
    >>> # Can not be empty:
 
130
    >>> rfc3339_duration_to_delta("")
 
131
    Traceback (most recent call last):
 
132
    ...
 
133
    ValueError: Invalid RFC 3339 duration: u''
 
134
    >>> # Must start with "P":
 
135
    >>> rfc3339_duration_to_delta("1D")
 
136
    Traceback (most recent call last):
 
137
    ...
 
138
    ValueError: Invalid RFC 3339 duration: u'1D'
 
139
    >>> # Must use correct order
 
140
    >>> rfc3339_duration_to_delta("PT1S2M")
 
141
    Traceback (most recent call last):
 
142
    ...
 
143
    ValueError: Invalid RFC 3339 duration: u'PT1S2M'
 
144
    >>> # Time needs time marker
 
145
    >>> rfc3339_duration_to_delta("P1H2S")
 
146
    Traceback (most recent call last):
 
147
    ...
 
148
    ValueError: Invalid RFC 3339 duration: u'P1H2S'
 
149
    >>> # Weeks can not be combined with anything else
 
150
    >>> rfc3339_duration_to_delta("P1D2W")
 
151
    Traceback (most recent call last):
 
152
    ...
 
153
    ValueError: Invalid RFC 3339 duration: u'P1D2W'
 
154
    >>> rfc3339_duration_to_delta("P2W2H")
 
155
    Traceback (most recent call last):
 
156
    ...
 
157
    ValueError: Invalid RFC 3339 duration: u'P2W2H'
113
158
    """
114
159
 
115
160
    # Parsing an RFC 3339 duration with regular expressions is not
235
280
 
236
281
def print_clients(clients, keywords):
237
282
    def valuetostring(value, keyword):
238
 
        if type(value) is dbus.Boolean:
 
283
        if isinstance(value, dbus.Boolean):
239
284
            return "Yes" if value else "No"
240
285
        if keyword in ("Timeout", "Interval", "ApprovalDelay",
241
286
                       "ApprovalDuration", "ExtendedTimeout"):
344
389
    if options.all and not has_actions(options):
345
390
        parser.error("--all requires an action.")
346
391
 
347
 
    if options.check:
348
 
        import doctest
349
 
        fail_count, test_count = doctest.testmod()
350
 
        sys.exit(os.EX_OK if fail_count == 0 else 1)
351
 
 
352
392
    try:
353
393
        bus = dbus.SystemBus()
354
394
        mandos_dbus_objc = bus.get_object(busname, server_path)
355
395
    except dbus.exceptions.DBusException:
356
 
        print("Could not connect to Mandos server", file=sys.stderr)
 
396
        log.critical("Could not connect to Mandos server")
357
397
        sys.exit(1)
358
398
 
359
399
    mandos_serv = dbus.Interface(mandos_dbus_objc,
378
418
            os.dup2(stderrcopy, sys.stderr.fileno())
379
419
            os.close(stderrcopy)
380
420
    except dbus.exceptions.DBusException as e:
381
 
        print("Access denied: "
382
 
              "Accessing mandos server through D-Bus: {}".format(e),
383
 
              file=sys.stderr)
 
421
        log.critical("Failed to access Mandos server through D-Bus:"
 
422
                     "\n%s", e)
384
423
        sys.exit(1)
385
424
 
386
425
    # Compile dict of (clients: properties) to process
397
436
                    clients[client_objc] = client
398
437
                    break
399
438
            else:
400
 
                print("Client not found on server: {!r}"
401
 
                      .format(name), file=sys.stderr)
 
439
                log.critical("Client not found on server: %r", name)
402
440
                sys.exit(1)
403
441
 
404
442
    if not has_actions(options) and clients:
405
443
        if options.verbose or options.dump_json:
406
444
            keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
407
 
                        "Created", "Interval", "Host", "Fingerprint",
408
 
                        "CheckerRunning", "LastEnabled",
409
 
                        "ApprovalPending", "ApprovedByDefault",
410
 
                        "LastApprovalRequest", "ApprovalDelay",
411
 
                        "ApprovalDuration", "Checker",
412
 
                        "ExtendedTimeout", "Expires",
 
445
                        "Created", "Interval", "Host", "KeyID",
 
446
                        "Fingerprint", "CheckerRunning",
 
447
                        "LastEnabled", "ApprovalPending",
 
448
                        "ApprovedByDefault", "LastApprovalRequest",
 
449
                        "ApprovalDelay", "ApprovalDuration",
 
450
                        "Checker", "ExtendedTimeout", "Expires",
413
451
                        "LastCheckerStatus")
414
452
        else:
415
453
            keywords = defaultkeywords
492
530
                client.Approve(dbus.Boolean(False),
493
531
                               dbus_interface=client_interface)
494
532
 
 
533
 
 
534
class Test_milliseconds_to_string(unittest.TestCase):
 
535
    def test_all(self):
 
536
        self.assertEqual(milliseconds_to_string(93785000),
 
537
                         "1T02:03:05")
 
538
    def test_no_days(self):
 
539
        self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
 
540
    def test_all_zero(self):
 
541
        self.assertEqual(milliseconds_to_string(0), "00:00:00")
 
542
    def test_no_fractional_seconds(self):
 
543
        self.assertEqual(milliseconds_to_string(400), "00:00:00")
 
544
        self.assertEqual(milliseconds_to_string(900), "00:00:00")
 
545
        self.assertEqual(milliseconds_to_string(1900), "00:00:01")
 
546
 
 
547
 
 
548
def should_only_run_tests():
 
549
    parser = argparse.ArgumentParser(add_help=False)
 
550
    parser.add_argument("--check", action='store_true')
 
551
    args, unknown_args = parser.parse_known_args()
 
552
    run_tests = args.check
 
553
    if run_tests:
 
554
        # Remove --check argument from sys.argv
 
555
        sys.argv[1:] = unknown_args
 
556
    return run_tests
 
557
 
 
558
# Add all tests from doctest strings
 
559
def load_tests(loader, tests, none):
 
560
    import doctest
 
561
    tests.addTests(doctest.DocTestSuite())
 
562
    return tests
495
563
 
496
564
if __name__ == "__main__":
497
 
    main()
 
565
    if should_only_run_tests():
 
566
        # Call using ./tdd-python-script --check [--verbose]
 
567
        unittest.main()
 
568
    else:
 
569
        main()