/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-02-28 21:07:58 UTC
  • mto: (237.7.594 trunk)
  • mto: This revision was merged to the branch mainline in revision 382.
  • Revision ID: teddy@recompile.se-20190228210758-zt2bfeer3v66porp
mandos-ctl (rfc3339_duration_to_delta): Improve tests

* mandos-ctl (rfc3339_duration_to_delta): Add test for month-only
                                          value, and tests that
                                          invalid values raises
                                          ValueError.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8 -*-
 
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
3
3
#
4
4
# Mandos Monitor - Control and monitor the Mandos server
5
5
#
6
 
# Copyright © 2008-2016 Teddy Hogeborn
7
 
# Copyright © 2008-2016 Björn Påhlsson
8
 
#
9
 
# This program is free software: you can redistribute it and/or modify
10
 
# it under the terms of the GNU General Public License as published by
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
 
8
#
 
9
# This file is part of Mandos.
 
10
#
 
11
# Mandos is free software: you can redistribute it and/or modify it
 
12
# under the terms of the GNU General Public License as published by
11
13
# the Free Software Foundation, either version 3 of the License, or
12
14
# (at your option) any later version.
13
15
#
14
 
#     This program is distributed in the hope that it will be useful,
15
 
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
#     Mandos is distributed in the hope that it will be useful, but
 
17
#     WITHOUT ANY WARRANTY; without even the implied warranty of
16
18
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17
19
#     GNU General Public License for more details.
18
20
#
19
21
# You should have received a copy of the GNU General Public License
20
 
# along with this program.  If not, see
21
 
# <http://www.gnu.org/licenses/>.
 
22
# along with Mandos.  If not, see <http://www.gnu.org/licenses/>.
22
23
#
23
24
# Contact the authors at <mandos@recompile.se>.
24
25
#
38
39
import re
39
40
import os
40
41
import collections
41
 
import doctest
42
42
import json
 
43
import unittest
 
44
import logging
43
45
 
44
46
import dbus
45
47
 
 
48
# Show warnings by default
 
49
if not sys.warnoptions:
 
50
    import warnings
 
51
    warnings.simplefilter("default")
 
52
 
 
53
log = logging.getLogger(sys.argv[0])
 
54
logging.basicConfig(level="INFO", # Show info level messages
 
55
                    format="%(message)s") # Show basic log messages
 
56
 
 
57
logging.captureWarnings(True)   # Show warnings via the logging system
 
58
 
46
59
if sys.version_info.major == 2:
47
60
    str = unicode
48
61
 
58
71
    "Interval": "Interval",
59
72
    "Host": "Host",
60
73
    "Fingerprint": "Fingerprint",
 
74
    "KeyID": "Key ID",
61
75
    "CheckerRunning": "Check Is Running",
62
76
    "LastEnabled": "Last Enabled",
63
77
    "ApprovalPending": "Approval Is Pending",
75
89
server_path = "/"
76
90
server_interface = domain + ".Mandos"
77
91
client_interface = domain + ".Mandos.Client"
78
 
version = "1.7.13"
 
92
version = "1.8.3"
79
93
 
80
94
 
81
95
try:
102
116
    datetime.timedelta(0, 60)
103
117
    >>> rfc3339_duration_to_delta("PT60M")
104
118
    datetime.timedelta(0, 3600)
 
119
    >>> rfc3339_duration_to_delta("P60M")
 
120
    datetime.timedelta(1680)
105
121
    >>> rfc3339_duration_to_delta("PT24H")
106
122
    datetime.timedelta(1)
107
123
    >>> rfc3339_duration_to_delta("P1W")
110
126
    datetime.timedelta(0, 330)
111
127
    >>> rfc3339_duration_to_delta("P1DT3M20S")
112
128
    datetime.timedelta(1, 200)
 
129
    >>> # Can not be empty:
 
130
    >>> rfc3339_duration_to_delta("")
 
131
    Traceback (most recent call last):
 
132
    ...
 
133
    ValueError: Invalid RFC 3339 duration: u''
 
134
    >>> # Must start with "P":
 
135
    >>> rfc3339_duration_to_delta("1D")
 
136
    Traceback (most recent call last):
 
137
    ...
 
138
    ValueError: Invalid RFC 3339 duration: u'1D'
 
139
    >>> # Must use correct order
 
140
    >>> rfc3339_duration_to_delta("PT1S2M")
 
141
    Traceback (most recent call last):
 
142
    ...
 
143
    ValueError: Invalid RFC 3339 duration: u'PT1S2M'
 
144
    >>> # Time needs time marker
 
145
    >>> rfc3339_duration_to_delta("P1H2S")
 
146
    Traceback (most recent call last):
 
147
    ...
 
148
    ValueError: Invalid RFC 3339 duration: u'P1H2S'
 
149
    >>> # Weeks can not be combined with anything else
 
150
    >>> rfc3339_duration_to_delta("P1D2W")
 
151
    Traceback (most recent call last):
 
152
    ...
 
153
    ValueError: Invalid RFC 3339 duration: u'P1D2W'
 
