2
# -*- mode: python; coding: utf-8 -*-
2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
4
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2016 Teddy Hogeborn
7
# Copyright © 2008-2016 Björn Påhlsson
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU General Public License as published by
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
9
# This file is part of Mandos.
11
# Mandos is free software: you can redistribute it and/or modify it
12
# under the terms of the GNU General Public License as published by
11
13
# the Free Software Foundation, either version 3 of the License, or
12
14
# (at your option) any later version.
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# Mandos is distributed in the hope that it will be useful, but
17
# WITHOUT ANY WARRANTY; without even the implied warranty of
16
18
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
19
# GNU General Public License for more details.
19
21
# You should have received a copy of the GNU General Public License
20
# along with this program. If not, see
21
# <http://www.gnu.org/licenses/>.
22
# along with Mandos. If not, see <http://www.gnu.org/licenses/>.
23
24
# Contact the authors at <mandos@recompile.se>.
48
# Show warnings by default
49
if not sys.warnoptions:
51
warnings.simplefilter("default")
53
log = logging.getLogger(sys.argv[0])
54
logging.basicConfig(level="INFO", # Show info level messages
55
format="%(message)s") # Show basic log messages
57
logging.captureWarnings(True) # Show warnings via the logging system
46
59
if sys.version_info.major == 2:
58
71
"Interval": "Interval",
60
73
"Fingerprint": "Fingerprint",
61
75
"CheckerRunning": "Check Is Running",
62
76
"LastEnabled": "Last Enabled",
63
77
"ApprovalPending": "Approval Is Pending",
102
116
datetime.timedelta(0, 60)
103
117
>>> rfc3339_duration_to_delta("PT60M")
104
118
datetime.timedelta(0, 3600)
119
>>> rfc3339_duration_to_delta("P60M")
120
datetime.timedelta(1680)
105
121
>>> rfc3339_duration_to_delta("PT24H")
106
122
datetime.timedelta(1)
107
123
>>> rfc3339_duration_to_delta("P1W")
110
126
datetime.timedelta(0, 330)
111
127
>>> rfc3339_duration_to_delta("P1DT3M20S")
112
128
datetime.timedelta(1, 200)
129
>>> # Can not be empty:
130
>>> rfc3339_duration_to_delta("")
131
Traceback (most recent call last):
133
ValueError: Invalid RFC 3339 duration: u''
134
>>> # Must start with "P":
135
>>> rfc3339_duration_to_delta("1D")
136
Traceback (most recent call last):
138
ValueError: Invalid RFC 3339 duration: u'1D'
139
>>> # Must use correct order
140
>>> rfc3339_duration_to_delta("PT1S2M")
141
Traceback (most recent call last):
143
ValueError: Invalid RFC 3339 duration: u'PT1S2M'
144
>>> # Time needs time marker
145
>>> rfc3339_duration_to_delta("P1H2S")
146
Traceback (most recent call last):
148
ValueError: Invalid RFC 3339 duration: u'P1H2S'
149
>>> # Weeks can not be combined with anything else
150
>>> rfc3339_duration_to_delta("P1D2W")
151
Traceback (most recent call last):
153
ValueError: Invalid RFC 3339 duration: u'P1D2W'
154
>>> rfc3339_duration_to_delta("P2W2H")
155
Traceback (most recent call last):
157
ValueError: Invalid RFC 3339 duration: u'P2W2H'
115
160
# Parsing an RFC 3339 duration with regular expressions is not
236
281
def print_clients(clients, keywords):
237
282
def valuetostring(value, keyword):
238
if type(value) is dbus.Boolean:
283
if isinstance(value, dbus.Boolean):
239
284
return "Yes" if value else "No"
240
285
if keyword in ("Timeout", "Interval", "ApprovalDelay",
241
286
"ApprovalDuration", "ExtendedTimeout"):
344
389
if options.all and not has_actions(options):
345
390
parser.error("--all requires an action.")
348
fail_count, test_count = doctest.testmod()
349
sys.exit(os.EX_OK if fail_count == 0 else 1)
352
393
bus = dbus.SystemBus()
353
394
mandos_dbus_objc = bus.get_object(busname, server_path)
354
395
except dbus.exceptions.DBusException:
355
print("Could not connect to Mandos server", file=sys.stderr)
396
log.critical("Could not connect to Mandos server")
358
399
mandos_serv = dbus.Interface(mandos_dbus_objc,
377
418
os.dup2(stderrcopy, sys.stderr.fileno())
378
419
os.close(stderrcopy)
379
420
except dbus.exceptions.DBusException as e:
380
print("Access denied: "
381
"Accessing mandos server through D-Bus: {}".format(e),
421
log.critical("Failed to access Mandos server through D-Bus:"
385
425
# Compile dict of (clients: properties) to process
396
436
clients[client_objc] = client
399
print("Client not found on server: {!r}"
400
.format(name), file=sys.stderr)
439
log.critical("Client not found on server: %r", name)
403
442
if not has_actions(options) and clients:
404
443
if options.verbose or options.dump_json:
405
444
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
406
"Created", "Interval", "Host", "Fingerprint",
407
"CheckerRunning", "LastEnabled",
408
"ApprovalPending", "ApprovedByDefault",
409
"LastApprovalRequest", "ApprovalDelay",
410
"ApprovalDuration", "Checker",
411
"ExtendedTimeout", "Expires",
445
"Created", "Interval", "Host", "KeyID",
446
"Fingerprint", "CheckerRunning",
447
"LastEnabled", "ApprovalPending",
448
"ApprovedByDefault", "LastApprovalRequest",
449
"ApprovalDelay", "ApprovalDuration",
450
"Checker", "ExtendedTimeout", "Expires",
412
451
"LastCheckerStatus")
414
453
keywords = defaultkeywords
491
530
client.Approve(dbus.Boolean(False),
492
531
dbus_interface=client_interface)
534
class Test_milliseconds_to_string(unittest.TestCase):
536
self.assertEqual(milliseconds_to_string(93785000),
538
def test_no_days(self):
539
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
540
def test_all_zero(self):
541
self.assertEqual(milliseconds_to_string(0), "00:00:00")
542
def test_no_fractional_seconds(self):
543
self.assertEqual(milliseconds_to_string(400), "00:00:00")
544
self.assertEqual(milliseconds_to_string(900), "00:00:00")
545
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
548
def should_only_run_tests():
549
parser = argparse.ArgumentParser(add_help=False)
550
parser.add_argument("--check", action='store_true')
551
args, unknown_args = parser.parse_known_args()
552
run_tests = args.check
554
# Remove --check argument from sys.argv
555
sys.argv[1:] = unknown_args
558
# Add all tests from doctest strings
559
def load_tests(loader, tests, none):
561
tests.addTests(doctest.DocTestSuite())
495
564
if __name__ == "__main__":
565
if should_only_run_tests():
566
# Call using ./tdd-python-script --check [--verbose]