/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-02 01:56:35 UTC
  • Revision ID: teddy@recompile.se-20190302015635-xgodxd3ivltju7qn
mandos-ctl: Refactor

* mandos-ctl (global tablewords): Refactor into TableOfClients.
  (TableOfClients.tablewords): New overrideable class attribute.

Show diffs side-by-side

added added

removed removed

Lines of Context:
216
216
 
217
217
 
218
218
def string_to_delta(interval):
219
 
    """Parse a string and return a datetime.timedelta"""
 
219
    """Parse a string and return a datetime.timedelta
 
220
    """
220
221
 
221
222
    try:
222
223
        return rfc3339_duration_to_delta(interval)
227
228
 
228
229
 
229
230
def parse_pre_1_6_1_interval(interval):
230
 
    """Parse an interval string as documented by Mandos before 1.6.1,
231
 
    and return a datetime.timedelta
232
 
 
 
231
    """Parse an interval string as documented by Mandos before 1.6.1, and
 
232
    return a datetime.timedelta
233
233
    >>> parse_pre_1_6_1_interval('7d')
234
234
    datetime.timedelta(7)
235
235
    >>> parse_pre_1_6_1_interval('60s')
269
269
    return value
270
270
 
271
271
 
 
272
def print_clients(clients, keywords):
 
273
    print('\n'.join(TableOfClients(clients, keywords).rows()))
 
274
 
272
275
class TableOfClients(object):
273
276
    tablewords = {
274
277
        "Name": "Name",
299
302
        if tablewords is not None:
300
303
            self.tablewords = tablewords
301
304
 
302
 
    def __str__(self):
303
 
        return "\n".join(self.rows())
304
 
 
305
 
    if sys.version_info.major == 2:
306
 
        __unicode__ = __str__
307
 
        def __str__(self):
308
 
            return str(self).encode(locale.getpreferredencoding())
309
 
 
310
 
    def rows(self):
311
 
        format_string = self.row_formatting_string()
312
 
        rows = [self.header_line(format_string)]
313
 
        rows.extend(self.client_line(client, format_string)
314
 
                    for client in self.clients)
315
 
        return rows
316
 
 
317
 
    def row_formatting_string(self):
318
 
        "Format string used to format table rows"
319
 
        return " ".join("{{{key}:{width}}}".format(
320
 
            width=max(len(self.tablewords[key]),
321
 
                      *(len(self.string_from_client(client, key))
322
 
                        for client in self.clients)),
323
 
            key=key)
324
 
                        for key in self.keywords)
325
 
 
326
 
    def string_from_client(self, client, key):
327
 
        return self.valuetostring(client[key], key)
328
 
 
329
305
    @staticmethod
330
306
    def valuetostring(value, keyword):
331
307
        if isinstance(value, dbus.Boolean):
335
311
            return milliseconds_to_string(value)
336
312
        return str(value)
337
313
 
338
 
    def header_line(self, format_string):
339
 
        return format_string.format(**self.tablewords)
340
 
 
341
 
    def client_line(self, client, format_string):
342
 
        return format_string.format(
343
 
            **{key: self.string_from_client(client, key)
344
 
               for key in self.keywords})
 
314
    def rows(self):
 
315
        # Create format string to format table rows
 
316
        format_string = " ".join("{{{key}:{width}}}".format(
 
317
            width=max(len(self.tablewords[key]),
 
318
                      max(len(self.valuetostring(client[key], key))
 
319
                          for client in self.clients)),
 
320
            key=key)
 
321
                                 for key in self.keywords)
 
322
        # Start with header line
 
323
        rows = [format_string.format(**self.tablewords)]
 
324
        for client in self.clients:
 
325
            rows.append(format_string
 
326
                        .format(**{key: self.valuetostring(client[key], key)
 
327
                                   for key in self.keywords}))
 
328
        return rows
345
329
 
346
330
 
347
331
def has_actions(options):
376
360
                        help="Print all fields")
377
361
    parser.add_argument("-j", "--dump-json", action="store_true",
378
362
                        help="Dump client data in JSON format")
379
 
    enable_disable = parser.add_mutually_exclusive_group()
380
 
    enable_disable.add_argument("-e", "--enable", action="store_true",
381
 
                                help="Enable client")
382
 
    enable_disable.add_argument("-d", "--disable",
383
 
                                action="store_true",
384
 
                                help="disable client")
 
363
    parser.add_argument("-e", "--enable", action="store_true",
 
364
                        help="Enable client")
 
365
    parser.add_argument("-d", "--disable", action="store_true",
 
366
                        help="disable client")
385
367
    parser.add_argument("-b", "--bump-timeout", action="store_true",
386
368
                        help="Bump timeout for client")
387
 
    start_stop_checker = parser.add_mutually_exclusive_group()
388
 
    start_stop_checker.add_argument("--start-checker",
389
 
                                    action="store_true",
390
 
                                    help="Start checker for client")
391
 
    start_stop_checker.add_argument("--stop-checker",
392
 
                                    action="store_true",
393
 
                                    help="Stop checker for client")
 
369
    parser.add_argument("--start-checker", action="store_true",
 
370
                        help="Start checker for client")
 
371
    parser.add_argument("--stop-checker", action="store_true",
 
372
                        help="Stop checker for client")
394
373
    parser.add_argument("-V", "--is-enabled", action="store_true",
395
374
                        help="Check if client is enabled")
396
375
    parser.add_argument("-r", "--remove", action="store_true",
403
382
                        help="Set extended timeout for client")
404
383
    parser.add_argument("-i", "--interval",
405
384
                        help="Set checker interval for client")
