48
# Show warnings by default
49
if not sys.warnoptions:
51
warnings.simplefilter("default")
53
log = logging.getLogger(sys.argv[0])
54
logging.basicConfig(level="INFO", # Show info level messages
55
format="%(message)s") # Show basic log messages
57
logging.captureWarnings(True) # Show warnings via the logging system
46
59
if sys.version_info.major == 2:
103
116
datetime.timedelta(0, 60)
104
117
>>> rfc3339_duration_to_delta("PT60M")
105
118
datetime.timedelta(0, 3600)
119
>>> rfc3339_duration_to_delta("P60M")
120
datetime.timedelta(1680)
106
121
>>> rfc3339_duration_to_delta("PT24H")
107
122
datetime.timedelta(1)
108
123
>>> rfc3339_duration_to_delta("P1W")
111
126
datetime.timedelta(0, 330)
112
127
>>> rfc3339_duration_to_delta("P1DT3M20S")
113
128
datetime.timedelta(1, 200)
129
>>> # Can not be empty:
130
>>> rfc3339_duration_to_delta("")
131
Traceback (most recent call last):
133
ValueError: Invalid RFC 3339 duration: u''
134
>>> # Must start with "P":
135
>>> rfc3339_duration_to_delta("1D")
136
Traceback (most recent call last):
138
ValueError: Invalid RFC 3339 duration: u'1D'
139
>>> # Must use correct order
140
>>> rfc3339_duration_to_delta("PT1S2M")
141
Traceback (most recent call last):
143
ValueError: Invalid RFC 3339 duration: u'PT1S2M'
144
>>> # Time needs time marker
145
>>> rfc3339_duration_to_delta("P1H2S")
146
Traceback (most recent call last):
148
ValueError: Invalid RFC 3339 duration: u'P1H2S'
149
>>> # Weeks can not be combined with anything else
150
>>> rfc3339_duration_to_delta("P1D2W")
151
Traceback (most recent call last):
153
ValueError: Invalid RFC 3339 duration: u'P1D2W'
154
>>> rfc3339_duration_to_delta("P2W2H")
155
Traceback (most recent call last):
157
ValueError: Invalid RFC 3339 duration: u'P2W2H'
116
160
# Parsing an RFC 3339 duration with regular expressions is not
196
240
def string_to_delta(interval):
197
241
"""Parse a string and return a datetime.timedelta
199
>>> string_to_delta('7d')
200
datetime.timedelta(7)
201
>>> string_to_delta('60s')
202
datetime.timedelta(0, 60)
203
>>> string_to_delta('60m')
204
datetime.timedelta(0, 3600)
205
>>> string_to_delta('24h')
206
datetime.timedelta(1)
207
>>> string_to_delta('1w')
208
datetime.timedelta(7)
209
>>> string_to_delta('5m 30s')
210
datetime.timedelta(0, 330)
214
245
return rfc3339_duration_to_delta(interval)
246
except ValueError as e:
247
log.warning("%s - Parsing as pre-1.6.1 interval instead",
249
return parse_pre_1_6_1_interval(interval)
252
def parse_pre_1_6_1_interval(interval):
253
"""Parse an interval string as documented by Mandos before 1.6.1, and
254
return a datetime.timedelta
255
>>> parse_pre_1_6_1_interval('7d')
256
datetime.timedelta(7)
257
>>> parse_pre_1_6_1_interval('60s')
258
datetime.timedelta(0, 60)
259
>>> parse_pre_1_6_1_interval('60m')
260
datetime.timedelta(0, 3600)
261
>>> parse_pre_1_6_1_interval('24h')
262
datetime.timedelta(1)
263
>>> parse_pre_1_6_1_interval('1w')
264
datetime.timedelta(7)
265
>>> parse_pre_1_6_1_interval('5m 30s')
266
datetime.timedelta(0, 330)
267
>>> parse_pre_1_6_1_interval('')
268
datetime.timedelta(0)
269
>>> # Ignore unknown characters, allow any order and repetitions
270
>>> parse_pre_1_6_1_interval('2dxy7zz11y3m5m')
271
datetime.timedelta(2, 480, 18000)
218
275
value = datetime.timedelta(0)
219
276
regexp = re.compile(r"(\d+)([dsmhw]?)")
237
294
def print_clients(clients, keywords):
    """Print the client table to standard output.

    The table rows are produced by table_rows_of_clients(clients,
    keywords); they are joined with newlines and written with a
    single print() call.
    """
    rows = table_rows_of_clients(clients, keywords)
    print("\n".join(rows))
