2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
2
# -*- mode: python; coding: utf-8 -*-
4
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
9
# This file is part of Mandos.
11
# Mandos is free software: you can redistribute it and/or modify it
12
# under the terms of the GNU General Public License as published by
6
# Copyright © 2008-2010 Teddy Hogeborn
7
# Copyright © 2008-2010 Björn Påhlsson
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU General Public License as published by
13
11
# the Free Software Foundation, either version 3 of the License, or
14
12
# (at your option) any later version.
16
# Mandos is distributed in the hope that it will be useful, but
17
# WITHOUT ANY WARRANTY; without even the implied warranty of
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
18
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19
17
# GNU General Public License for more details.
21
19
# You should have received a copy of the GNU General Public License
22
# along with Mandos. If not, see <http://www.gnu.org/licenses/>.
24
# Contact the authors at <mandos@recompile.se>.
20
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
# Contact the authors at <mandos@fukt.bsnet.se>.
27
25
from __future__ import (division, absolute_import, print_function,
31
from future_builtins import *
30
from optparse import OptionParser
48
# Show warnings by default
49
if not sys.warnoptions:
51
warnings.simplefilter("default")
53
log = logging.getLogger(sys.argv[0])
54
logging.basicConfig(level="INFO", # Show info level messages
55
format="%(message)s") # Show basic log messages
57
logging.captureWarnings(True) # Show warnings via the logging system
59
if sys.version_info.major == 2:
62
36
locale.setlocale(locale.LC_ALL, "")
42
"LastCheckedOK": "Last Successful Check",
43
"LastApprovalRequest": "Last Approval Request",
45
"Interval": "Interval",
47
"Fingerprint": "Fingerprint",
48
"CheckerRunning": "Check Is Running",
49
"LastEnabled": "Last Enabled",
50
"ApprovalPending": "Approval Is Pending",
51
"ApprovedByDefault": "Approved By Default",
52
"ApprovalDelay": "Approval Delay",
53
"ApprovalDuration": "Approval Duration",
64
56
defaultkeywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
65
domain = "se.recompile"
57
domain = "se.bsnet.fukt"
66
58
busname = domain + ".Mandos"
68
60
server_interface = domain + ".Mandos"
69
61
client_interface = domain + ".Mandos.Client"
74
dbus.OBJECT_MANAGER_IFACE
75
except AttributeError:
76
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
64
def timedelta_to_milliseconds(td):
65
"""Convert a datetime.timedelta object to milliseconds"""
66
return ((td.days * 24 * 60 * 60 * 1000)
68
+ (td.microseconds // 1000))
79
70
def milliseconds_to_string(ms):
80
71
td = datetime.timedelta(0, 0, 0, ms)
81
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
82
.format(days="{}T".format(td.days) if td.days else "",
83
hours=td.seconds // 3600,
84
minutes=(td.seconds % 3600) // 60,
85
seconds=td.seconds % 60))
88
def rfc3339_duration_to_delta(duration):
89
"""Parse an RFC 3339 "duration" and return a datetime.timedelta
91
>>> rfc3339_duration_to_delta("P7D")
93
>>> rfc3339_duration_to_delta("PT60S")
94
datetime.timedelta(0, 60)
95
>>> rfc3339_duration_to_delta("PT60M")
96
datetime.timedelta(0, 3600)
97
>>> rfc3339_duration_to_delta("P60M")
98
datetime.timedelta(1680)
99
>>> rfc3339_duration_to_delta("PT24H")
100
datetime.timedelta(1)
101
>>> rfc3339_duration_to_delta("P1W")
102
datetime.timedelta(7)
103
>>> rfc3339_duration_to_delta("PT5M30S")
104
datetime.timedelta(0, 330)
105
>>> rfc3339_duration_to_delta("P1DT3M20S")
106
datetime.timedelta(1, 200)
107
>>> # Can not be empty:
108
>>> rfc3339_duration_to_delta("")
109
Traceback (most recent call last):
111
ValueError: Invalid RFC 3339 duration: u''
112
>>> # Must start with "P":
113
>>> rfc3339_duration_to_delta("1D")
114
Traceback (most recent call last):
116
ValueError: Invalid RFC 3339 duration: u'1D'
117
>>> # Must use correct order
118
>>> rfc3339_duration_to_delta("PT1S2M")
119
Traceback (most recent call last):
121
ValueError: Invalid RFC 3339 duration: u'PT1S2M'
122
>>> # Time needs time marker
123
>>> rfc3339_duration_to_delta("P1H2S")
124
Traceback (most recent call last):
126
ValueError: Invalid RFC 3339 duration: u'P1H2S'
127
>>> # Weeks can not be combined with anything else
128
>>> rfc3339_duration_to_delta("P1D2W")
129
Traceback (most recent call last):
131
ValueError: Invalid RFC 3339 duration: u'P1D2W'
132
>>> rfc3339_duration_to_delta("P2W2H")
133
Traceback (most recent call last):
135
ValueError: Invalid RFC 3339 duration: u'P2W2H'
138
# Parsing an RFC 3339 duration with regular expressions is not
139
# possible - there would have to be multiple places for the same
140
# values, like seconds. The current code, while more esoteric, is
141
# cleaner without depending on a parsing library. If Python had a
142
# built-in library for parsing we would use it, but we'd like to
143
# avoid excessive use of external libraries.
145
# New type for defining tokens, syntax, and semantics all-in-one
146
Token = collections.namedtuple("Token", (
147
"regexp", # To match token; if "value" is not None, must have
148
# a "group" containing digits
149
"value", # datetime.timedelta or None
150
"followers")) # Tokens valid after this token
151
# RFC 3339 "duration" tokens, syntax, and semantics; taken from
152
# the "duration" ABNF definition in RFC 3339, Appendix A.
153
token_end = Token(re.compile(r"$"), None, frozenset())
154
token_second = Token(re.compile(r"(\d+)S"),
155
datetime.timedelta(seconds=1),
156
frozenset((token_end, )))
157
token_minute = Token(re.compile(r"(\d+)M"),
158
datetime.timedelta(minutes=1),
159
frozenset((token_second, token_end)))
160
token_hour = Token(re.compile(r"(\d+)H"),
161
datetime.timedelta(hours=1),
162
frozenset((token_minute, token_end)))
163
token_time = Token(re.compile(r"T"),
165
frozenset((token_hour, token_minute,
167
token_day = Token(re.compile(r"(\d+)D"),
168
datetime.timedelta(days=1),
169
frozenset((token_time, token_end)))
170
token_month = Token(re.compile(r"(\d+)M"),
171
datetime.timedelta(weeks=4),
172
frozenset((token_day, token_end)))
173
token_year = Token(re.compile(r"(\d+)Y"),
174
datetime.timedelta(weeks=52),
175
frozenset((token_month, token_end)))
176
token_week = Token(re.compile(r"(\d+)W"),
177
datetime.timedelta(weeks=1),
178
frozenset((token_end, )))
179
token_duration = Token(re.compile(r"P"), None,
180
frozenset((token_year, token_month,
181
token_day, token_time,
183
# Define starting values:
185
value = datetime.timedelta()
187
# Following valid tokens
188
followers = frozenset((token_duration, ))
189
# String left to parse
191
# Loop until end token is found
192
while found_token is not token_end:
193
# Search for any currently valid tokens
194
for token in followers:
195
match = token.regexp.match(s)
196
if match is not None:
198
if token.value is not None:
199
# Value found, parse digits
200
factor = int(match.group(1), 10)
201
# Add to value so far
202
value += factor * token.value
203
# Strip token from string
204
s = token.regexp.sub("", s, 1)
207
# Set valid next tokens
208
followers = found_token.followers
211
# No currently valid tokens were found
212
raise ValueError("Invalid RFC 3339 duration: {!r}"
72
return ("%(days)s%(hours)02d:%(minutes)02d:%(seconds)02d"
73
% { "days": "%dT" % td.days if td.days else "",
74
"hours": td.seconds // 3600,
75
"minutes": (td.seconds % 3600) // 60,
76
"seconds": td.seconds % 60,
218
80
def string_to_delta(interval):
219
81
"""Parse a string and return a datetime.timedelta
223
return rfc3339_duration_to_delta(interval)
224
except ValueError as e:
225
log.warning("%s - Parsing as pre-1.6.1 interval instead",
227
return parse_pre_1_6_1_interval(interval)
230
def parse_pre_1_6_1_interval(interval):
231
"""Parse an interval string as documented by Mandos before 1.6.1, and
232
return a datetime.timedelta
233
>>> parse_pre_1_6_1_interval('7d')
83
>>> string_to_delta("7d")
234
84
datetime.timedelta(7)
235
>>> parse_pre_1_6_1_interval('60s')
85
>>> string_to_delta("60s")
236
86
datetime.timedelta(0, 60)
237
>>> parse_pre_1_6_1_interval('60m')
87
>>> string_to_delta("60m")
238
88
datetime.timedelta(0, 3600)
239
>>> parse_pre_1_6_1_interval('24h')
89
>>> string_to_delta("24h")
240
90
datetime.timedelta(1)
241
>>> parse_pre_1_6_1_interval('1w')
91
>>> string_to_delta("1w")
242
92
datetime.timedelta(7)
243
>>> parse_pre_1_6_1_interval('5m 30s')
93
>>> string_to_delta("5m 30s")
244
94
datetime.timedelta(0, 330)
245
>>> parse_pre_1_6_1_interval('')
246
datetime.timedelta(0)
247
>>> # Ignore unknown characters, allow any order and repetitions
248
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
249
datetime.timedelta(2, 480, 18000)
253
value = datetime.timedelta(0)
254
regexp = re.compile(r"(\d+)([dsmhw]?)")
256
for num, suffix in regexp.findall(interval):
258
value += datetime.timedelta(int(num))
260
value += datetime.timedelta(0, int(num))
262
value += datetime.timedelta(0, 0, 0, 0, int(num))
264
value += datetime.timedelta(0, 0, 0, 0, 0, int(num))
266
value += datetime.timedelta(0, 0, 0, 0, 0, 0, int(num))
268
value += datetime.timedelta(0, 0, 0, int(num))
96
timevalue = datetime.timedelta(0)
97
regexp = re.compile("\d+[dsmhw]")
99
for s in regexp.findall(interval):
101
suffix = unicode(s[-1])
104
delta = datetime.timedelta(value)
106
delta = datetime.timedelta(0, value)
108
delta = datetime.timedelta(0, 0, 0, 0, value)
110
delta = datetime.timedelta(0, 0, 0, 0, 0, value)
112
delta = datetime.timedelta(0, 0, 0, 0, 0, 0, value)
115
except (ValueError, IndexError):
272
120
def print_clients(clients, keywords):
273
print('\n'.join(TableOfClients(clients, keywords).rows()))
275
class TableOfClients(object):
278
"Enabled": "Enabled",
279
"Timeout": "Timeout",
280
"LastCheckedOK": "Last Successful Check",
281
"LastApprovalRequest": "Last Approval Request",
282
"Created": "Created",
283
"Interval": "Interval",
285
"Fingerprint": "Fingerprint",
287
"CheckerRunning": "Check Is Running",
288
"LastEnabled": "Last Enabled",
289
"ApprovalPending": "Approval Is Pending",
290
"ApprovedByDefault": "Approved By Default",
291
"ApprovalDelay": "Approval Delay",
292
"ApprovalDuration": "Approval Duration",
293
"Checker": "Checker",
294
"ExtendedTimeout": "Extended Timeout",
295
"Expires": "Expires",
296
"LastCheckerStatus": "Last Checker Status",
299
def __init__(self, clients, keywords, tablewords=None):
300
self.clients = clients
301
self.keywords = keywords
302
if tablewords is not None:
303
self.tablewords = tablewords
306
121
def valuetostring(value, keyword):
307
if isinstance(value, dbus.Boolean):
122
if type(value) is dbus.Boolean:
308
123
return "Yes" if value else "No"
309
124
if keyword in ("Timeout", "Interval", "ApprovalDelay",
310
"ApprovalDuration", "ExtendedTimeout"):
311
126
return milliseconds_to_string(value)
315
# Create format string to format table rows
316
format_string = " ".join("{{{key}:{width}}}".format(
317
width=max(len(self.tablewords[key]),
318
max(len(self.valuetostring(client[key], key))
319
for client in self.clients)),
321
for key in self.keywords)
322
# Start with header line
323
rows = [format_string.format(**self.tablewords)]
324
for client in self.clients:
325
rows.append(format_string
326
.format(**{key: self.valuetostring(client[key], key)
327
for key in self.keywords}))
127
return unicode(value)
129
# Create format string to print table rows
130
format_string = " ".join("%%-%ds" %
131
max(len(tablewords[key]),
132
max(len(valuetostring(client[key],
138
print(format_string % tuple(tablewords[key] for key in keywords))
139
for client in clients:
140
print(format_string % tuple(valuetostring(client[key], key)
141
for key in keywords))
331
143
def has_actions(options):
332
144
return any((options.enable,
347
158
options.secret is not None,
353
parser = argparse.ArgumentParser()
354
parser.add_argument("--version", action="version",
355
version="%(prog)s {}".format(version),
356
help="show version number and exit")
357
parser.add_argument("-a", "--all", action="store_true",
358
help="Select all clients")
359
parser.add_argument("-v", "--verbose", action="store_true",
360
help="Print all fields")
361
parser.add_argument("-j", "--dump-json", action="store_true",
362
help="Dump client data in JSON format")
363
parser.add_argument("-e", "--enable", action="store_true",
364
help="Enable client")
365
parser.add_argument("-d", "--disable", action="store_true",
366
help="disable client")
367
parser.add_argument("-b", "--bump-timeout", action="store_true",
368
help="Bump timeout for client")
369
parser.add_argument("--start-checker", action="store_true",
370
help="Start checker for client")
371
parser.add_argument("--stop-checker", action="store_true",
372
help="Stop checker for client")
373
parser.add_argument("-V", "--is-enabled", action="store_true",
374
help="Check if client is enabled")
375
parser.add_argument("-r", "--remove", action="store_true",
376
help="Remove client")
377
parser.add_argument("-c", "--checker",
378
help="Set checker command for client")
379
parser.add_argument("-t", "--timeout",
380
help="Set timeout for client")
381
parser.add_argument("--extended-timeout",
382
help="Set extended timeout for client")
383
parser.add_argument("-i", "--interval",
384
help="Set checker interval for client")
385
parser.add_argument("--approve-by-default", action="store_true",
386
default=None, dest="approved_by_default",
387
help="Set client to be approved by default")
388
parser.add_argument("--deny-by-default", action="store_false",
389
dest="approved_by_default",
390
help="Set client to be denied by default")
391
parser.add_argument("--approval-delay",
392
help="Set delay before client approve/deny")
393
parser.add_argument("--approval-duration",
394
help="Set duration of one client approval")
395
parser.add_argument("-H", "--host", help="Set host for client")
396
parser.add_argument("-s", "--secret",
397
type=argparse.FileType(mode="rb"),
398
help="Set password blob (file) for client")
399
parser.add_argument("-A", "--approve", action="store_true",
400
help="Approve any current client request")
401
parser.add_argument("-D", "--deny", action="store_true",
402
help="Deny any current client request")
403
parser.add_argument("--check", action="store_true",
404
help="Run self-test")
405
parser.add_argument("client", nargs="*", help="Client name")
406
options = parser.parse_args()
408
if has_actions(options) and not (options.client or options.all):
409
parser.error("Options require clients names or --all.")
410
if options.verbose and has_actions(options):
411
parser.error("--verbose can only be used alone.")
412
if options.dump_json and (options.verbose
413
or has_actions(options)):
414
parser.error("--dump-json can only be used alone.")
415
if options.all and not has_actions(options):
416
parser.error("--all requires an action.")
419
bus = dbus.SystemBus()
420
mandos_dbus_objc = bus.get_object(busname, server_path)
421
except dbus.exceptions.DBusException:
422
log.critical("Could not connect to Mandos server")
425
mandos_serv = dbus.Interface(mandos_dbus_objc,
426
dbus_interface=server_interface)
427
mandos_serv_object_manager = dbus.Interface(
428
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
430
# block stderr since dbus library prints to stderr
431
null = os.open(os.path.devnull, os.O_RDWR)
432
stderrcopy = os.dup(sys.stderr.fileno())
433
os.dup2(null, sys.stderr.fileno())
437
mandos_clients = {path: ifs_and_props[client_interface]
438
for path, ifs_and_props in
439
mandos_serv_object_manager
440
.GetManagedObjects().items()
441
if client_interface in ifs_and_props}
444
os.dup2(stderrcopy, sys.stderr.fileno())
446
except dbus.exceptions.DBusException as e:
447
log.critical("Failed to access Mandos server through D-Bus:"
451
# Compile dict of (clients: properties) to process
454
if options.all or not options.client:
455
clients = {bus.get_object(busname, path): properties
456
for path, properties in mandos_clients.items()}
458
for name in options.client:
459
for path, client in mandos_clients.items():
460
if client["Name"] == name:
461
client_objc = bus.get_object(busname, path)
462
clients[client_objc] = client
465
log.critical("Client not found on server: %r", name)
468
if not has_actions(options) and clients:
469
if options.verbose or options.dump_json:
470
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
471
"Created", "Interval", "Host", "KeyID",
472
"Fingerprint", "CheckerRunning",
473
"LastEnabled", "ApprovalPending",
474
"ApprovedByDefault", "LastApprovalRequest",
475
"ApprovalDelay", "ApprovalDuration",
476
"Checker", "ExtendedTimeout", "Expires",
479
keywords = defaultkeywords
481
if options.dump_json:
482
json.dump({client["Name"]: {key:
484
if isinstance(client[key],
488
for client in clients.values()},
489
fp=sys.stdout, indent=4,
490
separators=(',', ': '))
493
print_clients(clients.values(), keywords)
495
# Process each client in the list by all selected options
496
for client in clients:
498
def set_client_prop(prop, value):
499
"""Set a Client D-Bus property"""
500
client.Set(client_interface, prop, value,
501
dbus_interface=dbus.PROPERTIES_IFACE)
503
def set_client_prop_ms(prop, value):
504
"""Set a Client D-Bus property, converted
505
from a string to milliseconds."""
506
set_client_prop(prop,
507
string_to_delta(value).total_seconds()
511
mandos_serv.RemoveClient(client.__dbus_object_path__)
513
set_client_prop("Enabled", dbus.Boolean(True))
515
set_client_prop("Enabled", dbus.Boolean(False))
516
if options.bump_timeout:
517
set_client_prop("LastCheckedOK", "")
518
if options.start_checker:
519
set_client_prop("CheckerRunning", dbus.Boolean(True))
520
if options.stop_checker:
521
set_client_prop("CheckerRunning", dbus.Boolean(False))
522
if options.is_enabled:
523
if client.Get(client_interface, "Enabled",
524
dbus_interface=dbus.PROPERTIES_IFACE):
163
parser = OptionParser(version = "%%prog %s" % version)
164
parser.add_option("-a", "--all", action="store_true",
165
help="Select all clients")
166
parser.add_option("-v", "--verbose", action="store_true",
167
help="Print all fields")
168
parser.add_option("-e", "--enable", action="store_true",
169
help="Enable client")
170
parser.add_option("-d", "--disable", action="store_true",
171
help="disable client")
172
parser.add_option("-b", "--bump-timeout", action="store_true",
173
help="Bump timeout for client")
174
parser.add_option("--start-checker", action="store_true",
175
help="Start checker for client")
176
parser.add_option("--stop-checker", action="store_true",
177
help="Stop checker for client")
178
parser.add_option("-V", "--is-enabled", action="store_true",
179
help="Check if client is enabled")
180
parser.add_option("-r", "--remove", action="store_true",
181
help="Remove client")
182
parser.add_option("-c", "--checker", type="string",
183
help="Set checker command for client")
184
parser.add_option("-t", "--timeout", type="string",
185
help="Set timeout for client")
186
parser.add_option("-i", "--interval", type="string",
187
help="Set checker interval for client")
188
parser.add_option("--approve-by-default", action="store_true",
189
dest="approved_by_default",
190
help="Set client to be approved by default")
191
parser.add_option("--deny-by-default", action="store_false",
192
dest="approved_by_default",
193
help="Set client to be denied by default")
194
parser.add_option("--approval-delay", type="string",
195
help="Set delay before client approve/deny")
196
parser.add_option("--approval-duration", type="string",
197
help="Set duration of one client approval")
198
parser.add_option("-H", "--host", type="string",
199
help="Set host for client")
200
parser.add_option("-s", "--secret", type="string",
201
help="Set password blob (file) for client")
202
parser.add_option("-A", "--approve", action="store_true",
203
help="Approve any current client request")
204
parser.add_option("-D", "--deny", action="store_true",
205
help="Deny any current client request")
206
options, client_names = parser.parse_args()
208
if has_actions(options) and not client_names and not options.all:
209
parser.error("Options require clients names or --all.")
210
if options.verbose and has_actions(options):
211
parser.error("--verbose can only be used alone or with"
213
if options.all and not has_actions(options):
214
parser.error("--all requires an action.")
217
bus = dbus.SystemBus()
218
mandos_dbus_objc = bus.get_object(busname, server_path)
219
except dbus.exceptions.DBusException:
220
print("Could not connect to Mandos server",
224
mandos_serv = dbus.Interface(mandos_dbus_objc,
225
dbus_interface = server_interface)
227
#block stderr since dbus library prints to stderr
228
null = os.open(os.path.devnull, os.O_RDWR)
229
stderrcopy = os.dup(sys.stderr.fileno())
230
os.dup2(null, sys.stderr.fileno())
234
mandos_clients = mandos_serv.GetAllClientsWithProperties()
237
os.dup2(stderrcopy, sys.stderr.fileno())
239
except dbus.exceptions.DBusException, e:
240
print("Access denied: Accessing mandos server through dbus.",
244
# Compile dict of (clients: properties) to process
247
if options.all or not client_names:
248
clients = dict((bus.get_object(busname, path), properties)
249
for path, properties in
250
mandos_clients.iteritems())
252
for name in client_names:
253
for path, client in mandos_clients.iteritems():
254
if client["Name"] == name:
255
client_objc = bus.get_object(busname, path)
256
clients[client_objc] = client
259
print("Client not found on server: %r" % name,
528
if options.checker is not None:
529
set_client_prop("Checker", options.checker)
530
if options.host is not None:
531
set_client_prop("Host", options.host)
532
if options.interval is not None:
533
set_client_prop_ms("Interval", options.interval)
534
if options.approval_delay is not None:
535
set_client_prop_ms("ApprovalDelay",
536
options.approval_delay)
537
if options.approval_duration is not None:
538
set_client_prop_ms("ApprovalDuration",
539
options.approval_duration)
540
if options.timeout is not None:
541
set_client_prop_ms("Timeout", options.timeout)
542
if options.extended_timeout is not None:
543
set_client_prop_ms("ExtendedTimeout",
544
options.extended_timeout)
545
if options.secret is not None:
546
set_client_prop("Secret",
547
dbus.ByteArray(options.secret.read()))
548
if options.approved_by_default is not None:
549
set_client_prop("ApprovedByDefault",
551
.approved_by_default))
553
client.Approve(dbus.Boolean(True),
554
dbus_interface=client_interface)
556
client.Approve(dbus.Boolean(False),
557
dbus_interface=client_interface)
560
class Test_milliseconds_to_string(unittest.TestCase):
562
self.assertEqual(milliseconds_to_string(93785000),
564
def test_no_days(self):
565
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
566
def test_all_zero(self):
567
self.assertEqual(milliseconds_to_string(0), "00:00:00")
568
def test_no_fractional_seconds(self):
569
self.assertEqual(milliseconds_to_string(400), "00:00:00")
570
self.assertEqual(milliseconds_to_string(900), "00:00:00")
571
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
573
class Test_string_to_delta(unittest.TestCase):
574
def test_handles_basic_rfc3339(self):
575
self.assertEqual(string_to_delta("PT2H"),
576
datetime.timedelta(0, 7200))
577
def test_falls_back_to_pre_1_6_1_with_warning(self):
578
# assertLogs only exists in Python 3.4
579
if hasattr(self, "assertLogs"):
580
with self.assertLogs(log, logging.WARNING):
581
value = string_to_delta("2h")
263
if not has_actions(options) and clients:
265
keywords = ("Name", "Enabled", "Timeout",
266
"LastCheckedOK", "Created", "Interval",
267
"Host", "Fingerprint", "CheckerRunning",
268
"LastEnabled", "ApprovalPending",
270
"LastApprovalRequest", "ApprovalDelay",
271
"ApprovalDuration", "Checker")
273
keywords = defaultkeywords
275
print_clients(clients.values(), keywords)
583
value = string_to_delta("2h")
584
self.assertEqual(value, datetime.timedelta(0, 7200))
586
class Test_TableOfClients(unittest.TestCase):
592
"Bool": "A D-BUS Boolean",
593
"NonDbusBoolean": "A Non-D-BUS Boolean",
594
"Integer": "An Integer",
595
"Timeout": "Timedelta 1",
596
"Interval": "Timedelta 2",
597
"ApprovalDelay": "Timedelta 3",
598
"ApprovalDuration": "Timedelta 4",
599
"ExtendedTimeout": "Timedelta 5",
600
"String": "A String",
602
self.keywords = ["Attr1", "AttrTwo"]
608
"Bool": dbus.Boolean(False),
609
"NonDbusBoolean": False,
613
"ApprovalDelay": 2000,
614
"ApprovalDuration": 3000,
615
"ExtendedTimeout": 4000,
622
"Bool": dbus.Boolean(True),
623
"NonDbusBoolean": True,
626
"Interval": 93786000,
627
"ApprovalDelay": 93787000,
628
"ApprovalDuration": 93788000,
629
"ExtendedTimeout": 93789000,
630
"String": "A huge string which will not fit," * 10,
633
def test_short_header(self):
634
rows = TableOfClients(self.clients, self.keywords,
635
self.tablewords).rows()
640
self.assertEqual(rows, expected_rows)
641
def test_booleans(self):
642
keywords = ["Bool", "NonDbusBoolean"]
643
rows = TableOfClients(self.clients, keywords,
644
self.tablewords).rows()
646
"A D-BUS Boolean A Non-D-BUS Boolean",
650
self.assertEqual(rows, expected_rows)
651
def test_milliseconds_detection(self):
652
keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
653
"ApprovalDuration", "ExtendedTimeout"]
654
rows = TableOfClients(self.clients, keywords,
655
self.tablewords).rows()
657
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
658
0 00:00:00 00:00:01 00:00:02 00:00:03 00:00:04
659
1 1T02:03:05 1T02:03:06 1T02:03:07 1T02:03:08 1T02:03:09
662
self.assertEqual(rows, expected_rows)
663
def test_empty_and_long_string_values(self):
664
keywords = ["String"]
665
rows = TableOfClients(self.clients, keywords,
666
self.tablewords).rows()
670
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
673
self.assertEqual(rows, expected_rows)
677
def should_only_run_tests():
678
parser = argparse.ArgumentParser(add_help=False)
679
parser.add_argument("--check", action='store_true')
680
args, unknown_args = parser.parse_known_args()
681
run_tests = args.check
683
# Remove --check argument from sys.argv
684
sys.argv[1:] = unknown_args
687
# Add all tests from doctest strings
688
def load_tests(loader, tests, none):
690
tests.addTests(doctest.DocTestSuite())
277
# Process each client in the list by all selected options
278
for client in clients:
280
mandos_serv.RemoveClient(client.__dbus_object_path__)
282
client.Enable(dbus_interface=client_interface)
284
client.Disable(dbus_interface=client_interface)
285
if options.bump_timeout:
286
client.CheckedOK(dbus_interface=client_interface)
287
if options.start_checker:
288
client.StartChecker(dbus_interface=client_interface)
289
if options.stop_checker:
290
client.StopChecker(dbus_interface=client_interface)
291
if options.is_enabled:
292
sys.exit(0 if client.Get(client_interface,
294
dbus_interface=dbus.PROPERTIES_IFACE)
297
client.Set(client_interface, "Checker", options.checker,
298
dbus_interface=dbus.PROPERTIES_IFACE)
300
client.Set(client_interface, "Host", options.host,
301
dbus_interface=dbus.PROPERTIES_IFACE)
303
client.Set(client_interface, "Interval",
304
timedelta_to_milliseconds
305
(string_to_delta(options.interval)),
306
dbus_interface=dbus.PROPERTIES_IFACE)
307
if options.approval_delay:
308
client.Set(client_interface, "ApprovalDelay",
309
timedelta_to_milliseconds
310
(string_to_delta(options.
312
dbus_interface=dbus.PROPERTIES_IFACE)
313
if options.approval_duration:
314
client.Set(client_interface, "ApprovalDuration",
315
timedelta_to_milliseconds
316
(string_to_delta(options.
318
dbus_interface=dbus.PROPERTIES_IFACE)
320
client.Set(client_interface, "Timeout",
321
timedelta_to_milliseconds
322
(string_to_delta(options.timeout)),
323
dbus_interface=dbus.PROPERTIES_IFACE)
325
client.Set(client_interface, "Secret",
326
dbus.ByteArray(open(options.secret,
328
dbus_interface=dbus.PROPERTIES_IFACE)
329
if options.approved_by_default is not None:
330
client.Set(client_interface, "ApprovedByDefault",
332
.approved_by_default),
333
dbus_interface=dbus.PROPERTIES_IFACE)
335
client.Approve(dbus.Boolean(True),
336
dbus_interface=client_interface)
338
client.Approve(dbus.Boolean(False),
339
dbus_interface=client_interface)
693
341
if __name__ == "__main__":
694
if should_only_run_tests():
695
# Call using ./tdd-python-script --check [--verbose]