406
 
    approve_deny_default = parser.add_mutually_exclusive_group()
407
 
    approve_deny_default.add_argument(
408
 
        "--approve-by-default", action="store_true",
409
 
        default=None, dest="approved_by_default",
410
 
        help="Set client to be approved by default")
411
 
    approve_deny_default.add_argument(
412
 
        "--deny-by-default", action="store_false",
413
 
        dest="approved_by_default",
414
 
        help="Set client to be denied by default")
 
385
    parser.add_argument("--approve-by-default", action="store_true",
 
386
                        default=None, dest="approved_by_default",
 
387
                        help="Set client to be approved by default")
 
388
    parser.add_argument("--deny-by-default", action="store_false",
 
389
                        dest="approved_by_default",
 
390
                        help="Set client to be denied by default")
415
391
    parser.add_argument("--approval-delay",
416
392
                        help="Set delay before client approve/deny")
417
393
    parser.add_argument("--approval-duration",
420
396
    parser.add_argument("-s", "--secret",
421
397
                        type=argparse.FileType(mode="rb"),
422
398
                        help="Set password blob (file) for client")
423
 
    approve_deny = parser.add_mutually_exclusive_group()
424
 
    approve_deny.add_argument(
425
 
        "-A", "--approve", action="store_true",
426
 
        help="Approve any current client request")
427
 
    approve_deny.add_argument("-D", "--deny", action="store_true",
428
 
                              help="Deny any current client request")
 
399
    parser.add_argument("-A", "--approve", action="store_true",
 
400
                        help="Approve any current client request")
 
401
    parser.add_argument("-D", "--deny", action="store_true",
 
402
                        help="Deny any current client request")
429
403
    parser.add_argument("--check", action="store_true",
430
404
                        help="Run self-test")
431
405
    parser.add_argument("client", nargs="*", help="Client name")
440
414
        parser.error("--dump-json can only be used alone.")
441
415
    if options.all and not has_actions(options):
442
416
        parser.error("--all requires an action.")
443
 
    if options.is_enabled and len(options.client) > 1:
444
 
            parser.error("--is-enabled requires exactly one client")
445
417
 
446
418
    try:
447
419
        bus = dbus.SystemBus()
518
490
                      separators=(',', ': '))
519
491
            print()
520
492
        else:
521
 
            print(TableOfClients(clients.values(), keywords))
 
493
            print_clients(clients.values(), keywords)
522
494
    else:
523
495
        # Process each client in the list by all selected options
524
496
        for client in clients:
659
631
            },
660
632
        ]
661
633
    def test_short_header(self):
662
 
        text = str(TableOfClients(self.clients, self.keywords,
663
 
                                  self.tablewords))
664
 
        expected_text = """
665
 
X  Yy
666
 
x1 y1
667
 
x2 y2
668
 
"""[1:-1]
669
 
        self.assertEqual(text, expected_text)
 
634
        rows = TableOfClients(self.clients, self.keywords,
 
635
                              self.tablewords).rows()
 
636
        expected_rows = [
 
637
            "X  Yy",
 
638
            "x1 y1",
 
639
            "x2 y2"]
 
640
        self.assertEqual(rows, expected_rows)
670
641
    def test_booleans(self):
671
642
        keywords = ["Bool", "NonDbusBoolean"]
672
 
        text = str(TableOfClients(self.clients, keywords,
673
 
                                  self.tablewords))
674
 
        expected_text = """
675
 
A D-BUS Boolean A Non-D-BUS Boolean
676
 
No              False              
677
 
Yes             True               
678
 
"""[1:-1]
679
 
        self.assertEqual(text, expected_text)
 
643
        rows = TableOfClients(self.clients, keywords,
 
644
                              self.tablewords).rows()
 
645
        expected_rows = [
 
646
            "A D-BUS Boolean A Non-D-BUS Boolean",
 
647
            "No              False              ",
 
648
            "Yes             True               ",
 
649
        ]
 
650
        self.assertEqual(rows, expected_rows)
680
651
    def test_milliseconds_detection(self):
681
652
        keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
682
653
                    "ApprovalDuration", "ExtendedTimeout"]
683
 
        text = str(TableOfClients(self.clients, keywords,
684
 
                                  self.tablewords))
685
 
        expected_text = """
 
654
        rows = TableOfClients(self.clients, keywords,
 
655
                              self.tablewords).rows()
 
656
        expected_rows = ("""
686
657
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
687
658
0          00:00:00    00:00:01    00:00:02    00:00:03    00:00:04   
688
659
1          1T02:03:05  1T02:03:06  1T02:03:07  1T02:03:08  1T02:03:09 
689
 
"""[1:-1]
690
 
        self.assertEqual(text, expected_text)
 
660
"""
 
661
        ).splitlines()[1:]
 
662
        self.assertEqual(rows, expected_rows)
691
663
    def test_empty_and_long_string_values(self):
692
664
        keywords = ["String"]
693
 
        text = str(TableOfClients(self.clients, keywords,
694
 
                                  self.tablewords))
695
 
        expected_text = """
 
665
        rows = TableOfClients(self.clients, keywords,
 
666
                              self.tablewords).rows()
 
667
        expected_rows = ("""
696
668
A String                                                                                                                                                                                                                                                                                                                                  
697
669
                                                                                                                                                                                                                                                                                                                                          
698
670
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
699
 
"""[1:-1]
700
 
        self.assertEqual(text, expected_text)
 
671
"""
 
672
        ).splitlines()[1:]
 
673
        self.assertEqual(rows, expected_rows)
701
674
 
702
675
 
703
676