154
    >>> rfc3339_duration_to_delta("P2W2H")
 
155
    Traceback (most recent call last):
 
156
    ...
 
157
    ValueError: Invalid RFC 3339 duration: u'P2W2H'
113
158
    """
114
159
 
115
160
    # Parsing an RFC 3339 duration with regular expressions is not
235
280
 
236
281
def print_clients(clients, keywords):
237
282
    def valuetostring(value, keyword):
238
 
        if type(value) is dbus.Boolean:
 
283
        if isinstance(value, dbus.Boolean):
239
284
            return "Yes" if value else "No"
240
285
        if keyword in ("Timeout", "Interval", "ApprovalDelay",
241
286
                       "ApprovalDuration", "ExtendedTimeout"):
344
389
    if options.all and not has_actions(options):
345
390
        parser.error("--all requires an action.")
346
391
 
347
 
    if options.check:
348
 
        fail_count, test_count = doctest.testmod()
349
 
        sys.exit(os.EX_OK if fail_count == 0 else 1)
350
 
 
351
392
    try:
352
393
        bus = dbus.SystemBus()
353
394
        mandos_dbus_objc = bus.get_object(busname, server_path)
354
395
    except dbus.exceptions.DBusException:
355
 
        print("Could not connect to Mandos server", file=sys.stderr)
 
396
        log.critical("Could not connect to Mandos server")
356
397
        sys.exit(1)
357
398
 
358
399
    mandos_serv = dbus.Interface(mandos_dbus_objc,
377
418
            os.dup2(stderrcopy, sys.stderr.fileno())
378
419
            os.close(stderrcopy)
379
420
    except dbus.exceptions.DBusException as e:
380
 
        print("Access denied: "
381
 
              "Accessing mandos server through D-Bus: {}".format(e),
382
 
              file=sys.stderr)
 
421
        log.critical("Failed to access Mandos server through D-Bus:"
 
422
                     "\n%s", e)
383
423
        sys.exit(1)
384
424
 
385
425
    # Compile dict of (clients: properties) to process
396
436
                    clients[client_objc] = client
397
437
                    break
398
438
            else:
399
 
                print("Client not found on server: {!r}"
400
 
                      .format(name), file=sys.stderr)
 
439
                log.critical("Client not found on server: %r", name)
401
440
                sys.exit(1)
402
441
 
403
442
    if not has_actions(options) and clients:
404
443
        if options.verbose or options.dump_json:
405
444
            keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
406
 
                        "Created", "Interval", "Host", "Fingerprint",
407
 
                        "CheckerRunning", "LastEnabled",
408
 
                        "ApprovalPending", "ApprovedByDefault",
409
 
                        "LastApprovalRequest", "ApprovalDelay",
410
 
                        "ApprovalDuration", "Checker",
411
 
                        "ExtendedTimeout", "Expires",
 
445
                        "Created", "Interval", "Host", "KeyID",
 
446
                        "Fingerprint", "CheckerRunning",
 
447
                        "LastEnabled", "ApprovalPending",
 
448
                        "ApprovedByDefault", "LastApprovalRequest",
 
449
                        "ApprovalDelay", "ApprovalDuration",
 
450
                        "Checker", "ExtendedTimeout", "Expires",
412
451
                        "LastCheckerStatus")
413
452
        else:
414
453
            keywords = defaultkeywords
491
530
                client.Approve(dbus.Boolean(False),
492
531
                               dbus_interface=client_interface)
493
532
 
 
533
 
 
534
class Test_milliseconds_to_string(unittest.TestCase):
 
535
    def test_all(self):
 
536
        self.assertEqual(milliseconds_to_string(93785000),
 
537
                         "1T02:03:05")
 
538
    def test_no_days(self):
 
539
        self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
 
540
    def test_all_zero(self):
 
541
        self.assertEqual(milliseconds_to_string(0), "00:00:00")
 
542
    def test_no_fractional_seconds(self):
 
543
        self.assertEqual(milliseconds_to_string(400), "00:00:00")
 
544
        self.assertEqual(milliseconds_to_string(900), "00:00:00")
 
545
        self.assertEqual(milliseconds_to_string(1900), "00:00:01")
 
546
 
 
547
 
 
548
def should_only_run_tests():
 
549
    parser = argparse.ArgumentParser(add_help=False)
 
550
    parser.add_argument("--check", action='store_true')
 
551
    args, unknown_args = parser.parse_known_args()
 
552
    run_tests = args.check
 
553
    if run_tests:
 
554
        # Remove --check argument from sys.argv
 
555
        sys.argv[1:] = unknown_args
 
556
    return run_tests
 
557
 
 
558
# Add all tests from doctest strings
 
559
def load_tests(loader, tests, none):
 
560
    import doctest
 
561
    tests.addTests(doctest.DocTestSuite())
 
562
    return tests
494
563
 
495
564
if __name__ == "__main__":
496
 
    main()
 
565
    if should_only_run_tests():
 
566
        # Call using ./tdd-python-script --check [--verbose]
 
567
        unittest.main()
 
568
    else:
 
569
        main()