/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-02 01:40:17 UTC
  • Revision ID: teddy@recompile.se-20190302014017-w6c8qytzlroqej24
mandos-ctl: Refactor

* mandos-ctl (table_rows_of_clients): Refactor into TableOfClients
                                      class.  All callers changed.
  (class TableOfClients): New.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/python
2
 
# -*- mode: python; coding: utf-8 -*-
 
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
3
3
#
4
4
# Mandos Monitor - Control and monitor the Mandos server
5
5
#
6
 
# Copyright © 2008-2018 Teddy Hogeborn
7
 
# Copyright © 2008-2018 Björn Påhlsson
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
40
40
import os
41
41
import collections
42
42
import json
 
43
import unittest
 
44
import logging
43
45
 
44
46
import dbus
45
47
 
 
48
# Show warnings by default
 
49
if not sys.warnoptions:
 
50
    import warnings
 
51
    warnings.simplefilter("default")
 
52
 
 
53
log = logging.getLogger(sys.argv[0])
 
54
logging.basicConfig(level="INFO", # Show info level messages
 
55
                    format="%(message)s") # Show basic log messages
 
56
 
 
57
logging.captureWarnings(True)   # Show warnings via the logging system
 
58
 
46
59
if sys.version_info.major == 2:
47
60
    str = unicode
48
61
 
76
89
server_path = "/"
77
90
server_interface = domain + ".Mandos"
78
91
client_interface = domain + ".Mandos.Client"
79
 
version = "1.7.20"
 
92
version = "1.8.3"
80
93
 
81
94
 
82
95
try:
103
116
    datetime.timedelta(0, 60)
104
117
    >>> rfc3339_duration_to_delta("PT60M")
105
118
    datetime.timedelta(0, 3600)
 
119
    >>> rfc3339_duration_to_delta("P60M")
 
120
    datetime.timedelta(1680)
106
121
    >>> rfc3339_duration_to_delta("PT24H")
107
122
    datetime.timedelta(1)
108
123
    >>> rfc3339_duration_to_delta("P1W")
111
126
    datetime.timedelta(0, 330)
112
127
    >>> rfc3339_duration_to_delta("P1DT3M20S")
113
128
    datetime.timedelta(1, 200)
 
129
    >>> # Can not be empty:
 
130
    >>> rfc3339_duration_to_delta("")
 
131
    Traceback (most recent call last):
 
132
    ...
 
133
    ValueError: Invalid RFC 3339 duration: u''
 
134
    >>> # Must start with "P":
 
135
    >>> rfc3339_duration_to_delta("1D")
 
136
    Traceback (most recent call last):
 
137
    ...
 
138
    ValueError: Invalid RFC 3339 duration: u'1D'
 
139
    >>> # Must use correct order
 
140
    >>> rfc3339_duration_to_delta("PT1S2M")
 
141
    Traceback (most recent call last):
 
142
    ...
 
143
    ValueError: Invalid RFC 3339 duration: u'PT1S2M'
 
144
    >>> # Time needs time marker
 
145
    >>> rfc3339_duration_to_delta("P1H2S")
 
146
    Traceback (most recent call last):
 
147
    ...
 
148
    ValueError: Invalid RFC 3339 duration: u'P1H2S'
 
149
    >>> # Weeks can not be combined with anything else
 
150
    >>> rfc3339_duration_to_delta("P1D2W")
 
151
    Traceback (most recent call last):
 
152
    ...
 
153
    ValueError: Invalid RFC 3339 duration: u'P1D2W'
 
154
    >>> rfc3339_duration_to_delta("P2W2H")
 
155
    Traceback (most recent call last):
 
156
    ...
 
157
    ValueError: Invalid RFC 3339 duration: u'P2W2H'
114
158
    """
115
159
 
116
160
    # Parsing an RFC 3339 duration with regular expressions is not
195
239
 
196
240
def string_to_delta(interval):
197
241
    """Parse a string and return a datetime.timedelta
198
 
 
199
 
    >>> string_to_delta('7d')
200
 
    datetime.timedelta(7)
201
 
    >>> string_to_delta('60s')
202
 
    datetime.timedelta(0, 60)
