2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
2
# -*- mode: python; coding: utf-8 -*-
4
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
9
# This file is part of Mandos.
11
# Mandos is free software: you can redistribute it and/or modify it
12
# under the terms of the GNU General Public License as published by
6
# Copyright © 2008-2010 Teddy Hogeborn
7
# Copyright © 2008-2010 Björn Påhlsson
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU General Public License as published by
13
11
# the Free Software Foundation, either version 3 of the License, or
14
12
# (at your option) any later version.
16
# Mandos is distributed in the hope that it will be useful, but
17
# WITHOUT ANY WARRANTY; without even the implied warranty of
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
18
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19
17
# GNU General Public License for more details.
21
19
# You should have received a copy of the GNU General Public License
22
# along with Mandos. If not, see <http://www.gnu.org/licenses/>.
24
# Contact the authors at <mandos@recompile.se>.
27
from __future__ import (division, absolute_import, print_function,
31
from future_builtins import *
20
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
# Contact the authors at <mandos@fukt.bsnet.se>.
25
from __future__ import division
28
from optparse import OptionParser
48
# Show warnings by default
49
if not sys.warnoptions:
51
warnings.simplefilter("default")
53
log = logging.getLogger(sys.argv[0])
54
logging.basicConfig(level="INFO", # Show info level messages
55
format="%(message)s") # Show basic log messages
57
logging.captureWarnings(True) # Show warnings via the logging system
59
if sys.version_info.major == 2:
62
locale.setlocale(locale.LC_ALL, "")
34
locale.setlocale(locale.LC_ALL, u'')
68
"LastCheckedOK": "Last Successful Check",
69
"LastApprovalRequest": "Last Approval Request",
71
"Interval": "Interval",
73
"Fingerprint": "Fingerprint",
75
"CheckerRunning": "Check Is Running",
76
"LastEnabled": "Last Enabled",
77
"ApprovalPending": "Approval Is Pending",
78
"ApprovedByDefault": "Approved By Default",
79
"ApprovalDelay": "Approval Delay",
80
"ApprovalDuration": "Approval Duration",
82
"ExtendedTimeout": "Extended Timeout",
84
"LastCheckerStatus": "Last Checker Status",
86
defaultkeywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
87
domain = "se.recompile"
88
busname = domain + ".Mandos"
90
server_interface = domain + ".Mandos"
91
client_interface = domain + ".Mandos.Client"
96
dbus.OBJECT_MANAGER_IFACE
97
except AttributeError:
98
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
38
'Enabled': u'Enabled',
39
'Timeout': u'Timeout',
40
'LastCheckedOK': u'Last Successful Check',
41
'LastApprovalRequest': u'Last Approval Request',
42
'Created': u'Created',
43
'Interval': u'Interval',
45
'Fingerprint': u'Fingerprint',
46
'CheckerRunning': u'Check Is Running',
47
'LastEnabled': u'Last Enabled',
48
'ApprovalPending': u'Approval Is Pending',
49
'ApprovedByDefault': u'Approved By Default',
50
'ApprovalDelay': u"Approval Delay",
51
'ApprovalDuration': u"Approval Duration",
52
'Checker': u'Checker',
54
defaultkeywords = ('Name', 'Enabled', 'Timeout', 'LastCheckedOK')
55
domain = 'se.bsnet.fukt'
56
busname = domain + '.Mandos'
58
server_interface = domain + '.Mandos'
59
client_interface = domain + '.Mandos.Client'
62
def timedelta_to_milliseconds(td):
63
"Convert a datetime.timedelta object to milliseconds"
64
return ((td.days * 24 * 60 * 60 * 1000)
66
+ (td.microseconds // 1000))
101
68
def milliseconds_to_string(ms):
102
69
td = datetime.timedelta(0, 0, 0, ms)
103
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
104
.format(days="{}T".format(td.days) if td.days else "",
105
hours=td.seconds // 3600,
106
minutes=(td.seconds % 3600) // 60,
107
seconds=td.seconds % 60))
110
def rfc3339_duration_to_delta(duration):
111
"""Parse an RFC 3339 "duration" and return a datetime.timedelta
113
>>> rfc3339_duration_to_delta("P7D")
114
datetime.timedelta(7)
115
>>> rfc3339_duration_to_delta("PT60S")
116
datetime.timedelta(0, 60)
117
>>> rfc3339_duration_to_delta("PT60M")
118
datetime.timedelta(0, 3600)
119
>>> rfc3339_duration_to_delta("P60M")
120
datetime.timedelta(1680)
121
>>> rfc3339_duration_to_delta("PT24H")
122
datetime.timedelta(1)
123
>>> rfc3339_duration_to_delta("P1W")
124
datetime.timedelta(7)
125
>>> rfc3339_duration_to_delta("PT5M30S")
126
datetime.timedelta(0, 330)
127
>>> rfc3339_duration_to_delta("P1DT3M20S")
128
datetime.timedelta(1, 200)
129
>>> # Can not be empty:
130
>>> rfc3339_duration_to_delta("")
131
Traceback (most recent call last):
133
ValueError: Invalid RFC 3339 duration: u''
134
>>> # Must start with "P":
135
>>> rfc3339_duration_to_delta("1D")
136
Traceback (most recent call last):
138
ValueError: Invalid RFC 3339 duration: u'1D'
139
>>> # Must use correct order
140
>>> rfc3339_duration_to_delta("PT1S2M")
141
Traceback (most recent call last):
143
ValueError: Invalid RFC 3339 duration: u'PT1S2M'
144
>>> # Time needs time marker
145
>>> rfc3339_duration_to_delta("P1H2S")
146
Traceback (most recent call last):
148
ValueError: Invalid RFC 3339 duration: u'P1H2S'
149
>>> # Weeks can not be combined with anything else
150
>>> rfc3339_duration_to_delta("P1D2W")
151
Traceback (most recent call last):
153
ValueError: Invalid RFC 3339 duration: u'P1D2W'
154
>>> rfc3339_duration_to_delta("P2W2H")
155
Traceback (most recent call last):
157
ValueError: Invalid RFC 3339 duration: u'P2W2H'
160
# Parsing an RFC 3339 duration with regular expressions is not
161
# possible - there would have to be multiple places for the same
162
# values, like seconds. The current code, while more esoteric, is
163
# cleaner without depending on a parsing library. If Python had a
164
# built-in library for parsing we would use it, but we'd like to
165
# avoid excessive use of external libraries.
167
# New type for defining tokens, syntax, and semantics all-in-one
168
Token = collections.namedtuple("Token", (
169
"regexp", # To match token; if "value" is not None, must have
170
# a "group" containing digits
171
"value", # datetime.timedelta or None
172
"followers")) # Tokens valid after this token
173
# RFC 3339 "duration" tokens, syntax, and semantics; taken from
174
# the "duration" ABNF definition in RFC 3339, Appendix A.
175
token_end = Token(re.compile(r"$"), None, frozenset())
176
token_second = Token(re.compile(r"(\d+)S"),
177
datetime.timedelta(seconds=1),
178
frozenset((token_end, )))
179
token_minute = Token(re.compile(r"(\d+)M"),
180
datetime.timedelta(minutes=1),
181
frozenset((token_second, token_end)))
182
token_hour = Token(re.compile(r"(\d+)H"),
183
datetime.timedelta(hours=1),
184
frozenset((token_minute, token_end)))
185
token_time = Token(re.compile(r"T"),
187
frozenset((token_hour, token_minute,
189
token_day = Token(re.compile(r"(\d+)D"),
190
datetime.timedelta(days=1),
191
frozenset((token_time, token_end)))
192
token_month = Token(re.compile(r"(\d+)M"),
193
datetime.timedelta(weeks=4),
194
frozenset((token_day, token_end)))
195
token_year = Token(re.compile(r"(\d+)Y"),
196
datetime.timedelta(weeks=52),
197
frozenset((token_month, token_end)))
198
token_week = Token(re.compile(r"(\d+)W"),
199
datetime.timedelta(weeks=1),
200
frozenset((token_end, )))
201
token_duration = Token(re.compile(r"P"), None,
202
frozenset((token_year, token_month,
203
token_day, token_time,
205
# Define starting values:
207
value = datetime.timedelta()
209
# Following valid tokens
210
followers = frozenset((token_duration, ))
211
# String left to parse
213
# Loop until end token is found
214
while found_token is not token_end:
215
# Search for any currently valid tokens
216
for token in followers:
217
match = token.regexp.match(s)
218
if match is not None:
220
if token.value is not None:
221
# Value found, parse digits
222
factor = int(match.group(1), 10)
223
# Add to value so far
224
value += factor * token.value
225
# Strip token from string
226
s = token.regexp.sub("", s, 1)
229
# Set valid next tokens
230
followers = found_token.followers
233
# No currently valid tokens were found
234
raise ValueError("Invalid RFC 3339 duration: {!r}"
70
return (u"%(days)s%(hours)02d:%(minutes)02d:%(seconds)02d"
71
% { "days": "%dT" % td.days if td.days else "",
72
"hours": td.seconds // 3600,
73
"minutes": (td.seconds % 3600) // 60,
74
"seconds": td.seconds % 60,
240
78
def string_to_delta(interval):
241
79
"""Parse a string and return a datetime.timedelta
245
return rfc3339_duration_to_delta(interval)
246
except ValueError as e:
247
log.warning("%s - Parsing as pre-1.6.1 interval instead",
249
return parse_pre_1_6_1_interval(interval)
252
def parse_pre_1_6_1_interval(interval):
253
"""Parse an interval string as documented by Mandos before 1.6.1, and
254
return a datetime.timedelta
255
>>> parse_pre_1_6_1_interval('7d')
81
>>> string_to_delta('7d')
256
82
datetime.timedelta(7)
257
>>> parse_pre_1_6_1_interval('60s')
83
>>> string_to_delta('60s')
258
84
datetime.timedelta(0, 60)
259
>>> parse_pre_1_6_1_interval('60m')
85
>>> string_to_delta('60m')
260
86
datetime.timedelta(0, 3600)
261
>>> parse_pre_1_6_1_interval('24h')
87
>>> string_to_delta('24h')
262
88
datetime.timedelta(1)
263
>>> parse_pre_1_6_1_interval('1w')
89
>>> string_to_delta(u'1w')
264
90
datetime.timedelta(7)
265
>>> parse_pre_1_6_1_interval('5m 30s')
91
>>> string_to_delta('5m 30s')
266
92
datetime.timedelta(0, 330)
267
>>> parse_pre_1_6_1_interval('')
268
datetime.timedelta(0)
269
>>> # Ignore unknown characters, allow any order and repetitions
270
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
271
datetime.timedelta(2, 480, 18000)
275
value = datetime.timedelta(0)
276
regexp = re.compile(r"(\d+)([dsmhw]?)")
278
for num, suffix in regexp.findall(interval):
280
value += datetime.timedelta(int(num))
282
value += datetime.timedelta(0, int(num))
284
value += datetime.timedelta(0, 0, 0, 0, int(num))
286
value += datetime.timedelta(0, 0, 0, 0, 0, int(num))
288
value += datetime.timedelta(0, 0, 0, 0, 0, 0, int(num))
290
value += datetime.timedelta(0, 0, 0, int(num))
94
timevalue = datetime.timedelta(0)
95
regexp = re.compile("\d+[dsmhw]")
97
for s in regexp.findall(interval):
99
suffix = unicode(s[-1])
102
delta = datetime.timedelta(value)
104
delta = datetime.timedelta(0, value)
106
delta = datetime.timedelta(0, 0, 0, 0, value)
108
delta = datetime.timedelta(0, 0, 0, 0, 0, value)
110
delta = datetime.timedelta(0, 0, 0, 0, 0, 0, value)
113
except (ValueError, IndexError):
294
118
def print_clients(clients, keywords):
295
print('\n'.join(table_rows_of_clients(clients, keywords)))
297
def table_rows_of_clients(clients, keywords):
298
119
def valuetostring(value, keyword):
299
if isinstance(value, dbus.Boolean):
300
return "Yes" if value else "No"
301
if keyword in ("Timeout", "Interval", "ApprovalDelay",
302
"ApprovalDuration", "ExtendedTimeout"):
120
if type(value) is dbus.Boolean:
121
return u"Yes" if value else u"No"
122
if keyword in (u"Timeout", u"Interval", u"ApprovalDelay",
123
u"ApprovalDuration"):
303
124
return milliseconds_to_string(value)
125
return unicode(value)
306
127
# Create format string to print table rows
307
format_string = " ".join("{{{key}:{width}}}".format(
308
width=max(len(tablewords[key]),
309
max(len(valuetostring(client[key], key))
310
for client in clients)),
313
# Start with header line
314
rows = [format_string.format(**tablewords)]
128
format_string = u' '.join(u'%%-%ds' %
129
max(len(tablewords[key]),
130
max(len(valuetostring(client[key],
136
print format_string % tuple(tablewords[key] for key in keywords)
315
137
for client in clients:
316
rows.append(format_string
317
.format(**{key: valuetostring(client[key], key)
318
for key in keywords}))
138
print format_string % tuple(valuetostring(client[key], key)
322
141
def has_actions(options):
323
142
return any((options.enable,
338
156
options.secret is not None,
344
parser = argparse.ArgumentParser()
345
parser.add_argument("--version", action="version",
346
version="%(prog)s {}".format(version),
347
help="show version number and exit")
348
parser.add_argument("-a", "--all", action="store_true",
349
help="Select all clients")
350
parser.add_argument("-v", "--verbose", action="store_true",
351
help="Print all fields")
352
parser.add_argument("-j", "--dump-json", action="store_true",
353
help="Dump client data in JSON format")
354
parser.add_argument("-e", "--enable", action="store_true",
355
help="Enable client")
356
parser.add_argument("-d", "--disable", action="store_true",
357
help="disable client")
358
parser.add_argument("-b", "--bump-timeout", action="store_true",
359
help="Bump timeout for client")
360
parser.add_argument("--start-checker", action="store_true",
361
help="Start checker for client")
362
parser.add_argument("--stop-checker", action="store_true",
363
help="Stop checker for client")
364
parser.add_argument("-V", "--is-enabled", action="store_true",
365
help="Check if client is enabled")
366
parser.add_argument("-r", "--remove", action="store_true",
367
help="Remove client")
368
parser.add_argument("-c", "--checker",
369
help="Set checker command for client")
370
parser.add_argument("-t", "--timeout",
371
help="Set timeout for client")
372
parser.add_argument("--extended-timeout",
373
help="Set extended timeout for client")
374
parser.add_argument("-i", "--interval",
375
help="Set checker interval for client")
376
parser.add_argument("--approve-by-default", action="store_true",
377
default=None, dest="approved_by_default",
378
help="Set client to be approved by default")
379
parser.add_argument("--deny-by-default", action="store_false",
380
dest="approved_by_default",
381
help="Set client to be denied by default")
382
parser.add_argument("--approval-delay",
383
help="Set delay before client approve/deny")
384
parser.add_argument("--approval-duration",
385
help="Set duration of one client approval")
386
parser.add_argument("-H", "--host", help="Set host for client")
387
parser.add_argument("-s", "--secret",
388
type=argparse.FileType(mode="rb"),
389
help="Set password blob (file) for client")
390
parser.add_argument("-A", "--approve", action="store_true",
391
help="Approve any current client request")
392
parser.add_argument("-D", "--deny", action="store_true",
393
help="Deny any current client request")
394
parser.add_argument("--check", action="store_true",
395
help="Run self-test")
396
parser.add_argument("client", nargs="*", help="Client name")
397
options = parser.parse_args()
399
if has_actions(options) and not (options.client or options.all):
400
parser.error("Options require clients names or --all.")
401
if options.verbose and has_actions(options):
402
parser.error("--verbose can only be used alone.")
403
if options.dump_json and (options.verbose
404
or has_actions(options)):
405
parser.error("--dump-json can only be used alone.")
406
if options.all and not has_actions(options):
407
parser.error("--all requires an action.")
410
bus = dbus.SystemBus()
411
mandos_dbus_objc = bus.get_object(busname, server_path)
412
except dbus.exceptions.DBusException:
413
log.critical("Could not connect to Mandos server")
416
mandos_serv = dbus.Interface(mandos_dbus_objc,
417
dbus_interface=server_interface)
418
mandos_serv_object_manager = dbus.Interface(
419
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
421
# block stderr since dbus library prints to stderr
422
null = os.open(os.path.devnull, os.O_RDWR)
423
stderrcopy = os.dup(sys.stderr.fileno())
424
os.dup2(null, sys.stderr.fileno())
428
mandos_clients = {path: ifs_and_props[client_interface]
429
for path, ifs_and_props in
430
mandos_serv_object_manager
431
.GetManagedObjects().items()
432
if client_interface in ifs_and_props}
435
os.dup2(stderrcopy, sys.stderr.fileno())
437
except dbus.exceptions.DBusException as e:
438
log.critical("Failed to access Mandos server through D-Bus:"
442
# Compile dict of (clients: properties) to process
445
if options.all or not options.client:
446
clients = {bus.get_object(busname, path): properties
447
for path, properties in mandos_clients.items()}
449
for name in options.client:
450
for path, client in mandos_clients.items():
451
if client["Name"] == name:
452
client_objc = bus.get_object(busname, path)
453
clients[client_objc] = client
456
log.critical("Client not found on server: %r", name)
459
if not has_actions(options) and clients:
460
if options.verbose or options.dump_json:
461
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
462
"Created", "Interval", "Host", "KeyID",
463
"Fingerprint", "CheckerRunning",
464
"LastEnabled", "ApprovalPending",
465
"ApprovedByDefault", "LastApprovalRequest",
466
"ApprovalDelay", "ApprovalDuration",
467
"Checker", "ExtendedTimeout", "Expires",
470
keywords = defaultkeywords
472
if options.dump_json:
473
json.dump({client["Name"]: {key:
475
if isinstance(client[key],
479
for client in clients.values()},
480
fp=sys.stdout, indent=4,
481
separators=(',', ': '))
484
print_clients(clients.values(), keywords)
486
# Process each client in the list by all selected options
487
for client in clients:
489
def set_client_prop(prop, value):
490
"""Set a Client D-Bus property"""
491
client.Set(client_interface, prop, value,
492
dbus_interface=dbus.PROPERTIES_IFACE)
494
def set_client_prop_ms(prop, value):
495
"""Set a Client D-Bus property, converted
496
from a string to milliseconds."""
497
set_client_prop(prop,
498
string_to_delta(value).total_seconds()
502
mandos_serv.RemoveClient(client.__dbus_object_path__)
504
set_client_prop("Enabled", dbus.Boolean(True))
506
set_client_prop("Enabled", dbus.Boolean(False))
507
if options.bump_timeout:
508
set_client_prop("LastCheckedOK", "")
509
if options.start_checker:
510
set_client_prop("CheckerRunning", dbus.Boolean(True))
511
if options.stop_checker:
512
set_client_prop("CheckerRunning", dbus.Boolean(False))
513
if options.is_enabled:
514
if client.Get(client_interface, "Enabled",
515
dbus_interface=dbus.PROPERTIES_IFACE):
161
parser = OptionParser(version = "%%prog %s" % version)
162
parser.add_option("-a", "--all", action="store_true",
163
help="Select all clients")
164
parser.add_option("-v", "--verbose", action="store_true",
165
help="Print all fields")
166
parser.add_option("-e", "--enable", action="store_true",
167
help="Enable client")
168
parser.add_option("-d", "--disable", action="store_true",
169
help="disable client")
170
parser.add_option("-b", "--bump-timeout", action="store_true",
171
help="Bump timeout for client")
172
parser.add_option("--start-checker", action="store_true",
173
help="Start checker for client")
174
parser.add_option("--stop-checker", action="store_true",
175
help="Stop checker for client")
176
parser.add_option("-V", "--is-enabled", action="store_true",
177
help="Check if client is enabled")
178
parser.add_option("-r", "--remove", action="store_true",
179
help="Remove client")
180
parser.add_option("-c", "--checker", type="string",
181
help="Set checker command for client")
182
parser.add_option("-t", "--timeout", type="string",
183
help="Set timeout for client")
184
parser.add_option("-i", "--interval", type="string",
185
help="Set checker interval for client")
186
parser.add_option("--approve-by-default", action="store_true",
187
dest=u"approved_by_default",
188
help="Set client to be approved by default")
189
parser.add_option("--deny-by-default", action="store_false",
190
dest=u"approved_by_default",
191
help="Set client to be denied by default")
192
parser.add_option("--approval-delay", type="string",
193
help="Set delay before client approve/deny")
194
parser.add_option("--approval-duration", type="string",
195
help="Set duration of one client approval")
196
parser.add_option("-H", "--host", type="string",
197
help="Set host for client")
198
parser.add_option("-s", "--secret", type="string",
199
help="Set password blob (file) for client")
200
parser.add_option("-A", "--approve", action="store_true",
201
help="Approve any current client request")
202
parser.add_option("-D", "--deny", action="store_true",
203
help="Deny any current client request")
204
options, client_names = parser.parse_args()
206
if has_actions(options) and not client_names and not options.all:
207
parser.error('Options require clients names or --all.')
208
if options.verbose and has_actions(options):
209
parser.error('--verbose can only be used alone or with'
211
if options.all and not has_actions(options):
212
parser.error('--all requires an action.')
215
bus = dbus.SystemBus()
216
mandos_dbus_objc = bus.get_object(busname, server_path)
217
except dbus.exceptions.DBusException:
218
print >> sys.stderr, "Could not connect to Mandos server"
221
mandos_serv = dbus.Interface(mandos_dbus_objc,
222
dbus_interface = server_interface)
224
#block stderr since dbus library prints to stderr
225
null = os.open(os.path.devnull, os.O_RDWR)
226
stderrcopy = os.dup(sys.stderr.fileno())
227
os.dup2(null, sys.stderr.fileno())
231
mandos_clients = mandos_serv.GetAllClientsWithProperties()
234
os.dup2(stderrcopy, sys.stderr.fileno())
236
except dbus.exceptions.DBusException, e:
237
print >> sys.stderr, "Access denied: Accessing mandos server through dbus."
240
# Compile dict of (clients: properties) to process
243
if options.all or not client_names:
244
clients = dict((bus.get_object(busname, path), properties)
245
for path, properties in
246
mandos_clients.iteritems())
248
for name in client_names:
249
for path, client in mandos_clients.iteritems():
250
if client['Name'] == name:
251
client_objc = bus.get_object(busname, path)
252
clients[client_objc] = client
255
print >> sys.stderr, "Client not found on server: %r" % name
519
if options.checker is not None:
520
set_client_prop("Checker", options.checker)
521
if options.host is not None:
522
set_client_prop("Host", options.host)
523
if options.interval is not None:
524
set_client_prop_ms("Interval", options.interval)
525
if options.approval_delay is not None:
526
set_client_prop_ms("ApprovalDelay",
527
options.approval_delay)
528
if options.approval_duration is not None:
529
set_client_prop_ms("ApprovalDuration",
530
options.approval_duration)
531
if options.timeout is not None:
532
set_client_prop_ms("Timeout", options.timeout)
533
if options.extended_timeout is not None:
534
set_client_prop_ms("ExtendedTimeout",
535
options.extended_timeout)
536
if options.secret is not None:
537
set_client_prop("Secret",
538
dbus.ByteArray(options.secret.read()))
539
if options.approved_by_default is not None:
540
set_client_prop("ApprovedByDefault",
542
.approved_by_default))
544
client.Approve(dbus.Boolean(True),
545
dbus_interface=client_interface)
547
client.Approve(dbus.Boolean(False),
548
dbus_interface=client_interface)
551
class Test_milliseconds_to_string(unittest.TestCase):
553
self.assertEqual(milliseconds_to_string(93785000),
555
def test_no_days(self):
556
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
557
def test_all_zero(self):
558
self.assertEqual(milliseconds_to_string(0), "00:00:00")
559
def test_no_fractional_seconds(self):
560
self.assertEqual(milliseconds_to_string(400), "00:00:00")
561
self.assertEqual(milliseconds_to_string(900), "00:00:00")
562
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
564
class Test_string_to_delta(unittest.TestCase):
565
def test_handles_basic_rfc3339(self):
566
self.assertEqual(string_to_delta("PT2H"),
567
datetime.timedelta(0, 7200))
568
def test_falls_back_to_pre_1_6_1_with_warning(self):
569
# assertLogs only exists in Python 3.4
570
if hasattr(self, "assertLogs"):
571
with self.assertLogs(log, logging.WARNING):
572
value = string_to_delta("2h")
258
if not has_actions(options) and clients:
260
keywords = ('Name', 'Enabled', 'Timeout',
261
'LastCheckedOK', 'Created', 'Interval',
262
'Host', 'Fingerprint', 'CheckerRunning',
263
'LastEnabled', 'ApprovalPending',
265
'LastApprovalRequest', 'ApprovalDelay',
266
'ApprovalDuration', 'Checker')
268
keywords = defaultkeywords
270
print_clients(clients.values(), keywords)
574
value = string_to_delta("2h")
575
self.assertEqual(value, datetime.timedelta(0, 7200))
577
class Test_table_rows_of_clients(unittest.TestCase):
580
self.old_tablewords = tablewords
585
"Bool": "A D-BUS Boolean",
586
"NonDbusBoolean": "A Non-D-BUS Boolean",
587
"Integer": "An Integer",
588
"Timeout": "Timedelta 1",
589
"Interval": "Timedelta 2",
590
"ApprovalDelay": "Timedelta 3",
591
"ApprovalDuration": "Timedelta 4",
592
"ExtendedTimeout": "Timedelta 5",
593
"String": "A String",
595
self.keywords = ["Attr1", "AttrTwo"]
601
"Bool": dbus.Boolean(False),
602
"NonDbusBoolean": False,
606
"ApprovalDelay": 2000,
607
"ApprovalDuration": 3000,
608
"ExtendedTimeout": 4000,
615
"Bool": dbus.Boolean(True),
616
"NonDbusBoolean": True,
619
"Interval": 93786000,
620
"ApprovalDelay": 93787000,
621
"ApprovalDuration": 93788000,
622
"ExtendedTimeout": 93789000,
623
"String": "A huge string which will not fit," * 10,
628
tablewords = self.old_tablewords
629
def test_short_header(self):
630
rows = table_rows_of_clients(self.clients, self.keywords)
635
self.assertEqual(rows, expected_rows)
636
def test_booleans(self):
637
keywords = ["Bool", "NonDbusBoolean"]
638
rows = table_rows_of_clients(self.clients, keywords)
640
"A D-BUS Boolean A Non-D-BUS Boolean",
644
self.assertEqual(rows, expected_rows)
645
def test_milliseconds_detection(self):
646
keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
647
"ApprovalDuration", "ExtendedTimeout"]
648
rows = table_rows_of_clients(self.clients, keywords)
650
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
651
0 00:00:00 00:00:01 00:00:02 00:00:03 00:00:04
652
1 1T02:03:05 1T02:03:06 1T02:03:07 1T02:03:08 1T02:03:09
655
self.assertEqual(rows, expected_rows)
656
def test_empty_and_long_string_values(self):
657
keywords = ["String"]
658
rows = table_rows_of_clients(self.clients, keywords)
662
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
665
self.assertEqual(rows, expected_rows)
669
def should_only_run_tests():
670
parser = argparse.ArgumentParser(add_help=False)
671
parser.add_argument("--check", action='store_true')
672
args, unknown_args = parser.parse_known_args()
673
run_tests = args.check
675
# Remove --check argument from sys.argv
676
sys.argv[1:] = unknown_args
679
# Add all tests from doctest strings
680
def load_tests(loader, tests, none):
682
tests.addTests(doctest.DocTestSuite())
685
if __name__ == "__main__":
686
if should_only_run_tests():
687
# Call using ./tdd-python-script --check [--verbose]
272
# Process each client in the list by all selected options
273
for client in clients:
275
mandos_serv.RemoveClient(client.__dbus_object_path__)
277
client.Enable(dbus_interface=client_interface)
279
client.Disable(dbus_interface=client_interface)
280
if options.bump_timeout:
281
client.CheckedOK(dbus_interface=client_interface)
282
if options.start_checker:
283
client.StartChecker(dbus_interface=client_interface)
284
if options.stop_checker:
285
client.StopChecker(dbus_interface=client_interface)
286
if options.is_enabled:
287
sys.exit(0 if client.Get(client_interface,
289
dbus_interface=dbus.PROPERTIES_IFACE)
292
client.Set(client_interface, u"Checker", options.checker,
293
dbus_interface=dbus.PROPERTIES_IFACE)
295
client.Set(client_interface, u"Host", options.host,
296
dbus_interface=dbus.PROPERTIES_IFACE)
298
client.Set(client_interface, u"Interval",
299
timedelta_to_milliseconds
300
(string_to_delta(options.interval)),
301
dbus_interface=dbus.PROPERTIES_IFACE)
302
if options.approval_delay:
303
client.Set(client_interface, u"ApprovalDelay",
304
timedelta_to_milliseconds
305
(string_to_delta(options.
307
dbus_interface=dbus.PROPERTIES_IFACE)
308
if options.approval_duration:
309
client.Set(client_interface, u"ApprovalDuration",
310
timedelta_to_milliseconds
311
(string_to_delta(options.
313
dbus_interface=dbus.PROPERTIES_IFACE)
315
client.Set(client_interface, u"Timeout",
316
timedelta_to_milliseconds
317
(string_to_delta(options.timeout)),
318
dbus_interface=dbus.PROPERTIES_IFACE)
320
client.Set(client_interface, u"Secret",
321
dbus.ByteArray(open(options.secret,
323
dbus_interface=dbus.PROPERTIES_IFACE)
324
if options.approved_by_default is not None:
325
client.Set(client_interface, u"ApprovedByDefault",
327
.approved_by_default),
328
dbus_interface=dbus.PROPERTIES_IFACE)
330
client.Approve(dbus.Boolean(True),
331
dbus_interface=client_interface)
333
client.Approve(dbus.Boolean(False),
334
dbus_interface=client_interface)
336
if __name__ == '__main__':