45
if sys.version_info[0] == 2:
48
# Show warnings by default
49
if not sys.warnoptions:
51
warnings.simplefilter("default")
53
log = logging.getLogger(sys.argv[0])
54
logging.basicConfig(level="INFO", # Show info level messages
55
format="%(message)s") # Show basic log messages
57
logging.captureWarnings(True) # Show warnings via the logging system
59
if sys.version_info.major == 2:
48
62
locale.setlocale(locale.LC_ALL, "")
54
"LastCheckedOK": "Last Successful Check",
55
"LastApprovalRequest": "Last Approval Request",
57
"Interval": "Interval",
59
"Fingerprint": "Fingerprint",
60
"CheckerRunning": "Check Is Running",
61
"LastEnabled": "Last Enabled",
62
"ApprovalPending": "Approval Is Pending",
63
"ApprovedByDefault": "Approved By Default",
64
"ApprovalDelay": "Approval Delay",
65
"ApprovalDuration": "Approval Duration",
67
"ExtendedTimeout" : "Extended Timeout"
69
defaultkeywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
70
64
domain = "se.recompile"
71
65
busname = domain + ".Mandos"
73
67
server_interface = domain + ".Mandos"
74
68
client_interface = domain + ".Mandos.Client"
77
def timedelta_to_milliseconds(td):
78
"""Convert a datetime.timedelta object to milliseconds"""
79
return ((td.days * 24 * 60 * 60 * 1000)
81
+ (td.microseconds // 1000))
73
dbus.OBJECT_MANAGER_IFACE
74
except AttributeError:
75
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
83
78
def milliseconds_to_string(ms):
84
79
td = datetime.timedelta(0, 0, 0, ms)
85
80
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
86
.format(days = "{}T".format(td.days) if td.days else "",
87
hours = td.seconds // 3600,
88
minutes = (td.seconds % 3600) // 60,
89
seconds = td.seconds % 60,
81
.format(days="{}T".format(td.days) if td.days else "",
82
hours=td.seconds // 3600,
83
minutes=(td.seconds % 3600) // 60,
84
seconds=td.seconds % 60))
93
87
def rfc3339_duration_to_delta(duration):
94
88
"""Parse an RFC 3339 "duration" and return a datetime.timedelta
96
90
>>> rfc3339_duration_to_delta("P7D")
97
91
datetime.timedelta(7)
98
92
>>> rfc3339_duration_to_delta("PT60S")
99
93
datetime.timedelta(0, 60)
100
94
>>> rfc3339_duration_to_delta("PT60M")
101
95
datetime.timedelta(0, 3600)
96
>>> rfc3339_duration_to_delta("P60M")
97
datetime.timedelta(1680)
102
98
>>> rfc3339_duration_to_delta("PT24H")
103
99
datetime.timedelta(1)
104
100
>>> rfc3339_duration_to_delta("P1W")
107
103
datetime.timedelta(0, 330)
108
104
>>> rfc3339_duration_to_delta("P1DT3M20S")
109
105
datetime.timedelta(1, 200)
106
>>> # Can not be empty:
107
>>> rfc3339_duration_to_delta("")
108
Traceback (most recent call last):
110
ValueError: Invalid RFC 3339 duration: u''
111
>>> # Must start with "P":
112
>>> rfc3339_duration_to_delta("1D")
113
Traceback (most recent call last):
115
ValueError: Invalid RFC 3339 duration: u'1D'
116
>>> # Must use correct order
117
>>> rfc3339_duration_to_delta("PT1S2M")
118
Traceback (most recent call last):
120
ValueError: Invalid RFC 3339 duration: u'PT1S2M'
121
>>> # Time needs time marker
122
>>> rfc3339_duration_to_delta("P1H2S")
123
Traceback (most recent call last):
125
ValueError: Invalid RFC 3339 duration: u'P1H2S'
126
>>> # Weeks can not be combined with anything else
127
>>> rfc3339_duration_to_delta("P1D2W")
128
Traceback (most recent call last):
130
ValueError: Invalid RFC 3339 duration: u'P1D2W'
131
>>> rfc3339_duration_to_delta("P2W2H")
132
Traceback (most recent call last):
134
ValueError: Invalid RFC 3339 duration: u'P2W2H'
112
137
# Parsing an RFC 3339 duration with regular expressions is not
113
138
# possible - there would have to be multiple places for the same
114
139
# values, like seconds. The current code, while more esoteric, is
115
140
# cleaner without depending on a parsing library. If Python had a
116
141
# built-in library for parsing we would use it, but we'd like to
117
142
# avoid excessive use of external libraries.
119
144
# New type for defining tokens, syntax, and semantics all-in-one
120
Token = collections.namedtuple("Token",
121
("regexp", # To match token; if
122
# "value" is not None,
123
# must have a "group"
125
"value", # datetime.timedelta or
127
"followers")) # Tokens valid after
145
Token = collections.namedtuple("Token", (
146
"regexp", # To match token; if "value" is not None, must have
147
# a "group" containing digits
148
"value", # datetime.timedelta or None
149
"followers")) # Tokens valid after this token
129
150
# RFC 3339 "duration" tokens, syntax, and semantics; taken from
130
151
# the "duration" ABNF definition in RFC 3339, Appendix A.
131
152
token_end = Token(re.compile(r"$"), None, frozenset())
132
153
token_second = Token(re.compile(r"(\d+)S"),
133
154
datetime.timedelta(seconds=1),
134
frozenset((token_end,)))
155
frozenset((token_end, )))
135
156
token_minute = Token(re.compile(r"(\d+)M"),
136
157
datetime.timedelta(minutes=1),
137
158
frozenset((token_second, token_end)))
186
210
# No currently valid tokens were found
187
raise ValueError("Invalid RFC 3339 duration")
211
raise ValueError("Invalid RFC 3339 duration: {!r}"
188
213
# End token found
192
217
def string_to_delta(interval):
193
"""Parse a string and return a datetime.timedelta
195
>>> string_to_delta("7d")
196
datetime.timedelta(7)
197
>>> string_to_delta("60s")
198
datetime.timedelta(0, 60)
199
>>> string_to_delta("60m")
200
datetime.timedelta(0, 3600)
201
>>> string_to_delta("24h")
202
datetime.timedelta(1)
203
>>> string_to_delta("1w")
204
datetime.timedelta(7)
205
>>> string_to_delta("5m 30s")
206
datetime.timedelta(0, 330)
218
"""Parse a string and return a datetime.timedelta"""
210
221
return rfc3339_duration_to_delta(interval)
222
except ValueError as e:
223
log.warning("%s - Parsing as pre-1.6.1 interval instead",
225
return parse_pre_1_6_1_interval(interval)
228
def parse_pre_1_6_1_interval(interval):
229
"""Parse an interval string as documented by Mandos before 1.6.1,
230
and return a datetime.timedelta
232
>>> parse_pre_1_6_1_interval('7d')
233
datetime.timedelta(7)
234
>>> parse_pre_1_6_1_interval('60s')
235
datetime.timedelta(0, 60)
236
>>> parse_pre_1_6_1_interval('60m')
237
datetime.timedelta(0, 3600)
238
>>> parse_pre_1_6_1_interval('24h')
239
datetime.timedelta(1)
240
>>> parse_pre_1_6_1_interval('1w')
241
datetime.timedelta(7)
242
>>> parse_pre_1_6_1_interval('5m 30s')
243
datetime.timedelta(0, 330)
244
>>> parse_pre_1_6_1_interval('')
245
datetime.timedelta(0)
246
>>> # Ignore unknown characters, allow any order and repetitions
247
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
248
datetime.timedelta(2, 480, 18000)
214
252
value = datetime.timedelta(0)
215
253
regexp = re.compile(r"(\d+)([dsmhw]?)")
217
255
for num, suffix in regexp.findall(interval):
218
256
if suffix == "d":
219
257
value += datetime.timedelta(int(num))
229
267
value += datetime.timedelta(0, 0, 0, int(num))
232
def print_clients(clients, keywords):
271
class TableOfClients(object):
274
"Enabled": "Enabled",
275
"Timeout": "Timeout",
276
"LastCheckedOK": "Last Successful Check",
277
"LastApprovalRequest": "Last Approval Request",
278
"Created": "Created",
279
"Interval": "Interval",
281
"Fingerprint": "Fingerprint",
283
"CheckerRunning": "Check Is Running",
284
"LastEnabled": "Last Enabled",
285
"ApprovalPending": "Approval Is Pending",
286
"ApprovedByDefault": "Approved By Default",
287
"ApprovalDelay": "Approval Delay",
288
"ApprovalDuration": "Approval Duration",
289
"Checker": "Checker",
290
"ExtendedTimeout": "Extended Timeout",
291
"Expires": "Expires",
292
"LastCheckerStatus": "Last Checker Status",
295
def __init__(self, clients, keywords, tableheaders=None):
296
self.clients = clients
297
self.keywords = keywords
298
if tableheaders is not None:
299
self.tableheaders = tableheaders
302
return "\n".join(self.rows())
304
if sys.version_info.major == 2:
305
__unicode__ = __str__
307
return str(self).encode(locale.getpreferredencoding())
310
format_string = self.row_formatting_string()
311
rows = [self.header_line(format_string)]
312
rows.extend(self.client_line(client, format_string)
313
for client in self.clients)
316
def row_formatting_string(self):
317
"Format string used to format table rows"
318
return " ".join("{{{key}:{width}}}".format(
319
width=max(len(self.tableheaders[key]),
320
*(len(self.string_from_client(client, key))
321
for client in self.clients)),
323
for key in self.keywords)
325
def string_from_client(self, client, key):
326
return self.valuetostring(client[key], key)
233
329
def valuetostring(value, keyword):
234
if type(value) is dbus.Boolean:
330
if isinstance(value, dbus.Boolean):
235
331
return "Yes" if value else "No"
236
332
if keyword in ("Timeout", "Interval", "ApprovalDelay",
237
333
"ApprovalDuration", "ExtendedTimeout"):
238
334
return milliseconds_to_string(value)
239
335
return str(value)
241
# Create format string to print table rows
242
format_string = " ".join("{{{key}:{width}}}".format(
243
width = max(len(tablewords[key]),
244
max(len(valuetostring(client[key],
248
key = key) for key in keywords)
250
print(format_string.format(**tablewords))
251
for client in clients:
252
print(format_string.format(**{ key:
253
valuetostring(client[key],
255
for key in keywords }))
337
def header_line(self, format_string):
338
return format_string.format(**self.tableheaders)
340
def client_line(self, client, format_string):
341
return format_string.format(
342
**{key: self.string_from_client(client, key)
343
for key in self.keywords})
346
## Classes for commands.
348
# Abstract classes first
349
class Command(object):
350
"""Abstract class for commands"""
351
def run(self, clients):
352
"""Normal commands should implement run_on_one_client(), but
353
commands which want to operate on all clients at the same time
354
can override this run() method instead."""
355
for client in clients:
356
self.run_on_one_client(client)
358
class PrintCmd(Command):
359
"""Abstract class for commands printing client details"""
360
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
361
"Created", "Interval", "Host", "KeyID",
362
"Fingerprint", "CheckerRunning", "LastEnabled",
363
"ApprovalPending", "ApprovedByDefault",
364
"LastApprovalRequest", "ApprovalDelay",
365
"ApprovalDuration", "Checker", "ExtendedTimeout",
366
"Expires", "LastCheckerStatus")
367
def run(self, clients):
368
print(self.output(clients))
370
class PropertyCmd(Command):
371
"""Abstract class for Actions for setting one client property"""
372
def run_on_one_client(self, client):
373
"""Set the Client's D-Bus property"""
374
client.Set(client_interface, self.property, self.value_to_set,
375
dbus_interface=dbus.PROPERTIES_IFACE)
377
class ValueArgumentMixIn(object):
378
"""Mixin class for commands taking a value as argument"""
379
def __init__(self, value):
380
self.value_to_set = value
382
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
383
"""Mixin class for commands taking a value argument as
386
def value_to_set(self):
389
def value_to_set(self, value):
390
"""When setting, convert value to a datetime.timedelta"""
391
self._vts = string_to_delta(value).total_seconds() * 1000
393
# Actual (non-abstract) command classes
395
class PrintTableCmd(PrintCmd):
396
def __init__(self, verbose=False):
397
self.verbose = verbose
398
def output(self, clients):
400
keywords = self.all_keywords
402
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
403
return str(TableOfClients(clients.values(), keywords))
405
class DumpJSONCmd(PrintCmd):
406
def output(self, clients):
407
data = {client["Name"]:
408
{key: self.dbus_boolean_to_bool(client[key])
409
for key in self.all_keywords}
410
for client in clients.values()}
411
return json.dumps(data, indent=4, separators=(',', ': '))
413
def dbus_boolean_to_bool(value):
414
if isinstance(value, dbus.Boolean):
418
class IsEnabledCmd(Command):
419
def run_on_one_client(self, client):
420
if self.is_enabled(client):
423
def is_enabled(self, client):
424
return client.Get(client_interface, "Enabled",
425
dbus_interface=dbus.PROPERTIES_IFACE)
427
class RemoveCmd(Command):
428
def __init__(self, mandos):
430
def run_on_one_client(self, client):
431
self.mandos.RemoveClient(client.__dbus_object_path__)
433
class ApproveCmd(Command):
434
def run_on_one_client(self, client):
435
client.Approve(dbus.Boolean(True),
436
dbus_interface=client_interface)
438
class DenyCmd(Command):
439
def run_on_one_client(self, client):
440
client.Approve(dbus.Boolean(False),
441
dbus_interface=client_interface)
443
class EnableCmd(PropertyCmd):
445
value_to_set = dbus.Boolean(True)
447
class DisableCmd(PropertyCmd):
449
value_to_set = dbus.Boolean(False)
451
class BumpTimeoutCmd(PropertyCmd):
452
property = "LastCheckedOK"
455
class StartCheckerCmd(PropertyCmd):
456
property = "CheckerRunning"
457
value_to_set = dbus.Boolean(True)
459
class StopCheckerCmd(PropertyCmd):
460
property = "CheckerRunning"
461
value_to_set = dbus.Boolean(False)
463
class ApproveByDefaultCmd(PropertyCmd):
464
property = "ApprovedByDefault"
465
value_to_set = dbus.Boolean(True)
467
class DenyByDefaultCmd(PropertyCmd):
468
property = "ApprovedByDefault"
469
value_to_set = dbus.Boolean(False)
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
483
class SetExtendedTimeoutCmd(PropertyCmd,
484
MillisecondsValueArgumentMixIn):
485
property = "ExtendedTimeout"
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
488
property = "Interval"
490
class SetApprovalDelayCmd(PropertyCmd,
491
MillisecondsValueArgumentMixIn):
492
property = "ApprovalDelay"
494
class SetApprovalDurationCmd(PropertyCmd,
495
MillisecondsValueArgumentMixIn):
496
property = "ApprovalDuration"
257
498
def has_actions(options):
258
499
return any((options.enable,
278
520
parser = argparse.ArgumentParser()
279
521
parser.add_argument("--version", action="version",
280
version = "%(prog)s {}".format(version),
522
version="%(prog)s {}".format(version),
281
523
help="show version number and exit")
282
524
parser.add_argument("-a", "--all", action="store_true",
283
525
help="Select all clients")
284
526
parser.add_argument("-v", "--verbose", action="store_true",
285
527
help="Print all fields")
286
parser.add_argument("-e", "--enable", action="store_true",
287
help="Enable client")
288
parser.add_argument("-d", "--disable", action="store_true",
289
help="disable client")
528
parser.add_argument("-j", "--dump-json", action="store_true",
529
help="Dump client data in JSON format")
530
enable_disable = parser.add_mutually_exclusive_group()
531
enable_disable.add_argument("-e", "--enable", action="store_true",
532
help="Enable client")
533
enable_disable.add_argument("-d", "--disable",
535
help="disable client")
290
536
parser.add_argument("-b", "--bump-timeout", action="store_true",
291
537
help="Bump timeout for client")
292
parser.add_argument("--start-checker", action="store_true",
293
help="Start checker for client")
294
parser.add_argument("--stop-checker", action="store_true",
295
help="Stop checker for client")
538
start_stop_checker = parser.add_mutually_exclusive_group()
539
start_stop_checker.add_argument("--start-checker",
541
help="Start checker for client")
542
start_stop_checker.add_argument("--stop-checker",
544
help="Stop checker for client")
296
545
parser.add_argument("-V", "--is-enabled", action="store_true",
297
546
help="Check if client is enabled")
298
547
parser.add_argument("-r", "--remove", action="store_true",
319
571
parser.add_argument("-s", "--secret",
320
572
type=argparse.FileType(mode="rb"),
321
573
help="Set password blob (file) for client")
322
parser.add_argument("-A", "--approve", action="store_true",
323
help="Approve any current client request")
324
parser.add_argument("-D", "--deny", action="store_true",
325
help="Deny any current client request")
574
approve_deny = parser.add_mutually_exclusive_group()
575
approve_deny.add_argument(
576
"-A", "--approve", action="store_true",
577
help="Approve any current client request")
578
approve_deny.add_argument("-D", "--deny", action="store_true",
579
help="Deny any current client request")
326
580
parser.add_argument("--check", action="store_true",
327
581
help="Run self-test")
328
582
parser.add_argument("client", nargs="*", help="Client name")
329
583
options = parser.parse_args()
331
585
if has_actions(options) and not (options.client or options.all):
332
586
parser.error("Options require clients names or --all.")
333
587
if options.verbose and has_actions(options):
334
parser.error("--verbose can only be used alone or with"
588
parser.error("--verbose can only be used alone.")
589
if options.dump_json and (options.verbose
590
or has_actions(options)):
591
parser.error("--dump-json can only be used alone.")
336
592
if options.all and not has_actions(options):
337
593
parser.error("--all requires an action.")
594
if options.is_enabled and len(options.client) > 1:
595
parser.error("--is-enabled requires exactly one client")
340
fail_count, test_count = doctest.testmod()
341
sys.exit(os.EX_OK if fail_count == 0 else 1)
344
598
bus = dbus.SystemBus()
345
599
mandos_dbus_objc = bus.get_object(busname, server_path)
346
600
except dbus.exceptions.DBusException:
347
print("Could not connect to Mandos server",
601
log.critical("Could not connect to Mandos server")
351
604
mandos_serv = dbus.Interface(mandos_dbus_objc,
352
dbus_interface = server_interface)
354
#block stderr since dbus library prints to stderr
605
dbus_interface=server_interface)
606
mandos_serv_object_manager = dbus.Interface(
607
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
611
if options.dump_json:
612
commands.append(DumpJSONCmd())
615
commands.append(EnableCmd())
618
commands.append(DisableCmd())
620
if options.bump_timeout:
621
commands.append(BumpTimeoutCmd(options.bump_timeout))
623
if options.start_checker:
624
commands.append(StartCheckerCmd())
626
if options.stop_checker:
627
commands.append(StopCheckerCmd())
629
if options.is_enabled:
630
commands.append(IsEnabledCmd())
633
commands.append(RemoveCmd(mandos_serv))
635
if options.checker is not None:
636
commands.append(SetCheckerCmd())
638
if options.timeout is not None:
639
commands.append(SetTimeoutCmd(options.timeout))
641
if options.extended_timeout:
643
SetExtendedTimeoutCmd(options.extended_timeout))
645
if options.interval is not None:
646
command.append(SetIntervalCmd(options.interval))
648
if options.approved_by_default is not None:
649
if options.approved_by_default:
650
command.append(ApproveByDefaultCmd())
652
command.append(DenyByDefaultCmd())
654
if options.approval_delay is not None:
655
command.append(SetApprovalDelayCmd(options.approval_delay))
657
if options.approval_duration is not None:
659
SetApprovalDurationCmd(options.approval_duration))
661
if options.host is not None:
662
command.append(SetHostCmd(options.host))
664
if options.secret is not None:
665
command.append(SetSecretCmd(options.secret))
668
commands.append(ApproveCmd())
671
commands.append(DenyCmd())
673
# If no command option has been given, show table of clients,
674
# optionally verbosely
676
commands.append(PrintTableCmd(verbose=options.verbose))
678
# block stderr since dbus library prints to stderr
355
679
null = os.open(os.path.devnull, os.O_RDWR)
356
680
stderrcopy = os.dup(sys.stderr.fileno())
357
681
os.dup2(null, sys.stderr.fileno())
361
mandos_clients = mandos_serv.GetAllClientsWithProperties()
685
mandos_clients = {path: ifs_and_props[client_interface]
686
for path, ifs_and_props in
687
mandos_serv_object_manager
688
.GetManagedObjects().items()
689
if client_interface in ifs_and_props}
364
692
os.dup2(stderrcopy, sys.stderr.fileno())
365
693
os.close(stderrcopy)
366
except dbus.exceptions.DBusException:
367
print("Access denied: Accessing mandos server through dbus.",
694
except dbus.exceptions.DBusException as e:
695
log.critical("Failed to access Mandos server through D-Bus:"
371
699
# Compile dict of (clients: properties) to process
374
702
if options.all or not options.client:
375
clients = { bus.get_object(busname, path): properties
376
for path, properties in mandos_clients.items() }
703
clients = {bus.get_object(busname, path): properties
704
for path, properties in mandos_clients.items()}
378
706
for name in options.client:
379
707
for path, client in mandos_clients.items():
382
710
clients[client_objc] = client
385
print("Client not found on server: {!r}"
386
.format(name), file=sys.stderr)
713
log.critical("Client not found on server: %r", name)
389
if not has_actions(options) and clients:
391
keywords = ("Name", "Enabled", "Timeout",
392
"LastCheckedOK", "Created", "Interval",
393
"Host", "Fingerprint", "CheckerRunning",
394
"LastEnabled", "ApprovalPending",
396
"LastApprovalRequest", "ApprovalDelay",
397
"ApprovalDuration", "Checker",
716
# Run all commands on clients
717
for command in commands:
721
class Test_milliseconds_to_string(unittest.TestCase):
723
self.assertEqual(milliseconds_to_string(93785000),
725
def test_no_days(self):
726
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
727
def test_all_zero(self):
728
self.assertEqual(milliseconds_to_string(0), "00:00:00")
729
def test_no_fractional_seconds(self):
730
self.assertEqual(milliseconds_to_string(400), "00:00:00")
731
self.assertEqual(milliseconds_to_string(900), "00:00:00")
732
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
734
class Test_string_to_delta(unittest.TestCase):
735
def test_handles_basic_rfc3339(self):
736
self.assertEqual(string_to_delta("PT2H"),
737
datetime.timedelta(0, 7200))
738
def test_falls_back_to_pre_1_6_1_with_warning(self):
739
# assertLogs only exists in Python 3.4
740
if hasattr(self, "assertLogs"):
741
with self.assertLogs(log, logging.WARNING):
742
value = string_to_delta("2h")
400
keywords = defaultkeywords
402
print_clients(clients.values(), keywords)
404
# Process each client in the list by all selected options
405
for client in clients:
406
def set_client_prop(prop, value):
407
"""Set a Client D-Bus property"""
408
client.Set(client_interface, prop, value,
409
dbus_interface=dbus.PROPERTIES_IFACE)
410
def set_client_prop_ms(prop, value):
411
"""Set a Client D-Bus property, converted
412
from a string to milliseconds."""
413
set_client_prop(prop,
414
timedelta_to_milliseconds
415
(string_to_delta(value)))
417
mandos_serv.RemoveClient(client.__dbus_object_path__)
419
set_client_prop("Enabled", dbus.Boolean(True))
421
set_client_prop("Enabled", dbus.Boolean(False))
422
if options.bump_timeout:
423
set_client_prop("LastCheckedOK", "")
424
if options.start_checker:
425
set_client_prop("CheckerRunning", dbus.Boolean(True))
426
if options.stop_checker:
427
set_client_prop("CheckerRunning", dbus.Boolean(False))
428
if options.is_enabled:
429
sys.exit(0 if client.Get(client_interface,
432
dbus.PROPERTIES_IFACE)
434
if options.checker is not None:
435
set_client_prop("Checker", options.checker)
436
if options.host is not None:
437
set_client_prop("Host", options.host)
438
if options.interval is not None:
439
set_client_prop_ms("Interval", options.interval)
440
if options.approval_delay is not None:
441
set_client_prop_ms("ApprovalDelay",
442
options.approval_delay)
443
if options.approval_duration is not None:
444
set_client_prop_ms("ApprovalDuration",
445
options.approval_duration)
446
if options.timeout is not None:
447
set_client_prop_ms("Timeout", options.timeout)
448
if options.extended_timeout is not None:
449
set_client_prop_ms("ExtendedTimeout",
450
options.extended_timeout)
451
if options.secret is not None:
452
set_client_prop("Secret",
453
dbus.ByteArray(options.secret.read()))
454
if options.approved_by_default is not None:
455
set_client_prop("ApprovedByDefault",
457
.approved_by_default))
459
client.Approve(dbus.Boolean(True),
460
dbus_interface=client_interface)
462
client.Approve(dbus.Boolean(False),
463
dbus_interface=client_interface)
744
value = string_to_delta("2h")
745
self.assertEqual(value, datetime.timedelta(0, 7200))
747
class Test_TableOfClients(unittest.TestCase):
749
self.tableheaders = {
753
"Bool": "A D-BUS Boolean",
754
"NonDbusBoolean": "A Non-D-BUS Boolean",
755
"Integer": "An Integer",
756
"Timeout": "Timedelta 1",
757
"Interval": "Timedelta 2",
758
"ApprovalDelay": "Timedelta 3",
759
"ApprovalDuration": "Timedelta 4",
760
"ExtendedTimeout": "Timedelta 5",
761
"String": "A String",
763
self.keywords = ["Attr1", "AttrTwo"]
769
"Bool": dbus.Boolean(False),
770
"NonDbusBoolean": False,
774
"ApprovalDelay": 2000,
775
"ApprovalDuration": 3000,
776
"ExtendedTimeout": 4000,
783
"Bool": dbus.Boolean(True),
784
"NonDbusBoolean": True,
787
"Interval": 93786000,
788
"ApprovalDelay": 93787000,
789
"ApprovalDuration": 93788000,
790
"ExtendedTimeout": 93789000,
791
"String": "A huge string which will not fit," * 10,
794
def test_short_header(self):
795
text = str(TableOfClients(self.clients, self.keywords,
802
self.assertEqual(text, expected_text)
803
def test_booleans(self):
804
keywords = ["Bool", "NonDbusBoolean"]
805
text = str(TableOfClients(self.clients, keywords,
808
A D-BUS Boolean A Non-D-BUS Boolean
812
self.assertEqual(text, expected_text)
813
def test_milliseconds_detection(self):
814
keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
815
"ApprovalDuration", "ExtendedTimeout"]
816
text = str(TableOfClients(self.clients, keywords,
819
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
820
0 00:00:00 00:00:01 00:00:02 00:00:03 00:00:04
821
1 1T02:03:05 1T02:03:06 1T02:03:07 1T02:03:08 1T02:03:09
823
self.assertEqual(text, expected_text)
824
def test_empty_and_long_string_values(self):
825
keywords = ["String"]
826
text = str(TableOfClients(self.clients, keywords,
831
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
833
self.assertEqual(text, expected_text)
837
def should_only_run_tests():
838
parser = argparse.ArgumentParser(add_help=False)
839
parser.add_argument("--check", action='store_true')
840
args, unknown_args = parser.parse_known_args()
841
run_tests = args.check
843
# Remove --check argument from sys.argv
844
sys.argv[1:] = unknown_args
847
# Add all tests from doctest strings
848
def load_tests(loader, tests, none):
850
tests.addTests(doctest.DocTestSuite())
465
853
if __name__ == "__main__":
854
if should_only_run_tests():
855
# Call using ./tdd-python-script --check [--verbose]