203
 
    >>> string_to_delta('60m')
204
 
    datetime.timedelta(0, 3600)
205
 
    >>> string_to_delta('24h')
206
 
    datetime.timedelta(1)
207
 
    >>> string_to_delta('1w')
208
 
    datetime.timedelta(7)
209
 
    >>> string_to_delta('5m 30s')
210
 
    datetime.timedelta(0, 330)
211
242
    """
212
243
 
213
244
    try:
214
245
        return rfc3339_duration_to_delta(interval)
215
 
    except ValueError:
216
 
        pass
 
246
    except ValueError as e:
 
247
        log.warning("%s - Parsing as pre-1.6.1 interval instead",
 
248
                    ' '.join(e.args))
 
249
    return parse_pre_1_6_1_interval(interval)
 
250
 
 
251
 
 
252
def parse_pre_1_6_1_interval(interval):
 
253
    """Parse an interval string as documented by Mandos before 1.6.1, and
 
254
    return a datetime.timedelta
 
255
    >>> parse_pre_1_6_1_interval('7d')
 
256
    datetime.timedelta(7)
 
257
    >>> parse_pre_1_6_1_interval('60s')
 
258
    datetime.timedelta(0, 60)
 
259
    >>> parse_pre_1_6_1_interval('60m')
 
260
    datetime.timedelta(0, 3600)
 
261
    >>> parse_pre_1_6_1_interval('24h')
 
262
    datetime.timedelta(1)
 
263
    >>> parse_pre_1_6_1_interval('1w')
 
264
    datetime.timedelta(7)
 
265
    >>> parse_pre_1_6_1_interval('5m 30s')
 
266
    datetime.timedelta(0, 330)
 
267
    >>> parse_pre_1_6_1_interval('')
 
268
    datetime.timedelta(0)
 
269
    >>> # Ignore unknown characters, allow any order and repetitions
 
270
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
 
271
    datetime.timedelta(2, 480, 18000)
 