297
def table_rows_of_clients(clients, keywords):
238
298
def valuetostring(value, keyword):
239
if type(value) is dbus.Boolean:
299
if isinstance(value, dbus.Boolean):
240
300
return "Yes" if value else "No"
241
301
if keyword in ("Timeout", "Interval", "ApprovalDelay",
242
302
"ApprovalDuration", "ExtendedTimeout"):
250
310
for client in clients)),
252
312
for key in keywords)
254
print(format_string.format(**tablewords))
313
# Start with header line
314
rows = [format_string.format(**tablewords)]
255
315
for client in clients:
257
.format(**{key: valuetostring(client[key], key)
258
for key in keywords}))
316
rows.append(format_string
317
.format(**{key: valuetostring(client[key], key)
318
for key in keywords}))
261
322
def has_actions(options):
345
406
if options.all and not has_actions(options):
346
407
parser.error("--all requires an action.")
350
fail_count, test_count = doctest.testmod()
351
sys.exit(os.EX_OK if fail_count == 0 else 1)
354
410
bus = dbus.SystemBus()
355
411
mandos_dbus_objc = bus.get_object(busname, server_path)
356
412
except dbus.exceptions.DBusException:
357
print("Could not connect to Mandos server", file=sys.stderr)
413
log.critical("Could not connect to Mandos server")
360
416
mandos_serv = dbus.Interface(mandos_dbus_objc,
379
435
os.dup2(stderrcopy, sys.stderr.fileno())
380
436
os.close(stderrcopy)
381
437
except dbus.exceptions.DBusException as e:
382
print("Access denied: "
383
"Accessing mandos server through D-Bus: {}".format(e),
438
log.critical("Failed to access Mandos server through D-Bus:"
387
442
# Compile dict of (clients: properties) to process
398
453
clients[client_objc] = client
401
print("Client not found on server: {!r}"
402
.format(name), file=sys.stderr)
456
log.critical("Client not found on server: %r", name)
405
459
if not has_actions(options) and clients:
493
547
client.Approve(dbus.Boolean(False),
494
548
dbus_interface=client_interface)
551
class Test_milliseconds_to_string(unittest.TestCase):
553
self.assertEqual(milliseconds_to_string(93785000),
555
def test_no_days(self):
556
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
557
def test_all_zero(self):
558
self.assertEqual(milliseconds_to_string(0), "00:00:00")
559
def test_no_fractional_seconds(self):
560
self.assertEqual(milliseconds_to_string(400), "00:00:00")
561
self.assertEqual(milliseconds_to_string(900), "00:00:00")
562
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
564
class Test_string_to_delta(unittest.TestCase):
565
def test_handles_basic_rfc3339(self):
566
self.assertEqual(string_to_delta("PT2H"),
567
datetime.timedelta(0, 7200))
568
def test_falls_back_to_pre_1_6_1_with_warning(self):
569
# assertLogs was added in Python 3.4; older versions lack it
570
if hasattr(self, "assertLogs"):
571
with self.assertLogs(log, logging.WARNING):
572
value = string_to_delta("2h")
574
value = string_to_delta("2h")
575
self.assertEqual(value, datetime.timedelta(0, 7200))
578
def should_only_run_tests():
579
parser = argparse.ArgumentParser(add_help=False)
580
parser.add_argument("--check", action='store_true')
581
args, unknown_args = parser.parse_known_args()
582
run_tests = args.check
584
# Remove --check argument from sys.argv
585
sys.argv[1:] = unknown_args
588
# Add all tests from doctest strings
589
def load_tests(loader, tests, none):
591
tests.addTests(doctest.DocTestSuite())
497
594
if __name__ == "__main__":
595
if should_only_run_tests():
596
# Call using ./tdd-python-script --check [--verbose]