2
# -*- mode: python; coding: utf-8; after-save-hook: (lambda () (let ((command (if (and (boundp 'tramp-file-name-structure) (string-match (car tramp-file-name-structure) (buffer-file-name))) (tramp-file-name-localname (tramp-dissect-file-name (buffer-file-name))) (buffer-file-name)))) (if (= (shell-command (format "%s --check" (shell-quote-argument command)) "*Test*") 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w)) (kill-buffer "*Test*")) (display-buffer "*Test*")))); -*-
4
# Mandos Monitor - Control and monitor the Mandos server
6
# Copyright © 2008-2019 Teddy Hogeborn
7
# Copyright © 2008-2019 Björn Påhlsson
9
# This file is part of Mandos.
11
# Mandos is free software: you can redistribute it and/or modify it
12
# under the terms of the GNU General Public License as published by
13
# the Free Software Foundation, either version 3 of the License, or
14
# (at your option) any later version.
16
# Mandos is distributed in the hope that it will be useful, but
17
# WITHOUT ANY WARRANTY; without even the implied warranty of
18
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19
# GNU General Public License for more details.
21
# You should have received a copy of the GNU General Public License
22
# along with Mandos. If not, see <http://www.gnu.org/licenses/>.
24
# Contact the authors at <mandos@recompile.se>.
27
from __future__ import (division, absolute_import, print_function,
31
from future_builtins import *
2
# -*- mode: python; coding: utf-8 -*-
4
from __future__ import division
7
from optparse import OptionParser
51
# Show warnings by default
52
if not sys.warnoptions:
54
warnings.simplefilter("default")
56
log = logging.getLogger(sys.argv[0])
57
logging.basicConfig(level="INFO", # Show info level messages
58
format="%(message)s") # Show basic log messages
60
logging.captureWarnings(True) # Show warnings via the logging system
62
if sys.version_info.major == 2:
65
locale.setlocale(locale.LC_ALL, "")
67
domain = "se.recompile"
68
busname = domain + ".Mandos"
70
server_interface = domain + ".Mandos"
71
client_interface = domain + ".Mandos.Client"
76
dbus.OBJECT_MANAGER_IFACE
77
except AttributeError:
78
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
12
locale.setlocale(locale.LC_ALL, u'')
16
'enabled': u'Enabled',
17
'timeout': u'Timeout',
18
'last_checked_ok': u'Last Successful Check',
19
'created': u'Created',
20
'interval': u'Interval',
22
'fingerprint': u'Fingerprint',
23
'checker_running': u'Check Is Running',
24
'last_enabled': u'Last Enabled',
25
'checker': u'Checker',
27
defaultkeywords = ('name', 'enabled', 'timeout', 'last_checked_ok',
29
domain = 'se.bsnet.fukt'
30
busname = domain + '.Mandos'
32
server_interface = domain + '.Mandos'
33
client_interface = domain + '.Mandos.Client'
35
bus = dbus.SystemBus()
36
mandos_dbus_objc = bus.get_object(busname, server_path)
37
mandos_serv = dbus.Interface(mandos_dbus_objc,
38
dbus_interface = server_interface)
39
mandos_clients = mandos_serv.GetAllClientsWithProperties()
41
def timedelta_to_milliseconds(td):
    """Convert a datetime.timedelta object to milliseconds.

    All three normalized components of the timedelta (days, seconds
    and microseconds) contribute to the result; any fraction of a
    millisecond is dropped by the integer division.

    td -- the datetime.timedelta to convert
    Returns an integer number of milliseconds.
    """
    return ((td.days * 24 * 60 * 60 * 1000)
            # The seconds component must be included, or every
            # duration shorter than a day collapses to its
            # microsecond part only.
            + (td.seconds * 1000)
            + (td.microseconds // 1000))
81
47
def milliseconds_to_string(ms):
    """Format a number of milliseconds as "[<days>T]HH:MM:SS".

    The "<days>T" prefix is only present when the duration is at
    least one day.  Only whole seconds are shown; any fraction of a
    second is dropped, not rounded.

    ms -- duration in milliseconds
    Returns the formatted string.
    """
    td = datetime.timedelta(0, 0, 0, ms)
    # td.seconds is always in the range 0..86399, so it can be split
    # into hours, minutes and seconds without involving td.days.
    minutes, seconds = divmod(td.seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
            .format(days="{}T".format(td.days) if td.days else "",
                    hours=hours,
                    minutes=minutes,
                    seconds=seconds))
90
def rfc3339_duration_to_delta(duration):
    """Parse an RFC 3339 "duration" and return a datetime.timedelta

    A year is counted as 52 weeks and a month as 4 weeks.  Raises
    ValueError if the string is not a valid duration.

    >>> rfc3339_duration_to_delta("P7D")
    datetime.timedelta(7)
    >>> rfc3339_duration_to_delta("PT60S")
    datetime.timedelta(0, 60)
    >>> rfc3339_duration_to_delta("PT60M")
    datetime.timedelta(0, 3600)
    >>> rfc3339_duration_to_delta("P60M")
    datetime.timedelta(1680)
    >>> rfc3339_duration_to_delta("PT24H")
    datetime.timedelta(1)
    >>> rfc3339_duration_to_delta("P1W")
    datetime.timedelta(7)
    >>> rfc3339_duration_to_delta("PT5M30S")
    datetime.timedelta(0, 330)
    >>> rfc3339_duration_to_delta("P1DT3M20S")
    datetime.timedelta(1, 200)
    >>> # Can not be empty:
    >>> rfc3339_duration_to_delta("")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u''
    >>> # Must start with "P":
    >>> rfc3339_duration_to_delta("1D")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'1D'
    >>> # Must use correct order
    >>> rfc3339_duration_to_delta("PT1S2M")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'PT1S2M'
    >>> # Time needs time marker
    >>> rfc3339_duration_to_delta("P1H2S")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'P1H2S'
    >>> # Weeks can not be combined with anything else
    >>> rfc3339_duration_to_delta("P1D2W")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'P1D2W'
    >>> rfc3339_duration_to_delta("P2W2H")
    Traceback (most recent call last):
    ...
    ValueError: Invalid RFC 3339 duration: u'P2W2H'
    """

    # Parsing an RFC 3339 duration with regular expressions is not
    # possible - there would have to be multiple places for the same
    # values, like seconds.  The current code, while more esoteric, is
    # cleaner without depending on a parsing library.  If Python had a
    # built-in library for parsing we would use it, but we'd like to
    # avoid excessive use of external libraries.

    # New type for defining tokens, syntax, and semantics all-in-one
    Token = collections.namedtuple("Token", (
        "regexp",     # To match token; if "value" is not None, must
                      # have a "group" containing digits
        "value",      # datetime.timedelta or None
        "followers"))  # Tokens valid after this token
    # RFC 3339 "duration" tokens, syntax, and semantics; taken from
    # the "duration" ABNF definition in RFC 3339, Appendix A.
    token_end = Token(re.compile(r"$"), None, frozenset())
    token_second = Token(re.compile(r"(\d+)S"),
                         datetime.timedelta(seconds=1),
                         frozenset((token_end, )))
    token_minute = Token(re.compile(r"(\d+)M"),
                         datetime.timedelta(minutes=1),
                         frozenset((token_second, token_end)))
    token_hour = Token(re.compile(r"(\d+)H"),
                       datetime.timedelta(hours=1),
                       frozenset((token_minute, token_end)))
    # The "T" marker carries no value of its own; it only switches
    # from date components to time components.
    token_time = Token(re.compile(r"T"),
                       None,
                       frozenset((token_hour, token_minute,
                                  token_second)))
    token_day = Token(re.compile(r"(\d+)D"),
                      datetime.timedelta(days=1),
                      frozenset((token_time, token_end)))
    token_month = Token(re.compile(r"(\d+)M"),
                        datetime.timedelta(weeks=4),
                        frozenset((token_day, token_end)))
    token_year = Token(re.compile(r"(\d+)Y"),
                       datetime.timedelta(weeks=52),
                       frozenset((token_month, token_end)))
    # Weeks may only be followed by the end of the string
    token_week = Token(re.compile(r"(\d+)W"),
                       datetime.timedelta(weeks=1),
                       frozenset((token_end, )))
    token_duration = Token(re.compile(r"P"), None,
                           frozenset((token_year, token_month,
                                      token_day, token_time,
                                      token_week)))
    # Define starting values:
    # Value so far
    value = datetime.timedelta()
    found_token = None
    # Following valid tokens
    followers = frozenset((token_duration, ))
    # String left to parse
    s = duration
    # Loop until end token is found
    while found_token is not token_end:
        # Search for any currently valid tokens
        for token in followers:
            match = token.regexp.match(s)
            if match is not None:
                # Token found
                if token.value is not None:
                    # Value found, parse digits
                    factor = int(match.group(1), 10)
                    # Add to value so far
                    value += factor * token.value
                # Strip token from string
                s = token.regexp.sub("", s, 1)
                # Go to found token
                found_token = token
                # Set valid next tokens
                followers = found_token.followers
                break
        else:
            # No currently valid tokens were found
            raise ValueError("Invalid RFC 3339 duration: {!r}"
                             .format(duration))
    # End token found
    return value
49
return (u"%(days)s%(hours)02d:%(minutes)02d:%(seconds)02d"
50
% { "days": "%dT" % td.days if td.days else "",
51
"hours": td.seconds // 3600,
52
"minutes": (td.seconds % 3600) // 60,
53
"seconds": td.seconds % 60,
220
57
def string_to_delta(interval):
    """Parse a string and return a datetime.timedelta

    The string is first parsed as an RFC 3339 duration.  If that
    fails, a warning is logged and the string is parsed using the
    interval syntax documented by Mandos before 1.6.1 instead.
    """
    try:
        return rfc3339_duration_to_delta(interval)
    except ValueError as e:
        log.warning("%s - Parsing as pre-1.6.1 interval instead", e)
    return parse_pre_1_6_1_interval(interval)
231
def parse_pre_1_6_1_interval(interval):
    """Parse an interval string as documented by Mandos before 1.6.1,
    and return a datetime.timedelta

    Every run of digits is taken as a number; it may be followed by
    one of the suffixes d (days), s (seconds), m (minutes), h (hours)
    or w (weeks).  A number without a suffix counts as milliseconds.
    All numbers found are added together.

    >>> parse_pre_1_6_1_interval('7d')
    datetime.timedelta(7)
    >>> parse_pre_1_6_1_interval('60s')
    datetime.timedelta(0, 60)
    >>> parse_pre_1_6_1_interval('60m')
    datetime.timedelta(0, 3600)
    >>> parse_pre_1_6_1_interval('24h')
    datetime.timedelta(1)
    >>> parse_pre_1_6_1_interval('1w')
    datetime.timedelta(7)
    >>> parse_pre_1_6_1_interval('5m 30s')
    datetime.timedelta(0, 330)
    >>> parse_pre_1_6_1_interval('')
    datetime.timedelta(0)
    >>> # Ignore unknown characters, allow any order and repetitions
    >>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
    datetime.timedelta(2, 480, 18000)
    """

    # Length of one unit for every suffix the regexp below can yield
    units = {
        "d": datetime.timedelta(days=1),
        "s": datetime.timedelta(seconds=1),
        "m": datetime.timedelta(minutes=1),
        "h": datetime.timedelta(hours=1),
        "w": datetime.timedelta(weeks=1),
        "": datetime.timedelta(milliseconds=1),
    }
    value = datetime.timedelta(0)
    regexp = re.compile(r"(\d+)([dsmhw]?)")

    for num, suffix in regexp.findall(interval):
        value += int(num) * units[suffix]
    return value
274
## Classes for commands.
276
# Abstract classes first
277
class Command(object):
    """Abstract class for commands"""

    def run(self, clients, bus=None, mandos=None):
        """Normal commands should implement run_on_one_client(), but
        commands which want to operate on all clients at the same time
        can override this run() method instead.

        clients -- mapping from client object path to its properties
        bus -- D-Bus connection used to look up each client object
        mandos -- server object; kept in self.mandos for subclasses
        """
        # Subclasses which talk to the server itself use self.mandos
        self.mandos = mandos
        for clientpath, properties in clients.items():
            log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
                      busname, str(clientpath))
            client = bus.get_object(busname, clientpath)
            self.run_on_one_client(client, properties)
290
class PrintCmd(Command):
    """Abstract class for commands printing client details"""

    # Every client property name known to the printing commands
    all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
                    "Created", "Interval", "Host", "KeyID",
                    "Fingerprint", "CheckerRunning", "LastEnabled",
                    "ApprovalPending", "ApprovedByDefault",
                    "LastApprovalRequest", "ApprovalDelay",
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
                    "Expires", "LastCheckerStatus")

    def run(self, clients, bus=None, mandos=None):
        """Print the output for all clients at once; neither bus nor
        mandos is used."""
        print(self.output(clients.values()))

    def output(self, clients):
        """Return the text to print for the given client property
        collections; must be implemented by subclasses."""
        raise NotImplementedError()
304
class PropertyCmd(Command):
    """Abstract class for Actions for setting one client property"""

    def run_on_one_client(self, client, properties):
        """Set the Client's D-Bus property"""
        # dbus.Boolean values are logged as plain bool for readability
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
                  client.__dbus_object_path__,
                  dbus.PROPERTIES_IFACE, client_interface,
                  self.propname, self.value_to_set
                  if not isinstance(self.value_to_set, dbus.Boolean)
                  else bool(self.value_to_set))
        client.Set(client_interface, self.propname, self.value_to_set,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    @property
    def propname(self):
        """Name of the D-Bus property to set; subclasses override
        this with a class attribute."""
        raise NotImplementedError()
320
class PropertyValueCmd(PropertyCmd):
    """Abstract class for PropertyCmd receiving a value as argument"""

    def __init__(self, value):
        """Remember value as the value to set on each client."""
        self.value_to_set = value
325
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
    """Abstract class for PropertyValueCmd taking a value argument as
    a datetime.timedelta() but storing it as milliseconds."""

    @property
    def value_to_set(self):
        """The value to set, as an integer number of milliseconds"""
        return self._vts

    @value_to_set.setter
    def value_to_set(self, value):
        """When setting, convert value from a datetime.timedelta"""
        self._vts = int(round(value.total_seconds() * 1000))
336
# Actual (non-abstract) command classes
338
class PrintTableCmd(PrintCmd):
339
def __init__(self, verbose=False):
340
self.verbose = verbose
342
def output(self, clients):
343
default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
344
keywords = default_keywords
346
keywords = self.all_keywords
347
return str(self.TableOfClients(clients, keywords))
349
class TableOfClients(object):
352
"Enabled": "Enabled",
353
"Timeout": "Timeout",
354
"LastCheckedOK": "Last Successful Check",
355
"LastApprovalRequest": "Last Approval Request",
356
"Created": "Created",
357
"Interval": "Interval",
359
"Fingerprint": "Fingerprint",
361
"CheckerRunning": "Check Is Running",
362
"LastEnabled": "Last Enabled",
363
"ApprovalPending": "Approval Is Pending",
364
"ApprovedByDefault": "Approved By Default",
365
"ApprovalDelay": "Approval Delay",
366
"ApprovalDuration": "Approval Duration",
367
"Checker": "Checker",
368
"ExtendedTimeout": "Extended Timeout",
369
"Expires": "Expires",
370
"LastCheckerStatus": "Last Checker Status",
373
def __init__(self, clients, keywords, tableheaders=None):
374
self.clients = clients
375
self.keywords = keywords
376
if tableheaders is not None:
377
self.tableheaders = tableheaders
380
return "\n".join(self.rows())
382
if sys.version_info.major == 2:
383
__unicode__ = __str__
385
return str(self).encode(locale.getpreferredencoding())
388
format_string = self.row_formatting_string()
389
rows = [self.header_line(format_string)]
390
rows.extend(self.client_line(client, format_string)
391
for client in self.clients)
394
def row_formatting_string(self):
395
"Format string used to format table rows"
396
return " ".join("{{{key}:{width}}}".format(
397
width=max(len(self.tableheaders[key]),
398
*(len(self.string_from_client(client, key))
399
for client in self.clients)),
401
for key in self.keywords)
403
def string_from_client(self, client, key):
404
return self.valuetostring(client[key], key)
407
def valuetostring(value, keyword):
408
if isinstance(value, dbus.Boolean):
409
return "Yes" if value else "No"
410
if keyword in ("Timeout", "Interval", "ApprovalDelay",
411
"ApprovalDuration", "ExtendedTimeout"):
412
return milliseconds_to_string(value)
415
def header_line(self, format_string):
416
return format_string.format(**self.tableheaders)
418
def client_line(self, client, format_string):
419
return format_string.format(
420
**{key: self.string_from_client(client, key)
421
for key in self.keywords})
425
class DumpJSONCmd(PrintCmd):
426
def output(self, clients):
427
data = {client["Name"]:
428
{key: self.dbus_boolean_to_bool(client[key])
429
for key in self.all_keywords}
430
for client in clients.values()}
431
return json.dumps(data, indent=4, separators=(',', ': '))
433
def dbus_boolean_to_bool(value):
434
if isinstance(value, dbus.Boolean):
438
class IsEnabledCmd(Command):
439
def run(self, clients, bus=None, mandos=None):
440
client, properties = next(iter(clients.items()))
441
if self.is_enabled(client, properties):
73
timevalue = datetime.timedelta(0)
74
regexp = re.compile("\d+[dsmhw]")
76
for s in regexp.findall(interval):
78
suffix = unicode(s[-1])
81
delta = datetime.timedelta(value)
83
delta = datetime.timedelta(0, value)
85
delta = datetime.timedelta(0, 0, 0, 0, value)
87
delta = datetime.timedelta(0, 0, 0, 0, 0, value)
89
delta = datetime.timedelta(0, 0, 0, 0, 0, 0, value)
92
except (ValueError, IndexError):
97
def print_clients(clients):
98
def valuetostring(value, keyword):
99
if type(value) is dbus.Boolean:
100
return u"Yes" if value else u"No"
101
if keyword in (u"timeout", u"interval"):
102
return milliseconds_to_string(value)
103
return unicode(value)
105
# Create format string to print table rows
106
format_string = u' '.join(u'%%-%ds' %
107
max(len(tablewords[key]),
108
max(len(valuetostring(client[key], key))
113
print format_string % tuple(tablewords[key] for key in keywords)
114
for client in clients:
115
print format_string % tuple(valuetostring(client[key], key)
118
parser = OptionParser(version = "%%prog %s" % version)
119
parser.add_option("-a", "--all", action="store_true",
120
help="Print all fields")
121
parser.add_option("-e", "--enable", action="store_true",
122
help="Enable client")
123
parser.add_option("-d", "--disable", action="store_true",
124
help="disable client")
125
parser.add_option("-b", "--bump-timeout", action="store_true",
126
help="Bump timeout for client")
127
parser.add_option("--start-checker", action="store_true",
128
help="Start checker for client")
129
parser.add_option("--stop-checker", action="store_true",
130
help="Stop checker for client")
131
parser.add_option("-V", "--is-valid", action="store_true",
132
help="Check if client is still valid")
133
parser.add_option("-r", "--remove", action="store_true",
134
help="Remove client")
135
parser.add_option("-c", "--checker", type="string",
136
help="Set checker command for client")
137
parser.add_option("-t", "--timeout", type="string",
138
help="Set timeout for client")
139
parser.add_option("-i", "--interval", type="string",
140
help="Set checker interval for client")
141
parser.add_option("-H", "--host", type="string",
142
help="Set host for client")
143
parser.add_option("-s", "--secret", type="string",
144
help="Set password blob (file) for client")
145
options, client_names = parser.parse_args()
147
# Compile list of clients to process
149
for name in client_names:
150
for path, client in mandos_clients.iteritems():
151
if client['name'] == name:
152
client_objc = bus.get_object(busname, path)
153
clients.append(client_objc)
156
print >> sys.stderr, "Client not found on server: %r" % name
444
def is_enabled(self, client, properties):
445
return properties["Enabled"]
447
class RemoveCmd(Command):
    """Command removing clients from the server"""

    def run_on_one_client(self, client, properties):
        """Ask the server object in self.mandos to remove the client,
        identified by its D-Bus object path."""
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
                  server_path, server_interface,
                  str(client.__dbus_object_path__))
        self.mandos.RemoveClient(client.__dbus_object_path__)
454
class ApproveCmd(Command):
    """Command calling Approve(True) on clients"""

    def run_on_one_client(self, client, properties):
        """Call the client's Approve D-Bus method with a true value"""
        log.debug("D-Bus: %s:%s:%s.Approve(True)", busname,
                  client.__dbus_object_path__, client_interface)
        client.Approve(dbus.Boolean(True),
                       dbus_interface=client_interface)
461
class DenyCmd(Command):
    """Command calling Approve(False) on clients"""

    def run_on_one_client(self, client, properties):
        """Call the client's Approve D-Bus method with a false value"""
        log.debug("D-Bus: %s:%s:%s.Approve(False)", busname,
                  client.__dbus_object_path__, client_interface)
        client.Approve(dbus.Boolean(False),
                       dbus_interface=client_interface)
468
class EnableCmd(PropertyCmd):
470
value_to_set = dbus.Boolean(True)
472
class DisableCmd(PropertyCmd):
474
value_to_set = dbus.Boolean(False)
476
class BumpTimeoutCmd(PropertyCmd):
477
propname = "LastCheckedOK"
480
class StartCheckerCmd(PropertyCmd):
    """Command setting the CheckerRunning property to true"""
    propname = "CheckerRunning"
    value_to_set = dbus.Boolean(True)
484
class StopCheckerCmd(PropertyCmd):
    """Command setting the CheckerRunning property to false"""
    propname = "CheckerRunning"
    value_to_set = dbus.Boolean(False)
488
class ApproveByDefaultCmd(PropertyCmd):
    """Command setting the ApprovedByDefault property to true"""
    propname = "ApprovedByDefault"
    value_to_set = dbus.Boolean(True)
492
class DenyByDefaultCmd(PropertyCmd):
    """Command setting the ApprovedByDefault property to false"""
    propname = "ApprovedByDefault"
    value_to_set = dbus.Boolean(False)
496
class SetCheckerCmd(PropertyValueCmd):
499
class SetHostCmd(PropertyValueCmd):
502
class SetSecretCmd(PropertyValueCmd):
505
def value_to_set(self):
508
def value_to_set(self, value):
509
"""When setting, read data from supplied file object"""
510
self._vts = value.read()
513
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
516
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
    """Command setting the ExtendedTimeout property"""
    propname = "ExtendedTimeout"
519
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
    """Command setting the Interval property"""
    propname = "Interval"
522
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
    """Command setting the ApprovalDelay property"""
    propname = "ApprovalDelay"
525
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
    """Command setting the ApprovalDuration property"""
    propname = "ApprovalDuration"
528
def add_command_line_options(parser):
    """Add all command line options and arguments to parser.

    parser -- the argparse.ArgumentParser to add the options to

    Options which contradict each other (enable/disable, start/stop
    checker, approve/deny by default, approve/deny) are placed in
    mutually exclusive groups.  Duration options are converted to
    datetime.timedelta by string_to_delta.
    """
    parser.add_argument("--version", action="version",
                        version="%(prog)s {}".format(version),
                        help="show version number and exit")
    parser.add_argument("-a", "--all", action="store_true",
                        help="Select all clients")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print all fields")
    parser.add_argument("-j", "--dump-json", action="store_true",
                        help="Dump client data in JSON format")
    enable_disable = parser.add_mutually_exclusive_group()
    enable_disable.add_argument("-e", "--enable", action="store_true",
                                help="Enable client")
    enable_disable.add_argument("-d", "--disable",
                                action="store_true",
                                help="disable client")
    parser.add_argument("-b", "--bump-timeout", action="store_true",
                        help="Bump timeout for client")
    start_stop_checker = parser.add_mutually_exclusive_group()
    start_stop_checker.add_argument("--start-checker",
                                    action="store_true",
                                    help="Start checker for client")
    start_stop_checker.add_argument("--stop-checker",
                                    action="store_true",
                                    help="Stop checker for client")
    parser.add_argument("-V", "--is-enabled", action="store_true",
                        help="Check if client is enabled")
    parser.add_argument("-r", "--remove", action="store_true",
                        help="Remove client")
    parser.add_argument("-c", "--checker",
                        help="Set checker command for client")
    parser.add_argument("-t", "--timeout", type=string_to_delta,
                        help="Set timeout for client")
    parser.add_argument("--extended-timeout", type=string_to_delta,
                        help="Set extended timeout for client")
    parser.add_argument("-i", "--interval", type=string_to_delta,
                        help="Set checker interval for client")
    # Both options share one destination, which stays None when
    # neither of them is given.
    approve_deny_default = parser.add_mutually_exclusive_group()
    approve_deny_default.add_argument(
        "--approve-by-default", action="store_true",
        default=None, dest="approved_by_default",
        help="Set client to be approved by default")
    approve_deny_default.add_argument(
        "--deny-by-default", action="store_false",
        dest="approved_by_default",
        help="Set client to be denied by default")
    parser.add_argument("--approval-delay", type=string_to_delta,
                        help="Set delay before client approve/deny")
    parser.add_argument("--approval-duration", type=string_to_delta,
                        help="Set duration of one client approval")
    parser.add_argument("-H", "--host", help="Set host for client")
    parser.add_argument("-s", "--secret",
                        type=argparse.FileType(mode="rb"),
                        help="Set password blob (file) for client")
    approve_deny = parser.add_mutually_exclusive_group()
    approve_deny.add_argument(
        "-A", "--approve", action="store_true",
        help="Approve any current client request")
    approve_deny.add_argument("-D", "--deny", action="store_true",
                              help="Deny any current client request")
    parser.add_argument("--debug", action="store_true",
                        help="Debug mode (show D-Bus commands)")
    parser.add_argument("--check", action="store_true",
                        help="Run self-test")
    parser.add_argument("client", nargs="*", help="Client name")
595
def commands_from_options(options):
    """Return a list of command objects for the parsed options.

    options -- the namespace returned by the argument parser

    The commands are listed in a fixed order, independent of the
    order of the options on the command line.  If no option selects
    a command, the list contains a single PrintTableCmd.
    """

    commands = []

    if options.dump_json:
        commands.append(DumpJSONCmd())

    if options.enable:
        commands.append(EnableCmd())

    if options.disable:
        commands.append(DisableCmd())

    if options.bump_timeout:
        commands.append(BumpTimeoutCmd())

    if options.start_checker:
        commands.append(StartCheckerCmd())

    if options.stop_checker:
        commands.append(StopCheckerCmd())

    if options.is_enabled:
        commands.append(IsEnabledCmd())

    if options.checker is not None:
        commands.append(SetCheckerCmd(options.checker))

    if options.timeout is not None:
        commands.append(SetTimeoutCmd(options.timeout))

    # Compare with None like the other value options do; a zero
    # timedelta is false and would otherwise be silently ignored.
    if options.extended_timeout is not None:
        commands.append(
            SetExtendedTimeoutCmd(options.extended_timeout))

    if options.interval is not None:
        commands.append(SetIntervalCmd(options.interval))

    if options.approved_by_default is not None:
        if options.approved_by_default:
            commands.append(ApproveByDefaultCmd())
        else:
            commands.append(DenyByDefaultCmd())

    if options.approval_delay is not None:
        commands.append(SetApprovalDelayCmd(options.approval_delay))

    if options.approval_duration is not None:
        commands.append(
            SetApprovalDurationCmd(options.approval_duration))

    if options.host is not None:
        commands.append(SetHostCmd(options.host))

    if options.secret is not None:
        commands.append(SetSecretCmd(options.secret))

    if options.approve:
        commands.append(ApproveCmd())

    if options.deny:
        commands.append(DenyCmd())

    if options.remove:
        commands.append(RemoveCmd())

    # If no command option has been given, show table of clients,
    # optionally verbosely
    if not commands:
        commands.append(PrintTableCmd(verbose=options.verbose))

    return commands
669
def check_option_syntax(parser, options):
670
"""Apply additional restrictions on options, not expressible in
673
def has_actions(options):
674
return any((options.enable,
676
options.bump_timeout,
677
options.start_checker,
678
options.stop_checker,
681
options.checker is not None,
682
options.timeout is not None,
683
options.extended_timeout is not None,
684
options.interval is not None,
685
options.approved_by_default is not None,
686
options.approval_delay is not None,
687
options.approval_duration is not None,
688
options.host is not None,
689
options.secret is not None,
693
if has_actions(options) and not (options.client or options.all):
694
parser.error("Options require clients names or --all.")
695
if options.verbose and has_actions(options):
696
parser.error("--verbose can only be used alone.")
697
if options.dump_json and (options.verbose
698
or has_actions(options)):
699
parser.error("--dump-json can only be used alone.")
700
if options.all and not has_actions(options):
701
parser.error("--all requires an action.")
702
if options.is_enabled and len(options.client) > 1:
703
parser.error("--is-enabled requires exactly one client")
705
options.remove = False
706
if has_actions(options) and not options.deny:
707
parser.error("--remove can only be combined with --deny")
708
options.remove = True
712
parser = argparse.ArgumentParser()
714
add_command_line_options(parser)
716
options = parser.parse_args()
718
check_option_syntax(parser, options)
720
clientnames = options.client
723
log.setLevel(logging.DEBUG)
726
bus = dbus.SystemBus()
727
log.debug("D-Bus: Connect to: (busname=%r, path=%r)", busname,
729
mandos_dbus_objc = bus.get_object(busname, server_path)
730
except dbus.exceptions.DBusException:
731
log.critical("Could not connect to Mandos server")
734
mandos_serv = dbus.Interface(mandos_dbus_objc,
735
dbus_interface=server_interface)
736
mandos_serv_object_manager = dbus.Interface(
737
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
739
# Filter out log message from dbus module
740
dbus_logger = logging.getLogger("dbus.proxies")
741
class NullFilter(logging.Filter):
742
def filter(self, record):
744
dbus_filter = NullFilter()
746
dbus_logger.addFilter(dbus_filter)
747
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
748
server_path, dbus.OBJECT_MANAGER_IFACE)
749
mandos_clients = {path: ifs_and_props[client_interface]
750
for path, ifs_and_props in
751
mandos_serv_object_manager
752
.GetManagedObjects().items()
753
if client_interface in ifs_and_props}
754
except dbus.exceptions.DBusException as e:
755
log.critical("Failed to access Mandos server through D-Bus:"
759
# restore dbus logger
760
dbus_logger.removeFilter(dbus_filter)
762
# Compile dict of (clients: properties) to process
766
clients = {objpath: properties
767
for objpath, properties in mandos_clients.items()}
769
for name in clientnames:
770
for objpath, properties in mandos_clients.items():
771
if properties["Name"] == name:
772
clients[objpath] = properties
775
log.critical("Client not found on server: %r", name)
778
# Run all commands on clients
779
commands = commands_from_options(options)
780
for command in commands:
781
command.run(clients, bus, mandos_serv)
784
class Test_milliseconds_to_string(unittest.TestCase):
786
self.assertEqual(milliseconds_to_string(93785000),
788
def test_no_days(self):
789
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
790
def test_all_zero(self):
791
self.assertEqual(milliseconds_to_string(0), "00:00:00")
792
def test_no_fractional_seconds(self):
793
self.assertEqual(milliseconds_to_string(400), "00:00:00")
794
self.assertEqual(milliseconds_to_string(900), "00:00:00")
795
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
797
class Test_string_to_delta(unittest.TestCase):
798
def test_handles_basic_rfc3339(self):
799
self.assertEqual(string_to_delta("PT0S"),
800
datetime.timedelta())
801
self.assertEqual(string_to_delta("P0D"),
802
datetime.timedelta())
803
self.assertEqual(string_to_delta("PT1S"),
804
datetime.timedelta(0, 1))
805
self.assertEqual(string_to_delta("PT2H"),
806
datetime.timedelta(0, 7200))
807
def test_falls_back_to_pre_1_6_1_with_warning(self):
808
# assertLogs only exists in Python 3.4
809
if hasattr(self, "assertLogs"):
810
with self.assertLogs(log, logging.WARNING):
811
value = string_to_delta("2h")
813
class WarningFilter(logging.Filter):
814
"""Don't show, but record the presence of, warnings"""
815
def filter(self, record):
816
is_warning = record.levelno >= logging.WARNING
817
self.found = is_warning or getattr(self, "found",
819
return not is_warning
820
warning_filter = WarningFilter()
821
log.addFilter(warning_filter)
823
value = string_to_delta("2h")
825
log.removeFilter(warning_filter)
826
self.assertTrue(getattr(warning_filter, "found", False))
827
self.assertEqual(value, datetime.timedelta(0, 7200))
830
class TestCmd(unittest.TestCase):
831
"""Abstract class for tests of command classes"""
834
class MockClient(object):
835
def __init__(self, name, **attributes):
836
self.__dbus_object_path__ = "/clients/{}".format(name)
837
self.attributes = attributes
838
self.attributes["Name"] = name
840
def Set(self, interface, propname, value, dbus_interface):
841
testcase.assertEqual(interface, client_interface)
842
testcase.assertEqual(dbus_interface,
843
dbus.PROPERTIES_IFACE)
844
self.attributes[propname] = value
845
def Get(self, interface, propname, dbus_interface):
846
testcase.assertEqual(interface, client_interface)
847
testcase.assertEqual(dbus_interface,
848
dbus.PROPERTIES_IFACE)
849
return self.attributes[propname]
850
def Approve(self, approve, dbus_interface):
851
testcase.assertEqual(dbus_interface, client_interface)
852
self.calls.append(("Approve", (approve,
854
self.client = MockClient(
856
KeyID=("92ed150794387c03ce684574b1139a65"
857
"94a34f895daaaf09fd8ea90a27cddb12"),
859
Host="foo.example.org",
860
Enabled=dbus.Boolean(True),
862
LastCheckedOK="2019-02-03T00:00:00",
863
Created="2019-01-02T00:00:00",
865
Fingerprint=("778827225BA7DE539C5A"
866
"7CFA59CFF7CDBD9A5920"),
867
CheckerRunning=dbus.Boolean(False),
868
LastEnabled="2019-01-03T00:00:00",
869
ApprovalPending=dbus.Boolean(False),
870
ApprovedByDefault=dbus.Boolean(True),
871
LastApprovalRequest="",
873
ApprovalDuration=1000,
874
Checker="fping -q -- %(host)s",
875
ExtendedTimeout=900000,
876
Expires="2019-02-04T00:00:00",
878
self.other_client = MockClient(
880
KeyID=("0558568eedd67d622f5c83b35a115f79"
881
"6ab612cff5ad227247e46c2b020f441c"),
884
Enabled=dbus.Boolean(True),
886
LastCheckedOK="2019-02-04T00:00:00",
887
Created="2019-01-03T00:00:00",
889
Fingerprint=("3E393AEAEFB84C7E89E2"
890
"F547B3A107558FCA3A27"),
891
CheckerRunning=dbus.Boolean(True),
892
LastEnabled="2019-01-04T00:00:00",
893
ApprovalPending=dbus.Boolean(False),
894
ApprovedByDefault=dbus.Boolean(False),
895
LastApprovalRequest="2019-01-03T00:00:00",
897
ApprovalDuration=1000,
899
ExtendedTimeout=900000,
900
Expires="2019-02-05T00:00:00",
901
LastCheckerStatus=-2)
902
self.clients = collections.OrderedDict(
904
("/clients/foo", self.client.attributes),
905
("/clients/barbar", self.other_client.attributes),
907
self.one_client = {"/clients/foo": self.client.attributes}
912
def get_object(client_bus_name, path):
913
self.assertEqual(client_bus_name, busname)
915
"/clients/foo": self.client,
916
"/clients/barbar": self.other_client,
920
class TestPrintTableCmd(TestCmd):
921
def test_normal(self):
922
output = PrintTableCmd().output(self.clients.values())
923
expected_output = """
924
Name Enabled Timeout Last Successful Check
925
foo Yes 00:05:00 2019-02-03T00:00:00
926
barbar Yes 00:05:00 2019-02-04T00:00:00
928
self.assertEqual(output, expected_output)
929
def test_verbose(self):
930
output = PrintTableCmd(verbose=True).output(
931
self.clients.values())
932
expected_output = """
933
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
934
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
935
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
937
self.assertEqual(output, expected_output)
938
def test_one_client(self):
939
output = PrintTableCmd().output(self.one_client.values())
940
expected_output = """
941
Name Enabled Timeout Last Successful Check
942
foo Yes 00:05:00 2019-02-03T00:00:00
944
self.assertEqual(output, expected_output)
946
class TestDumpJSONCmd(TestCmd):
948
self.expected_json = {
951
"KeyID": ("92ed150794387c03ce684574b1139a65"
952
"94a34f895daaaf09fd8ea90a27cddb12"),
953
"Host": "foo.example.org",
956
"LastCheckedOK": "2019-02-03T00:00:00",
957
"Created": "2019-01-02T00:00:00",
959
"Fingerprint": ("778827225BA7DE539C5A"
960
"7CFA59CFF7CDBD9A5920"),
961
"CheckerRunning": False,
962
"LastEnabled": "2019-01-03T00:00:00",
963
"ApprovalPending": False,
964
"ApprovedByDefault": True,
965
"LastApprovalRequest": "",
967
"ApprovalDuration": 1000,
968
"Checker": "fping -q -- %(host)s",
969
"ExtendedTimeout": 900000,
970
"Expires": "2019-02-04T00:00:00",
971
"LastCheckerStatus": 0,
975
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
976
"6ab612cff5ad227247e46c2b020f441c"),
980
"LastCheckedOK": "2019-02-04T00:00:00",
981
"Created": "2019-01-03T00:00:00",
983
"Fingerprint": ("3E393AEAEFB84C7E89E2"
984
"F547B3A107558FCA3A27"),
985
"CheckerRunning": True,
986
"LastEnabled": "2019-01-04T00:00:00",
987
"ApprovalPending": False,
988
"ApprovedByDefault": False,
989
"LastApprovalRequest": "2019-01-03T00:00:00",
990
"ApprovalDelay": 30000,
991
"ApprovalDuration": 1000,
993
"ExtendedTimeout": 900000,
994
"Expires": "2019-02-05T00:00:00",
995
"LastCheckerStatus": -2,
998
return super(TestDumpJSONCmd, self).setUp()
999
def test_normal(self):
    # Dump all fixture clients and compare the decoded JSON with the
    # dict prepared in setUp.
    dumped = DumpJSONCmd().output(self.clients)
    decoded = json.loads(dumped)
    self.assertDictEqual(decoded, self.expected_json)
1002
def test_one_client(self):
    # Dump only the "foo" client; the decoded JSON must then contain
    # exactly the "foo" entry of the expected data.
    dumped = DumpJSONCmd().output(self.one_client)
    decoded = json.loads(dumped)
    self.assertDictEqual(decoded, {"foo": self.expected_json["foo"]})
1008
class TestIsEnabledCmd(TestCmd):
1009
def test_is_enabled(self):
    # A lazy generator is kept (not a list) so that evaluation stops
    # at the first client reported as not enabled.
    verdicts = (IsEnabledCmd().is_enabled(path, props)
                for path, props in self.clients.items())
    self.assertTrue(all(verdicts))
1012
def test_is_enabled_run_exits_successfully(self):
1013
with self.assertRaises(SystemExit) as e:
1014
IsEnabledCmd().run(self.one_client)
1015
if e.exception.code is not None:
1016
self.assertEqual(e.exception.code, 0)
1018
self.assertIsNone(e.exception.code)
1019
def test_is_enabled_run_exits_with_failure(self):
1020
self.client.attributes["Enabled"] = dbus.Boolean(False)
1021
with self.assertRaises(SystemExit) as e:
1022
IsEnabledCmd().run(self.one_client)
1023
if isinstance(e.exception.code, int):
1024
self.assertNotEqual(e.exception.code, 0)
1026
self.assertIsNotNone(e.exception.code)
1028
class TestRemoveCmd(TestCmd):
1029
def test_remove(self):
1030
class MockMandos(object):
1033
def RemoveClient(self, dbus_path):
1034
self.calls.append(("RemoveClient", (dbus_path,)))
1035
mandos = MockMandos()
1036
super(TestRemoveCmd, self).setUp()
1037
RemoveCmd().run(self.clients, self.bus, mandos)
1038
self.assertEqual(len(mandos.calls), 2)
1039
for clientpath in self.clients:
1040
self.assertIn(("RemoveClient", (clientpath,)),
1043
class TestApproveCmd(TestCmd):
1044
def test_approve(self):
1045
ApproveCmd().run(self.clients, self.bus)
1046
for clientpath in self.clients:
1047
client = self.bus.get_object(busname, clientpath)
1048
self.assertIn(("Approve", (True, client_interface)),
1051
class TestDenyCmd(TestCmd):
1052
def test_deny(self):
1053
DenyCmd().run(self.clients, self.bus)
1054
for clientpath in self.clients:
1055
client = self.bus.get_object(busname, clientpath)
1056
self.assertIn(("Approve", (False, client_interface)),
1059
class TestEnableCmd(TestCmd):
    """Test that EnableCmd leaves "Enabled" true on every client"""

    def test_enable(self):
        # Start with every client disabled, so a true value afterwards
        # can only come from running the command.
        for path in self.clients:
            mock_client = self.bus.get_object(busname, path)
            mock_client.attributes["Enabled"] = False

        EnableCmd().run(self.clients, self.bus)

        for path in self.clients:
            mock_client = self.bus.get_object(busname, path)
            self.assertTrue(mock_client.attributes["Enabled"])
1071
class TestDisableCmd(TestCmd):
    """Test that DisableCmd leaves "Enabled" false on every client"""

    def test_disable(self):
        DisableCmd().run(self.clients, self.bus)
        # Check each mock client as returned by the bus
        for path in self.clients:
            mock_client = self.bus.get_object(busname, path)
            self.assertFalse(mock_client.attributes["Enabled"])
1078
class Unique(object):
    """Placeholder class whose instances serve only as unique objects.

    Stands in for unittest.mock.sentinel, which is only available
    from Python 3.3 onwards."""
1082
class TestPropertyCmd(TestCmd):
1083
"""Abstract class for tests of PropertyCmd classes"""
1085
if not hasattr(self, "command"):
1087
values_to_get = getattr(self, "values_to_get",
1089
for value_to_set, value_to_get in zip(self.values_to_set,
1091
for clientpath in self.clients:
1092
client = self.bus.get_object(busname, clientpath)
1093
old_value = client.attributes[self.propname]
1094
self.assertNotIsInstance(old_value, Unique)
1095
client.attributes[self.propname] = Unique()
1096
self.run_command(value_to_set, self.clients)
1097
for clientpath in self.clients:
1098
client = self.bus.get_object(busname, clientpath)
1099
value = client.attributes[self.propname]
1100
self.assertNotIsInstance(value, Unique)
1101
self.assertEqual(value, value_to_get)
1102
def run_command(self, value, clients):
    """Instantiate self.command without arguments and run it on CLIENTS.

    VALUE is accepted but not used by this implementation."""
    cmd = self.command()
    cmd.run(clients, self.bus)
1105
class TestBumpTimeoutCmd(TestPropertyCmd):
    """Test of BumpTimeoutCmd, checked via the "LastCheckedOK" property"""
    propname = "LastCheckedOK"
    values_to_set = [""]
    command = BumpTimeoutCmd
1110
class TestStartCheckerCmd(TestPropertyCmd):
    """Test of StartCheckerCmd, checked via the "CheckerRunning" property"""
    propname = "CheckerRunning"
    values_to_set = [dbus.Boolean(True)]
    command = StartCheckerCmd
1115
class TestStopCheckerCmd(TestPropertyCmd):
    """Test of StopCheckerCmd, checked via the "CheckerRunning" property"""
    propname = "CheckerRunning"
    values_to_set = [dbus.Boolean(False)]
    command = StopCheckerCmd
1120
class TestApproveByDefaultCmd(TestPropertyCmd):
    """Test of ApproveByDefaultCmd, checked via "ApprovedByDefault\""""
    propname = "ApprovedByDefault"
    values_to_set = [dbus.Boolean(True)]
    command = ApproveByDefaultCmd
1125
class TestDenyByDefaultCmd(TestPropertyCmd):
    """Test of DenyByDefaultCmd, checked via "ApprovedByDefault\""""
    propname = "ApprovedByDefault"
    values_to_set = [dbus.Boolean(False)]
    command = DenyByDefaultCmd
1130
class TestPropertyValueCmd(TestPropertyCmd):
1131
"""Abstract class for tests of PropertyValueCmd classes"""
1133
if type(self) is TestPropertyValueCmd:
1135
return super(TestPropertyValueCmd, self).runTest()
1136
def run_command(self, value, clients):
    """Instantiate self.command with VALUE and run it on CLIENTS"""
    cmd = self.command(value)
    cmd.run(clients, self.bus)
1139
class TestSetCheckerCmd(TestPropertyValueCmd):
    """Test of SetCheckerCmd, checked via the "Checker" property"""
    propname = "Checker"
    values_to_set = [
        "",
        ":",
        "fping -q -- %s",
    ]
    command = SetCheckerCmd
1144
class TestSetHostCmd(TestPropertyValueCmd):
1145
command = SetHostCmd
1147
values_to_set = ["192.0.2.3", "foo.example.org"]
1149
class TestSetSecretCmd(TestPropertyValueCmd):
1150
command = SetSecretCmd
1152
values_to_set = [io.BytesIO(b""),
1153
io.BytesIO(b"secret\0xyzzy\nbar")]
1154
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1156
class TestSetTimeoutCmd(TestPropertyValueCmd):
    """Test of SetTimeoutCmd, checked via the "Timeout" property"""
    propname = "Timeout"
    command = SetTimeoutCmd
    # Durations handed to the command ...
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # ... and the same durations expressed in milliseconds, in the
    # same order.
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
1166
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
    """Test of SetExtendedTimeoutCmd, checked via "ExtendedTimeout\""""
    propname = "ExtendedTimeout"
    command = SetExtendedTimeoutCmd
    # Durations handed to the command ...
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # ... and the same durations expressed in milliseconds, in the
    # same order.
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
1176
class TestSetIntervalCmd(TestPropertyValueCmd):
    """Test of SetIntervalCmd, checked via the "Interval" property"""
    propname = "Interval"
    command = SetIntervalCmd
    # Durations handed to the command ...
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # ... and the same durations expressed in milliseconds, in the
    # same order.
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
1186
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
    """Test of SetApprovalDelayCmd, checked via "ApprovalDelay\""""
    propname = "ApprovalDelay"
    command = SetApprovalDelayCmd
    # Durations handed to the command ...
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # ... and the same durations expressed in milliseconds, in the
    # same order.
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
1196
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
    """Test of SetApprovalDurationCmd, checked via "ApprovalDuration\""""
    propname = "ApprovalDuration"
    command = SetApprovalDurationCmd
    # Durations handed to the command ...
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    # ... and the same durations expressed in milliseconds, in the
    # same order.
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
1206
class Test_command_from_options(unittest.TestCase):
1208
self.parser = argparse.ArgumentParser()
1209
add_command_line_options(self.parser)
1210
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
    """Check that ARGS parse into exactly one command.

    That command must be an instance of COMMAND_CLS, and for every
    keyword in CMD_ATTRS (if any) the command attribute of the same
    name must equal the supplied value."""
    parsed = self.parser.parse_args(args)
    check_option_syntax(self.parser, parsed)
    produced = commands_from_options(parsed)
    self.assertEqual(len(produced), 1)
    only_command = produced[0]
    self.assertIsInstance(only_command, command_cls)
    for attr_name in cmd_attrs:
        self.assertEqual(getattr(only_command, attr_name),
                         cmd_attrs[attr_name])
1221
def test_print_table(self):
1222
self.assert_command_from_args([], PrintTableCmd,
1225
def test_print_table_verbose(self):
1226
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1229
def test_print_table_verbose_short(self):
1230
self.assert_command_from_args(["-v"], PrintTableCmd,
1233
def test_enable(self):
    # Long option form
    argv = ["--enable", "foo"]
    self.assert_command_from_args(argv, EnableCmd)
1236
def test_enable_short(self):
    # Short option form
    argv = ["-e", "foo"]
    self.assert_command_from_args(argv, EnableCmd)
1239
def test_disable(self):
1240
self.assert_command_from_args(["--disable", "foo"],
1243
def test_disable_short(self):
    # Short option form
    argv = ["-d", "foo"]
    self.assert_command_from_args(argv, DisableCmd)
1246
def test_bump_timeout(self):
1247
self.assert_command_from_args(["--bump-timeout", "foo"],
1250
def test_bump_timeout_short(self):
    # Short option form
    argv = ["-b", "foo"]
    self.assert_command_from_args(argv, BumpTimeoutCmd)
1253
def test_start_checker(self):
1254
self.assert_command_from_args(["--start-checker", "foo"],
1257
def test_stop_checker(self):
1258
self.assert_command_from_args(["--stop-checker", "foo"],
1261
def test_remove(self):
1262
self.assert_command_from_args(["--remove", "foo"],
1265
def test_remove_short(self):
    # Short option form
    argv = ["-r", "foo"]
    self.assert_command_from_args(argv, RemoveCmd)
1268
def test_checker(self):
    # Long option form; the option argument must end up as the
    # command's value_to_set attribute.
    argv = ["--checker", ":", "foo"]
    self.assert_command_from_args(argv, SetCheckerCmd,
                                  value_to_set=":")
1272
def test_checker_empty(self):
    # An empty string must be accepted as the checker value
    argv = ["--checker", "", "foo"]
    self.assert_command_from_args(argv, SetCheckerCmd,
                                  value_to_set="")
1276
def test_checker_short(self):
    # Short option form
    argv = ["-c", ":", "foo"]
    self.assert_command_from_args(argv, SetCheckerCmd,
                                  value_to_set=":")
1280
def test_timeout(self):
1281
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1283
value_to_set=300000)
1285
def test_timeout_short(self):
1286
self.assert_command_from_args(["-t", "PT5M", "foo"],
1288
value_to_set=300000)
1290
def test_extended_timeout(self):
1291
self.assert_command_from_args(["--extended-timeout", "PT15M",
1293
SetExtendedTimeoutCmd,
1294
value_to_set=900000)
1296
def test_interval(self):
1297
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1299
value_to_set=120000)
1301
def test_interval_short(self):
1302
self.assert_command_from_args(["-i", "PT2M", "foo"],
1304
value_to_set=120000)
1306
def test_approve_by_default(self):
    argv = ["--approve-by-default", "foo"]
    self.assert_command_from_args(argv, ApproveByDefaultCmd)
1310
def test_deny_by_default(self):
1311
self.assert_command_from_args(["--deny-by-default", "foo"],
1314
def test_approval_delay(self):
1315
self.assert_command_from_args(["--approval-delay", "PT30S",
1316
"foo"], SetApprovalDelayCmd,
1319
def test_approval_duration(self):
1320
self.assert_command_from_args(["--approval-duration", "PT1S",
1321
"foo"], SetApprovalDurationCmd,
1324
def test_host(self):
1325
self.assert_command_from_args(["--host", "foo.example.org",
1327
value_to_set="foo.example.org")
1329
def test_host_short(self):
1330
self.assert_command_from_args(["-H", "foo.example.org",
1332
value_to_set="foo.example.org")
1334
def test_secret_devnull(self):
1335
self.assert_command_from_args(["--secret", os.path.devnull,
1336
"foo"], SetSecretCmd,
1339
def test_secret_tempfile(self):
1340
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1341
value = b"secret\0xyzzy\nbar"
1344
self.assert_command_from_args(["--secret", f.name,
1345
"foo"], SetSecretCmd,
1348
def test_secret_devnull_short(self):
    # Short option form; the null device is used as the secret file
    # and the expected value is an empty byte string.
    argv = ["-s", os.path.devnull, "foo"]
    self.assert_command_from_args(argv, SetSecretCmd,
                                  value_to_set=b"")
1352
def test_secret_tempfile_short(self):
1353
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1354
value = b"secret\0xyzzy\nbar"
1357
self.assert_command_from_args(["-s", f.name, "foo"],
1361
def test_approve(self):
1362
self.assert_command_from_args(["--approve", "foo"],
1365
def test_approve_short(self):
    # Short option form
    argv = ["-A", "foo"]
    self.assert_command_from_args(argv, ApproveCmd)
1368
def test_deny(self):
    # Long option form
    argv = ["--deny", "foo"]
    self.assert_command_from_args(argv, DenyCmd)
1371
def test_deny_short(self):
    # Short option form
    argv = ["-D", "foo"]
    self.assert_command_from_args(argv, DenyCmd)
1374
def test_dump_json(self):
    # No client argument is given together with --dump-json
    argv = ["--dump-json"]
    self.assert_command_from_args(argv, DumpJSONCmd)
1377
def test_is_enabled(self):
1378
self.assert_command_from_args(["--is-enabled", "foo"],
1381
def test_is_enabled_short(self):
    # Short option form
    argv = ["-V", "foo"]
    self.assert_command_from_args(argv, IsEnabledCmd)
1384
def test_deny_before_remove(self):
    # With both --deny and --remove given, two commands must result,
    # ordered as DenyCmd followed by RemoveCmd.
    argv = ["--deny", "--remove", "foo"]
    parsed = self.parser.parse_args(argv)
    check_option_syntax(self.parser, parsed)
    produced = commands_from_options(parsed)
    self.assertEqual(len(produced), 2)
    for cmd, expected_cls in zip(produced, (DenyCmd, RemoveCmd)):
        self.assertIsInstance(cmd, expected_cls)
1392
def test_deny_before_remove_reversed(self):
    # Same as test_deny_before_remove, but with the options given in
    # the opposite order (and --all instead of a client name); the
    # resulting command order must still be DenyCmd, then RemoveCmd.
    argv = ["--remove", "--deny", "--all"]
    parsed = self.parser.parse_args(argv)
    check_option_syntax(self.parser, parsed)
    produced = commands_from_options(parsed)
    self.assertEqual(len(produced), 2)
    for cmd, expected_cls in zip(produced, (DenyCmd, RemoveCmd)):
        self.assertIsInstance(cmd, expected_cls)
1401
class Test_check_option_syntax(unittest.TestCase):
1402
# This mostly corresponds to the definition from has_actions() in
1403
# check_option_syntax()
1405
# The actual values set here are not that important, but we do
1406
# at least stick to the correct types, even though they are
1410
"bump_timeout": True,
1411
"start_checker": True,
1412
"stop_checker": True,
1416
"timeout": datetime.timedelta(),
1417
"extended_timeout": datetime.timedelta(),
1418
"interval": datetime.timedelta(),
1419
"approved_by_default": True,
1420
"approval_delay": datetime.timedelta(),
1421
"approval_duration": datetime.timedelta(),
1423
"secret": io.BytesIO(b"x"),
1429
self.parser = argparse.ArgumentParser()
1430
add_command_line_options(self.parser)
1432
@contextlib.contextmanager
1433
def assertParseError(self):
1434
with self.assertRaises(SystemExit) as e:
1435
with self.temporarily_suppress_stderr():
1437
# Exit code from argparse is guaranteed to be "2". Reference:
1438
# https://docs.python.org/3/library/argparse.html#exiting-methods
1439
self.assertEqual(e.exception.code, 2)
1442
@contextlib.contextmanager
1443
def temporarily_suppress_stderr():
1444
null = os.open(os.path.devnull, os.O_RDWR)
1445
stderrcopy = os.dup(sys.stderr.fileno())
1446
os.dup2(null, sys.stderr.fileno())
1452
os.dup2(stderrcopy, sys.stderr.fileno())
1453
os.close(stderrcopy)
1455
def check_option_syntax(self, options):
    """Run the module-level check_option_syntax() on OPTIONS, using
    this test case's parser"""
    parser = self.parser
    check_option_syntax(parser, options)
1458
def test_actions_requires_client_or_all(self):
    # Each action, set on otherwise default options, must be rejected
    # as a parse error.
    for action, value in self.actions.items():
        # Parse an explicit empty argument list: a bare parse_args()
        # reads sys.argv[1:], which would make the result depend on
        # how the test run itself was started.
        options = self.parser.parse_args([])
        setattr(options, action, value)
        with self.assertParseError():
            self.check_option_syntax(options)
1465
def test_actions_conflicts_with_verbose(self):
    # Each action combined with "verbose" must be rejected as a parse
    # error.
    for action, value in self.actions.items():
        # Parse an explicit empty argument list: a bare parse_args()
        # reads sys.argv[1:], which would make the result depend on
        # how the test run itself was started.
        options = self.parser.parse_args([])
        setattr(options, action, value)
        options.verbose = True
        with self.assertParseError():
            self.check_option_syntax(options)
1473
def test_dump_json_conflicts_with_verbose(self):
    # "dump_json" together with "verbose" must be rejected as a parse
    # error.
    #
    # Parse an explicit empty argument list: a bare parse_args() reads
    # sys.argv[1:], which would make the result depend on how the
    # test run itself was started.
    options = self.parser.parse_args([])
    options.dump_json = True
    options.verbose = True
    with self.assertParseError():
        self.check_option_syntax(options)
1480
def test_dump_json_conflicts_with_action(self):
    # Each action combined with "dump_json" must be rejected as a
    # parse error.
    for action, value in self.actions.items():
        # Parse an explicit empty argument list: a bare parse_args()
        # reads sys.argv[1:], which would make the result depend on
        # how the test run itself was started.
        options = self.parser.parse_args([])
        setattr(options, action, value)
        options.dump_json = True
        with self.assertParseError():
            self.check_option_syntax(options)
1488
def test_all_can_not_be_alone(self):
1489
options = self.parser.parse_args()
1491
with self.assertParseError():
1492
self.check_option_syntax(options)
1494
def test_all_is_ok_with_any_action(self):
1495
for action, value in self.actions.items():
1496
options = self.parser.parse_args()
1497
setattr(options, action, value)
1499
self.check_option_syntax(options)
1501
def test_is_enabled_fails_without_client(self):
    # "is_enabled" without any client must be rejected as a parse
    # error.
    #
    # Parse an explicit empty argument list: a bare parse_args() reads
    # sys.argv[1:], which would make the result depend on how the
    # test run itself was started.
    options = self.parser.parse_args([])
    options.is_enabled = True
    with self.assertParseError():
        self.check_option_syntax(options)
1507
def test_is_enabled_works_with_one_client(self):
    # "is_enabled" with exactly one client must pass the syntax check
    # (the test passes if no SystemExit is raised).
    #
    # Parse an explicit empty argument list: a bare parse_args() reads
    # sys.argv[1:], so stray options from the command line of the test
    # run could otherwise end up in "options" and change the outcome.
    options = self.parser.parse_args([])
    options.is_enabled = True
    options.client = ["foo"]
    self.check_option_syntax(options)
1513
def test_is_enabled_fails_with_two_clients(self):
    # "is_enabled" with more than one client must be rejected as a
    # parse error.
    #
    # Parse an explicit empty argument list: a bare parse_args() reads
    # sys.argv[1:], which would make the result depend on how the
    # test run itself was started.
    options = self.parser.parse_args([])
    options.is_enabled = True
    options.client = ["foo", "barbar"]
    with self.assertParseError():
        self.check_option_syntax(options)
1520
def test_remove_can_only_be_combined_with_action_deny(self):
1521
for action, value in self.actions.items():
1522
if action in {"remove", "deny"}:
1524
options = self.parser.parse_args()
1525
setattr(options, action, value)
1527
options.remove = True
1528
with self.assertParseError():
1529
self.check_option_syntax(options)
1533
def should_only_run_tests():
1534
parser = argparse.ArgumentParser(add_help=False)
1535
parser.add_argument("--check", action='store_true')
1536
args, unknown_args = parser.parse_known_args()
1537
run_tests = args.check
1539
# Remove --check argument from sys.argv
1540
sys.argv[1:] = unknown_args
1543
# Add all tests from doctest strings
1544
def load_tests(loader, tests, none):
1546
tests.addTests(doctest.DocTestSuite())
1549
if __name__ == "__main__":
1550
if should_only_run_tests():
1551
# Call using ./tdd-python-script --check [--verbose]
180
client.StopChecker(dbus_interface=client_interface)
182
sys.exit(0 if client.Get(client_interface,
184
dbus_interface=dbus.PROPERTIES_IFACE)
187
client.Set(client_interface, u"checker", options.checker,
188
dbus_interface=dbus.PROPERTIES_IFACE)
190
client.Set(client_interface, u"host", options.host,
191
dbus_interface=dbus.PROPERTIES_IFACE)
193
client.Set(client_interface, u"interval",
194
timedelta_to_milliseconds
195
(string_to_delta(options.interval)),
196
dbus_interface=dbus.PROPERTIES_IFACE)
198
client.Set(client_interface, u"timeout",
199
timedelta_to_milliseconds(string_to_delta
201
dbus_interface=dbus.PROPERTIES_IFACE)
203
client.Set(client_interface, u"secret",
204
dbus.ByteArray(open(options.secret, u'rb').read()),
205
dbus_interface=dbus.PROPERTIES_IFACE)