272
 
 
273
    """
217
274
 
218
275
    value = datetime.timedelta(0)
219
276
    regexp = re.compile(r"(\d+)([dsmhw]?)")
235
292
 
236
293
 
237
294
def print_clients(clients, keywords):
 
295
    print('\n'.join(TableOfClients(clients, keywords).rows()))
 
296
 
 
297
class TableOfClients(object):
 
298
    def __init__(self, clients, keywords):
 
299
        self.clients = clients
 
300
        self.keywords = keywords
 
301
 
 
302
    @staticmethod
238
303
    def valuetostring(value, keyword):
239
 
        if type(value) is dbus.Boolean:
 
304
        if isinstance(value, dbus.Boolean):
240
305
            return "Yes" if value else "No"
241
306
        if keyword in ("Timeout", "Interval", "ApprovalDelay",
242
307
                       "ApprovalDuration", "ExtendedTimeout"):
243
308
            return milliseconds_to_string(value)
244
309
        return str(value)
245
310
 
246
 
    # Create format string to print table rows
247
 
    format_string = " ".join("{{{key}:{width}}}".format(
248
 
        width=max(len(tablewords[key]),
249
 
                  max(len(valuetostring(client[key], key))
250
 
                      for client in clients)),
251
 
        key=key)
252
 
                             for key in keywords)
253
 
    # Print header line
254
 
    print(format_string.format(**tablewords))
255
 
    for client in clients:
256
 
        print(format_string
257
 
              .format(**{key: valuetostring(client[key], key)
258
 
                         for key in keywords}))
 
311
    def rows(self):
 
312
        # Create format string to format table rows
 
313
        format_string = " ".join("{{{key}:{width}}}".format(
 
314
            width=max(len(tablewords[key]),
 
315
                      max(len(self.valuetostring(client[key], key))
 
316
                          for client in self.clients)),
 
317
            key=key)
 
318
                                 for key in self.keywords)
 
319
        # Start with header line
 
320
        rows = [format_string.format(**tablewords)]
 
321
        for client in self.clients:
 
322
            rows.append(format_string
 
323
                        .format(**{key: self.valuetostring(client[key], key)
 
324
                                   for key in self.keywords}))
 
325
        return rows
259
326
 
260
327
 
261
328
def has_actions(options):
345
412
    if options.all and not has_actions(options):
346
413
        parser.error("--all requires an action.")
347
414
 
348
 
    if options.check:
349
 
        import doctest
350
 
        fail_count, test_count = doctest.testmod()
351
 
        sys.exit(os.EX_OK if fail_count == 0 else 1)
352
 
 
353
415
    try:
354
416
        bus = dbus.SystemBus()
355
417
        mandos_dbus_objc = bus.get_object(busname, server_path)
356
418
    except dbus.exceptions.DBusException:
357
 
        print("Could not connect to Mandos server", file=sys.stderr)
 
419
        log.critical("Could not connect to Mandos server")
358
420
        sys.exit(1)
359
421
 
360
422
    mandos_serv = dbus.Interface(mandos_dbus_objc,
379
441
            os.dup2(stderrcopy, sys.stderr.fileno())
380
442
            os.close(stderrcopy)
381
443
    except dbus.exceptions.DBusException as e:
382
 
        print("Access denied: "
383
 
              "Accessing mandos server through D-Bus: {}".format(e),
384
 
              file=sys.stderr)
 
444
        log.critical("Failed to access Mandos server through D-Bus:"
 
445
                     "\n%s", e)
385
446
        sys.exit(1)
386
447
 
387
448
    # Compile dict of (clients: properties) to process
398
459
                    clients[client_objc] = client
399
460
                    break
400
461
            else:
401
 
                print("Client not found on server: {!r}"
402
 
                      .format(name), file=sys.stderr)
 
462
                log.critical("Client not found on server: %r", name)
403
463
                sys.exit(1)
404
464
 
405
465
    if not has_actions(options) and clients:
493
553
                client.Approve(dbus.Boolean(False),
494
554
                               dbus_interface=client_interface)
495
555
 
 
556
 
 
557
class Test_milliseconds_to_string(unittest.TestCase):
 
558
    def test_all(self):
 
559
        self.assertEqual(milliseconds_to_string(93785000),
 
560
                         "1T02:03:05")
 
561
    def test_no_days(self):
 
562
        self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
 
563
    def test_all_zero(self):
 
564
        self.assertEqual(milliseconds_to_string(0), "00:00:00")
 
565
    def test_no_fractional_seconds(self):
 
566
        self.assertEqual(milliseconds_to_string(400), "00:00:00")
 
567
        self.assertEqual(milliseconds_to_string(900), "00:00:00")
 
568
        self.assertEqual(milliseconds_to_string(1900), "00:00:01")
 
569
 
 
570
class Test_string_to_delta(unittest.TestCase):
 
571
    def test_handles_basic_rfc3339(self):
 
572
        self.assertEqual(string_to_delta("PT2H"),
 
573
                         datetime.timedelta(0, 7200))
 
574
    def test_falls_back_to_pre_1_6_1_with_warning(self):
 
575
        # assertLogs only exists in Python 3.4
 
576
        if hasattr(self, "assertLogs"):
 
577
            with self.assertLogs(log, logging.WARNING):
 
578
                value = string_to_delta("2h")
 
579
        else:
 
580
            value = string_to_delta("2h")
 
581
        self.assertEqual(value, datetime.timedelta(0, 7200))
 
582
 
 
583
class Test_TableOfClients(unittest.TestCase):
 
584
    def setUp(self):
 
585
        global tablewords
 
586
        self.old_tablewords = tablewords
 
587
        tablewords = {
 
588
            "Attr1": "X",
 
589
            "AttrTwo": "Yy",
 
590
            "AttrThree": "Zzz",
 
591
            "Bool": "A D-BUS Boolean",
 
592
            "NonDbusBoolean": "A Non-D-BUS Boolean",
 
593
            "Integer": "An Integer",
 
594
            "Timeout": "Timedelta 1",
 
595
            "Interval": "Timedelta 2",
 
596
            "ApprovalDelay": "Timedelta 3",
 
597
            "ApprovalDuration": "Timedelta 4",
 
598
            "ExtendedTimeout": "Timedelta 5",
 
599
            "String": "A String",
 
600
        }
 
601
        self.keywords = ["Attr1", "AttrTwo"]
 
602
        self.clients = [
 
603
            {
 
604
                "Attr1": "x1",
 
605
                "AttrTwo": "y1",
 
606
                "AttrThree": "z1",
 
607
                "Bool": dbus.Boolean(False),
 
608
                "NonDbusBoolean": False,
 
609
                "Integer": 0,
 
610
                "Timeout": 0,
 
611
                "Interval": 1000,
 
612
                "ApprovalDelay": 2000,
 
613
                "ApprovalDuration": 3000,
 
614
                "ExtendedTimeout": 4000,
 
615
                "String": "",
 
616
            },
 
617
            {
 
618
                "Attr1": "x2",
 
619
                "AttrTwo": "y2",
 
620
                "AttrThree": "z2",
 
621
                "Bool": dbus.Boolean(True),
 
622
                "NonDbusBoolean": True,
 
623
                "Integer": 1,
 
624
                "Timeout": 93785000,
 
625
                "Interval": 93786000,
 
626
                "ApprovalDelay": 93787000,
 
627
                "ApprovalDuration": 93788000,
 
628
                "ExtendedTimeout": 93789000,
 
629
                "String": "A huge string which will not fit," * 10,
 
630
            },
 
631
        ]
 
632
    def tearDown(self):
 
633
        global tablewords
 
634
        tablewords = self.old_tablewords
 
635
    def test_short_header(self):
 
636
        rows = TableOfClients(self.clients, self.keywords).rows()
 
637
        expected_rows = [
 
638
            "X  Yy",
 
639
            "x1 y1",
 
640
            "x2 y2"]
 
641
        self.assertEqual(rows, expected_rows)
 
642
    def test_booleans(self):
 
643
        keywords = ["Bool", "NonDbusBoolean"]
 
644
        rows = TableOfClients(self.clients, keywords).rows()
 
645
        expected_rows = [
 
646
            "A D-BUS Boolean A Non-D-BUS Boolean",
 
647
            "No              False              ",
 
648
            "Yes             True               ",
 
649
        ]
 
650
        self.assertEqual(rows, expected_rows)
 
651
    def test_milliseconds_detection(self):
 
652
        keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
 
653
                    "ApprovalDuration", "ExtendedTimeout"]
 
654
        rows = TableOfClients(self.clients, keywords).rows()
 
655
        expected_rows = ("""
 
