2
# -*- mode: python; coding: utf-8 -*-
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
4
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2017 Teddy Hogeborn
7
# Copyright © 2008-2017 Björn Påhlsson
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
9
9
# This file is part of Mandos.
48
# Show warnings by default
49
if not sys.warnoptions:
51
warnings.simplefilter("default")
53
log = logging.getLogger(sys.argv[0])
54
logging.basicConfig(level="INFO", # Show info level messages
55
format="%(message)s") # Show basic log messages
57
logging.captureWarnings(True) # Show warnings via the logging system
46
59
if sys.version_info.major == 2:
58
71
"Interval": "Interval",
60
73
"Fingerprint": "Fingerprint",
61
75
"CheckerRunning": "Check Is Running",
62
76
"LastEnabled": "Last Enabled",
63
77
"ApprovalPending": "Approval Is Pending",
102
116
datetime.timedelta(0, 60)
103
117
>>> rfc3339_duration_to_delta("PT60M")
104
118
datetime.timedelta(0, 3600)
119
>>> rfc3339_duration_to_delta("P60M")
120
datetime.timedelta(1680)
105
121
>>> rfc3339_duration_to_delta("PT24H")
106
122
datetime.timedelta(1)
107
123
>>> rfc3339_duration_to_delta("P1W")
110
126
datetime.timedelta(0, 330)
111
127
>>> rfc3339_duration_to_delta("P1DT3M20S")
112
128
datetime.timedelta(1, 200)
129
>>> # Can not be empty:
130
>>> rfc3339_duration_to_delta("")
131
Traceback (most recent call last):
133
ValueError: Invalid RFC 3339 duration: u''
134
>>> # Must start with "P":
135
>>> rfc3339_duration_to_delta("1D")
136
Traceback (most recent call last):
138
ValueError: Invalid RFC 3339 duration: u'1D'
139
>>> # Must use correct order
140
>>> rfc3339_duration_to_delta("PT1S2M")
141
Traceback (most recent call last):
143
ValueError: Invalid RFC 3339 duration: u'PT1S2M'
144
>>> # Time needs time marker
145
>>> rfc3339_duration_to_delta("P1H2S")
146
Traceback (most recent call last):
148
ValueError: Invalid RFC 3339 duration: u'P1H2S'
149
>>> # Weeks can not be combined with anything else
150
>>> rfc3339_duration_to_delta("P1D2W")
151
Traceback (most recent call last):
153
ValueError: Invalid RFC 3339 duration: u'P1D2W'
154
>>> rfc3339_duration_to_delta("P2W2H")
155
Traceback (most recent call last):
157
ValueError: Invalid RFC 3339 duration: u'P2W2H'
115
160
# Parsing an RFC 3339 duration with regular expressions is not
195
240
def string_to_delta(interval):
196
241
"""Parse a string and return a datetime.timedelta
198
>>> string_to_delta('7d')
199
datetime.timedelta(7)
200
>>> string_to_delta('60s')
201
datetime.timedelta(0, 60)
202
>>> string_to_delta('60m')
203
datetime.timedelta(0, 3600)
204
>>> string_to_delta('24h')
205
datetime.timedelta(1)
206
>>> string_to_delta('1w')
207
datetime.timedelta(7)
208
>>> string_to_delta('5m 30s')
209
datetime.timedelta(0, 330)
213
245
return rfc3339_duration_to_delta(interval)
246
except ValueError as e:
247
log.warning("%s - Parsing as pre-1.6.1 interval instead",
249
return parse_pre_1_6_1_interval(interval)
252
def parse_pre_1_6_1_interval(interval):
253
"""Parse an interval string as documented by Mandos before 1.6.1, and
254
return a datetime.timedelta
255
>>> parse_pre_1_6_1_interval('7d')
256
datetime.timedelta(7)
257
>>> parse_pre_1_6_1_interval('60s')
258
datetime.timedelta(0, 60)
259
>>> parse_pre_1_6_1_interval('60m')
260
datetime.timedelta(0, 3600)
261
>>> parse_pre_1_6_1_interval('24h')
262
datetime.timedelta(1)
263
>>> parse_pre_1_6_1_interval('1w')
264
datetime.timedelta(7)
265
>>> parse_pre_1_6_1_interval('5m 30s')
266
datetime.timedelta(0, 330)
267
>>> parse_pre_1_6_1_interval('')
268
datetime.timedelta(0)
269
>>> # Ignore unknown characters, allow any order and repetitions
270
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
271
datetime.timedelta(2, 480, 18000)
217
275
value = datetime.timedelta(0)
218
276
regexp = re.compile(r"(\d+)([dsmhw]?)")
236
294
def print_clients(clients, keywords):
295
print('\n'.join(table_rows_of_clients(clients, keywords)))
297
def table_rows_of_clients(clients, keywords):
237
298
def valuetostring(value, keyword):
238
if type(value) is dbus.Boolean:
299
if isinstance(value, dbus.Boolean):
239
300
return "Yes" if value else "No"
240
301
if keyword in ("Timeout", "Interval", "ApprovalDelay",
241
302
"ApprovalDuration", "ExtendedTimeout"):
249
310
for client in clients)),
251
312
for key in keywords)
253
print(format_string.format(**tablewords))
313
# Start with header line
314
rows = [format_string.format(**tablewords)]
254
315
for client in clients:
256
.format(**{key: valuetostring(client[key], key)
257
for key in keywords}))
316
rows.append(format_string
317
.format(**{key: valuetostring(client[key], key)
318
for key in keywords}))
260
322
def has_actions(options):
344
406
if options.all and not has_actions(options):
345
407
parser.error("--all requires an action.")
349
fail_count, test_count = doctest.testmod()
350
sys.exit(os.EX_OK if fail_count == 0 else 1)
353
410
bus = dbus.SystemBus()
354
411
mandos_dbus_objc = bus.get_object(busname, server_path)
355
412
except dbus.exceptions.DBusException:
356
print("Could not connect to Mandos server", file=sys.stderr)
413
log.critical("Could not connect to Mandos server")
359
416
mandos_serv = dbus.Interface(mandos_dbus_objc,
378
435
os.dup2(stderrcopy, sys.stderr.fileno())
379
436
os.close(stderrcopy)
380
437
except dbus.exceptions.DBusException as e:
381
print("Access denied: "
382
"Accessing mandos server through D-Bus: {}".format(e),
438
log.critical("Failed to access Mandos server through D-Bus:"
386
442
# Compile dict of (clients: properties) to process
397
453
clients[client_objc] = client
400
print("Client not found on server: {!r}"
401
.format(name), file=sys.stderr)
456
log.critical("Client not found on server: %r", name)
404
459
if not has_actions(options) and clients:
405
460
if options.verbose or options.dump_json:
406
461
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
407
"Created", "Interval", "Host", "Fingerprint",
408
"CheckerRunning", "LastEnabled",
409
"ApprovalPending", "ApprovedByDefault",
410
"LastApprovalRequest", "ApprovalDelay",
411
"ApprovalDuration", "Checker",
412
"ExtendedTimeout", "Expires",
462
"Created", "Interval", "Host", "KeyID",
463
"Fingerprint", "CheckerRunning",
464
"LastEnabled", "ApprovalPending",
465
"ApprovedByDefault", "LastApprovalRequest",
466
"ApprovalDelay", "ApprovalDuration",
467
"Checker", "ExtendedTimeout", "Expires",
413
468
"LastCheckerStatus")
415
470
keywords = defaultkeywords
492
547
client.Approve(dbus.Boolean(False),
493
548
dbus_interface=client_interface)
551
class Test_milliseconds_to_string(unittest.TestCase):
553
self.assertEqual(milliseconds_to_string(93785000),
555
def test_no_days(self):
556
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
557
def test_all_zero(self):
558
self.assertEqual(milliseconds_to_string(0), "00:00:00")
559
def test_no_fractional_seconds(self):
560
self.assertEqual(milliseconds_to_string(400), "00:00:00")
561
self.assertEqual(milliseconds_to_string(900), "00:00:00")
562
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
564
class Test_string_to_delta(unittest.TestCase):
565
def test_handles_basic_rfc3339(self):
566
self.assertEqual(string_to_delta("PT2H"),
567
datetime.timedelta(0, 7200))
568
def test_falls_back_to_pre_1_6_1_with_warning(self):
569
# assertLogs only exists in Python 3.4
570
if hasattr(self, "assertLogs"):
571
with self.assertLogs(log, logging.WARNING):
572
value = string_to_delta("2h")
574
value = string_to_delta("2h")
575
self.assertEqual(value, datetime.timedelta(0, 7200))
578
def should_only_run_tests():
579
parser = argparse.ArgumentParser(add_help=False)
580
parser.add_argument("--check", action='store_true')
581
args, unknown_args = parser.parse_known_args()
582
run_tests = args.check
584
# Remove --check argument from sys.argv
585
sys.argv[1:] = unknown_args
588
# Add all tests from doctest strings
589
def load_tests(loader, tests, none):
591
tests.addTests(doctest.DocTestSuite())
496
594
if __name__ == "__main__":
595
if should_only_run_tests():
596
# Call using ./tdd-python-script --check [--verbose]