656
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
 
657
0          00:00:00    00:00:01    00:00:02    00:00:03    00:00:04   
 
658
1          1T02:03:05  1T02:03:06  1T02:03:07  1T02:03:08  1T02:03:09 
 
659
"""
 
660
        ).splitlines()[1:]
 
661
        self.assertEqual(rows, expected_rows)
 
662
    def test_empty_and_long_string_values(self):
 
663
        keywords = ["String"]
 
664
        rows = TableOfClients(self.clients, keywords).rows()
 
665
        expected_rows = ("""
 
666
A String                                                                                                                                                                                                                                                                                                                                  
 
667
                                                                                                                                                                                                                                                                                                                                          
 
668
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
 
669
"""
 
670
        ).splitlines()[1:]
 
671
        self.assertEqual(rows, expected_rows)
 
672
 
 
673
 
 
674
 
 
675
def should_only_run_tests():
 
676
    parser = argparse.ArgumentParser(add_help=False)
 
677
    parser.add_argument("--check", action='store_true')
 
678
    args, unknown_args = parser.parse_known_args()
 
679
    run_tests = args.check
 
680
    if run_tests:
 
681
        # Remove --check argument from sys.argv
 
682
        sys.argv[1:] = unknown_args
 
683
    return run_tests
 
684
 
 
685
# Add all tests from doctest strings
 
686
def load_tests(loader, tests, none):
 
687
    import doctest
 
688
    tests.addTests(doctest.DocTestSuite())
 
689
    return tests
496
690
 
497
691
if __name__ == "__main__":
498
 
    main()
 
692
    if should_only_run_tests():
 
693
        # Call using ./tdd-python-script --check [--verbose]
 
694
        unittest.main()
 
695
    else:
 
696
        main()