189
189
facility=logging.handlers.SysLogHandler.LOG_DAEMON,
190
190
address="/dev/log"))
191
191
syslogger.setFormatter(logging.Formatter
192
('Mandos [%(process)d]: %(levelname)s:'
194
logger.addHandler(syslogger)
192
("Mandos [%(process)d]: %(levelname)s:"
194
log.addHandler(syslogger)
197
197
console = logging.StreamHandler()
198
console.setFormatter(logging.Formatter('%(asctime)s %(name)s'
202
logger.addHandler(console)
203
logger.setLevel(level)
198
console.setFormatter(logging.Formatter("%(asctime)s %(name)s"
202
log.addHandler(console)
206
206
class PGPError(Exception):
224
224
except OSError as e:
225
225
if e.errno != errno.ENOENT:
227
self.gnupgargs = ['--batch',
228
'--homedir', self.tempdir,
227
self.gnupgargs = ["--batch",
228
"--homedir", self.tempdir,
231
231
# Only GPG version 1 has the --no-use-agent option.
232
232
if self.gpg == b"gpg" or self.gpg.endswith(b"/gpg"):
233
233
self.gnupgargs.append("--no-use-agent")
351
351
interface: integer; avahi.IF_UNSPEC or an interface index.
352
352
Used to optionally bind to the specified interface.
353
name: string; Example: 'Mandos'
354
type: string; Example: '_mandos._tcp'.
353
name: string; Example: "Mandos"
354
type: string; Example: "_mandos._tcp".
355
355
See <https://www.iana.org/assignments/service-names-port-numbers>
356
356
port: integer; what port to announce
357
357
TXT: list of strings; TXT record for the service
394
394
def rename(self, remove=True):
395
395
"""Derived from the Avahi example code"""
396
396
if self.rename_count >= self.max_renames:
397
logger.critical("No suitable Zeroconf service name found"
398
" after %i retries, exiting.",
397
log.critical("No suitable Zeroconf service name found"
398
" after %i retries, exiting.",
400
400
raise AvahiServiceError("Too many renames")
402
402
self.server.GetAlternativeServiceName(self.name))
403
403
self.rename_count += 1
404
logger.info("Changing Zeroconf service name to %r ...",
404
log.info("Changing Zeroconf service name to %r ...",
410
410
except dbus.exceptions.DBusException as error:
411
411
if (error.get_dbus_name()
412
412
== "org.freedesktop.Avahi.CollisionError"):
413
logger.info("Local Zeroconf service name collision.")
413
log.info("Local Zeroconf service name collision.")
414
414
return self.rename(remove=False)
416
logger.critical("D-Bus Exception", exc_info=error)
416
log.critical("D-Bus Exception", exc_info=error)
435
435
avahi.DBUS_INTERFACE_ENTRY_GROUP)
436
436
self.entry_group_state_changed_match = (
437
437
self.group.connect_to_signal(
438
'StateChanged', self.entry_group_state_changed))
439
logger.debug("Adding Zeroconf service '%s' of type '%s' ...",
440
self.name, self.type)
438
"StateChanged", self.entry_group_state_changed))
439
log.debug("Adding Zeroconf service '%s' of type '%s' ...",
440
self.name, self.type)
441
441
self.group.AddService(
451
451
def entry_group_state_changed(self, state, error):
452
452
"""Derived from the Avahi example code"""
453
logger.debug("Avahi entry group state change: %i", state)
453
log.debug("Avahi entry group state change: %i", state)
455
455
if state == avahi.ENTRY_GROUP_ESTABLISHED:
456
logger.debug("Zeroconf service established.")
456
log.debug("Zeroconf service established.")
457
457
elif state == avahi.ENTRY_GROUP_COLLISION:
458
logger.info("Zeroconf service name collision.")
458
log.info("Zeroconf service name collision.")
460
460
elif state == avahi.ENTRY_GROUP_FAILURE:
461
logger.critical("Avahi: Error in group state changed %s",
461
log.critical("Avahi: Error in group state changed %s",
463
463
raise AvahiGroupError("State changed: {!s}".format(error))
465
465
def cleanup(self):
476
476
def server_state_changed(self, state, error=None):
477
477
"""Derived from the Avahi example code"""
478
logger.debug("Avahi server state change: %i", state)
478
log.debug("Avahi server state change: %i", state)
480
480
avahi.SERVER_INVALID: "Zeroconf server invalid",
481
481
avahi.SERVER_REGISTERING: None,
495
495
except dbus.exceptions.DBusException as error:
496
496
if (error.get_dbus_name()
497
497
== "org.freedesktop.Avahi.CollisionError"):
498
logger.info("Local Zeroconf service name"
498
log.info("Local Zeroconf service name collision.")
500
499
return self.rename(remove=False)
502
logger.critical("D-Bus Exception", exc_info=error)
501
log.critical("D-Bus Exception", exc_info=error)
506
505
if error is None:
507
logger.debug("Unknown state: %r", state)
506
log.debug("Unknown state: %r", state)
509
logger.debug("Unknown state: %r: %r", state, error)
508
log.debug("Unknown state: %r: %r", state, error)
511
510
def activate(self):
512
511
"""Derived from the Avahi example code"""
686
685
def _retry_on_error(result, func, arguments,
687
686
_error_code=_error_code):
688
687
"""A function to retry on some errors, suitable
689
for the 'errcheck' attribute on ctypes functions"""
688
for the "errcheck" attribute on ctypes functions"""
690
689
while result < gnutls.E_SUCCESS:
691
690
if result not in (gnutls.E_INTERRUPTED, gnutls.E_AGAIN):
692
691
return _error_code(result)
876
875
"""A representation of a client host served by this server.
879
approved: bool(); 'None' if not yet approved/disapproved
878
approved: bool(); None if not yet approved/disapproved
880
879
approval_delay: datetime.timedelta(); Time to wait for approval
881
880
approval_duration: datetime.timedelta(); Duration of one approval
882
881
checker: multiprocessing.Process(); a running checker process used
883
to see if the client lives. 'None' if no process is
882
to see if the client lives. None if no process is
885
884
checker_callback_tag: a GLib event source tag, or None
886
885
checker_command: string; External command which is run to check
1010
1009
self.last_enabled = None
1011
1010
self.expires = None
1013
logger.debug("Creating client %r", self.name)
1014
logger.debug(" Key ID: %s", self.key_id)
1015
logger.debug(" Fingerprint: %s", self.fingerprint)
1012
log.debug("Creating client %r", self.name)
1013
log.debug(" Key ID: %s", self.key_id)
1014
log.debug(" Fingerprint: %s", self.fingerprint)
1016
1015
self.created = settings.get("created",
1017
1016
datetime.datetime.utcnow())
1057
1055
if not getattr(self, "enabled", False):
1060
logger.info("Disabling client %s", self.name)
1058
log.info("Disabling client %s", self.name)
1061
1059
if getattr(self, "disable_initiator_tag", None) is not None:
1062
1060
GLib.source_remove(self.disable_initiator_tag)
1063
1061
self.disable_initiator_tag = None
1075
1073
def __del__(self):
1078
def init_checker(self):
1079
# Schedule a new checker to be started an 'interval' from now,
1080
# and every interval from then on.
1076
def init_checker(self, randomize_start=False):
1077
# Schedule a new checker to be started a randomly selected
1078
# time (a fraction of 'interval') from now. This spreads out
1079
# the startup of checkers over time when the server is
1081
1081
if self.checker_initiator_tag is not None:
1082
1082
GLib.source_remove(self.checker_initiator_tag)
1083
interval_milliseconds = int(self.interval.total_seconds()
1086
delay_milliseconds = random.randrange(
1087
interval_milliseconds + 1)
1089
delay_milliseconds = interval_milliseconds
1083
1090
self.checker_initiator_tag = GLib.timeout_add(
1084
random.randrange(int(self.interval.total_seconds() * 1000
1087
# Schedule a disable() when 'timeout' has passed
1091
delay_milliseconds, self.start_checker, randomize_start)
1092
delay = datetime.timedelta(0, 0, 0, delay_milliseconds)
1093
# A checker might take up to an 'interval' of time, so we can
1094
# expire at the soonest one interval after a checker was
1095
# started. Since the initial checker is delayed, the expire
1096
# time might have to be extended.
1097
now = datetime.datetime.utcnow()
1098
self.expires = now + delay + self.interval
1099
# Schedule a disable() at expire time
1088
1100
if self.disable_initiator_tag is not None:
1089
1101
GLib.source_remove(self.disable_initiator_tag)
1090
1102
self.disable_initiator_tag = GLib.timeout_add(
1091
int(self.timeout.total_seconds() * 1000), self.disable)
1092
# Also start a new checker *right now*.
1093
self.start_checker()
1103
int((self.expires - now).total_seconds() * 1000),
1095
1106
def checker_callback(self, source, condition, connection,
1107
1118
self.last_checker_status = returncode
1108
1119
self.last_checker_signal = None
1109
1120
if self.last_checker_status == 0:
1110
logger.info("Checker for %(name)s succeeded",
1121
log.info("Checker for %(name)s succeeded", vars(self))
1112
1122
self.checked_ok()
1114
logger.info("Checker for %(name)s failed", vars(self))
1124
log.info("Checker for %(name)s failed", vars(self))
1116
1126
self.last_checker_status = -1
1117
1127
self.last_checker_signal = -returncode
1118
logger.warning("Checker for %(name)s crashed?",
1128
log.warning("Checker for %(name)s crashed?", vars(self))
1122
1131
def checked_ok(self):
1169
1178
command = self.checker_command % escaped_attrs
1170
1179
except TypeError as error:
1171
logger.error('Could not format string "%s"',
1172
self.checker_command,
1180
log.error('Could not format string "%s"',
1181
self.checker_command, exc_info=error)
1174
1182
return True # Try again later
1175
1183
self.current_checker_command = command
1176
logger.info("Starting checker %r for %s", command,
1184
log.info("Starting checker %r for %s", command, self.name)
1178
1185
# We don't need to redirect stdout and stderr, since
1179
1186
# in normal mode, that is already done by daemon(),
1180
1187
# and in debug mode we don't want to. (Stdin is
1199
1206
GLib.IOChannel.unix_new(pipe[0].fileno()),
1200
1207
GLib.PRIORITY_DEFAULT, GLib.IO_IN,
1201
1208
self.checker_callback, pipe[0], command)
1209
if start_was_randomized:
1210
# We were started after a random delay; Schedule a new
1211
# checker to be started an 'interval' from now, and every
1212
# interval from then on.
1213
now = datetime.datetime.utcnow()
1214
self.checker_initiator_tag = GLib.timeout_add(
1215
int(self.interval.total_seconds() * 1000),
1217
self.expires = max(self.expires, now + self.interval)
1218
# Don't start a new checker again after same random delay
1202
1220
# Re-run this periodically if run by GLib.timeout_add
1338
1356
@dbus.service.method(dbus.INTROSPECTABLE_IFACE,
1339
1357
out_signature="s",
1340
path_keyword='object_path',
1341
connection_keyword='connection')
1358
path_keyword="object_path",
1359
connection_keyword="connection")
1342
1360
def Introspect(self, object_path, connection):
1343
1361
"""Overloading of standard D-Bus method.
1498
1516
@dbus.service.method(dbus.INTROSPECTABLE_IFACE,
1499
1517
out_signature="s",
1500
path_keyword='object_path',
1501
connection_keyword='connection')
1518
path_keyword="object_path",
1519
connection_keyword="connection")
1502
1520
def Introspect(self, object_path, connection):
1503
1521
"""Overloading of standard D-Bus method.
1600
1618
@dbus.service.method(dbus.INTROSPECTABLE_IFACE,
1601
1619
out_signature="s",
1602
path_keyword='object_path',
1603
connection_keyword='connection')
1620
path_keyword="object_path",
1621
connection_keyword="connection")
1604
1622
def Introspect(self, object_path, connection):
1605
1623
"""Overloading of standard D-Bus method.
2272
2290
class ProxyClient:
2273
2291
def __init__(self, child_pipe, key_id, fpr, address):
2274
2292
self._pipe = child_pipe
2275
self._pipe.send(('init', key_id, fpr, address))
2293
self._pipe.send(("init", key_id, fpr, address))
2276
2294
if not self._pipe.recv():
2277
2295
raise KeyError(key_id or fpr)
2279
2297
def __getattribute__(self, name):
2281
2299
return super(ProxyClient, self).__getattribute__(name)
2282
self._pipe.send(('getattr', name))
2300
self._pipe.send(("getattr", name))
2283
2301
data = self._pipe.recv()
2284
if data[0] == 'data':
2302
if data[0] == "data":
2286
if data[0] == 'function':
2304
if data[0] == "function":
2288
2306
def func(*args, **kwargs):
2289
self._pipe.send(('funcall', name, args, kwargs))
2307
self._pipe.send(("funcall", name, args, kwargs))
2290
2308
return self._pipe.recv()[1]
2294
2312
def __setattr__(self, name, value):
2296
2314
return super(ProxyClient, self).__setattr__(name, value)
2297
self._pipe.send(('setattr', name, value))
2315
self._pipe.send(("setattr", name, value))
2300
2318
class ClientHandler(socketserver.BaseRequestHandler, object):
2306
2324
def handle(self):
2307
2325
with contextlib.closing(self.server.child_pipe) as child_pipe:
2308
logger.info("TCP connection from: %s",
2309
str(self.client_address))
2310
logger.debug("Pipe FD: %d",
2311
self.server.child_pipe.fileno())
2326
log.info("TCP connection from: %s",
2327
str(self.client_address))
2328
log.debug("Pipe FD: %d", self.server.child_pipe.fileno())
2313
2330
session = gnutls.ClientSession(self.request)
2315
# priority = ':'.join(("NONE", "+VERS-TLS1.1",
2332
# priority = ":".join(("NONE", "+VERS-TLS1.1",
2316
2333
# "+AES-256-CBC", "+SHA1",
2317
2334
# "+COMP-NULL", "+CTYPE-OPENPGP",
2326
2343
# Start communication using the Mandos protocol
2327
2344
# Get protocol number
2328
2345
line = self.request.makefile().readline()
2329
logger.debug("Protocol version: %r", line)
2346
log.debug("Protocol version: %r", line)
2331
2348
if int(line.strip().split()[0]) > 1:
2332
2349
raise RuntimeError(line)
2333
2350
except (ValueError, IndexError, RuntimeError) as error:
2334
logger.error("Unknown protocol version: %s", error)
2351
log.error("Unknown protocol version: %s", error)
2337
2354
# Start GnuTLS connection
2339
2356
session.handshake()
2340
2357
except gnutls.Error as error:
2341
logger.warning("Handshake failed: %s", error)
2358
log.warning("Handshake failed: %s", error)
2342
2359
# Do not run session.bye() here: the session is not
2343
2360
# established. Just abandon the request.
2345
logger.debug("Handshake succeeded")
2362
log.debug("Handshake succeeded")
2347
2364
approval_required = False
2364
2381
fpr = self.fingerprint(
2365
2382
self.peer_certificate(session))
2366
2383
except (TypeError, gnutls.Error) as error:
2367
logger.warning("Bad certificate: %s", error)
2384
log.warning("Bad certificate: %s", error)
2369
logger.debug("Fingerprint: %s", fpr)
2386
log.debug("Fingerprint: %s", fpr)
2372
2389
client = ProxyClient(child_pipe, key_id, fpr,
2392
2408
# We are approved or approval is disabled
2394
2410
elif client.approved is None:
2395
logger.info("Client %s needs approval",
2411
log.info("Client %s needs approval",
2397
2413
if self.server.use_dbus:
2398
2414
# Emit D-Bus signal
2399
2415
client.NeedApproval(
2400
2416
client.approval_delay.total_seconds()
2401
2417
* 1000, client.approved_by_default)
2403
logger.warning("Client %s was not approved",
2419
log.warning("Client %s was not approved",
2405
2421
if self.server.use_dbus:
2406
2422
# Emit D-Bus signal
2407
2423
client.Rejected("Denied")
2431
2447
session.send(client.secret)
2432
2448
except gnutls.Error as error:
2433
logger.warning("gnutls send failed",
2449
log.warning("gnutls send failed", exc_info=error)
2437
logger.info("Sending secret to %s", client.name)
2452
log.info("Sending secret to %s", client.name)
2438
2453
# bump the timeout using extended_timeout
2439
2454
client.bump_timeout(client.extended_timeout)
2440
2455
if self.server.use_dbus:
2464
2478
valid_cert_types = frozenset((gnutls.CRT_OPENPGP,))
2465
2479
# If not a valid certificate type...
2466
2480
if cert_type not in valid_cert_types:
2467
logger.info("Cert type %r not in %r", cert_type,
2481
log.info("Cert type %r not in %r", cert_type,
2469
2483
# ...return invalid data
2471
2485
list_size = ctypes.c_uint(1)
2655
2669
(self.interface + "\0").encode("utf-8"))
2656
2670
except socket.error as error:
2657
2671
if error.errno == errno.EPERM:
2658
logger.error("No permission to bind to"
2659
" interface %s", self.interface)
2672
log.error("No permission to bind to interface %s",
2660
2674
elif error.errno == errno.ENOPROTOOPT:
2661
logger.error("SO_BINDTODEVICE not available;"
2662
" cannot bind to interface %s",
2675
log.error("SO_BINDTODEVICE not available; cannot"
2676
" bind to interface %s", self.interface)
2664
2677
elif error.errno == errno.ENODEV:
2665
logger.error("Interface %s does not exist,"
2666
" cannot bind", self.interface)
2678
log.error("Interface %s does not exist, cannot"
2679
" bind", self.interface)
2669
2682
# Only bind(2) the socket if we really need to.
2767
logger.info("Client not found for key ID: %s, address"
2768
": %s", key_id or fpr, address)
2780
log.info("Client not found for key ID: %s, address:"
2781
" %s", key_id or fpr, address)
2769
2782
if self.use_dbus:
2770
2783
# Emit D-Bus signal
2771
2784
mandos_dbus_service.ClientNotFound(key_id or fpr,
2784
2797
# remove the old hook in favor of the new above hook on
2787
if command == 'funcall':
2800
if command == "funcall":
2788
2801
funcname = request[1]
2789
2802
args = request[2]
2790
2803
kwargs = request[3]
2792
parent_pipe.send(('data', getattr(client_object,
2805
parent_pipe.send(("data", getattr(client_object,
2793
2806
funcname)(*args,
2796
if command == 'getattr':
2809
if command == "getattr":
2797
2810
attrname = request[1]
2798
2811
if isinstance(client_object.__getattribute__(attrname),
2799
2812
collections.abc.Callable):
2800
parent_pipe.send(('function', ))
2813
parent_pipe.send(("function", ))
2802
2815
parent_pipe.send((
2803
'data', client_object.__getattribute__(attrname)))
2816
"data", client_object.__getattribute__(attrname)))
2805
if command == 'setattr':
2818
if command == "setattr":
2806
2819
attrname = request[1]
2807
2820
value = request[2]
2808
2821
setattr(client_object, attrname, value)
2914
2927
def string_to_delta(interval):
2915
2928
"""Parse a string and return a datetime.timedelta
2917
>>> string_to_delta('7d') == datetime.timedelta(7)
2919
>>> string_to_delta('60s') == datetime.timedelta(0, 60)
2921
>>> string_to_delta('60m') == datetime.timedelta(0, 3600)
2923
>>> string_to_delta('24h') == datetime.timedelta(1)
2925
>>> string_to_delta('1w') == datetime.timedelta(7)
2927
>>> string_to_delta('5m 30s') == datetime.timedelta(0, 330)
2930
>>> string_to_delta("7d") == datetime.timedelta(7)
2932
>>> string_to_delta("60s") == datetime.timedelta(0, 60)
2934
>>> string_to_delta("60m") == datetime.timedelta(0, 3600)
2936
>>> string_to_delta("24h") == datetime.timedelta(1)
2938
>>> string_to_delta("1w") == datetime.timedelta(7)
2940
>>> string_to_delta("5m 30s") == datetime.timedelta(0, 330)
3166
3179
pidfile = codecs.open(pidfilename, "w", encoding="utf-8")
3167
3180
except IOError as e:
3168
logger.error("Could not open file %r", pidfilename,
3181
log.error("Could not open file %r", pidfilename,
3171
3184
for name, group in (("_mandos", "_mandos"),
3172
3185
("mandos", "mandos"),
3187
logger.debug("Did setuid/setgid to {}:{}".format(uid,
3199
log.debug("Did setuid/setgid to %s:%s", uid, gid)
3189
3200
except OSError as error:
3190
logger.warning("Failed to setuid/setgid to {}:{}: {}"
3191
.format(uid, gid, os.strerror(error.errno)))
3201
log.warning("Failed to setuid/setgid to %s:%s: %s", uid, gid,
3202
os.strerror(error.errno))
3192
3203
if error.errno != errno.EPERM:
3329
3339
os.remove(stored_state_path)
3330
3340
except IOError as e:
3331
3341
if e.errno == errno.ENOENT:
3332
logger.warning("Could not load persistent state:"
3333
" {}".format(os.strerror(e.errno)))
3342
log.warning("Could not load persistent state:"
3343
" %s", os.strerror(e.errno))
3335
logger.critical("Could not load persistent state:",
3345
log.critical("Could not load persistent state:",
3338
3348
except EOFError as e:
3339
logger.warning("Could not load persistent state: "
3349
log.warning("Could not load persistent state: EOFError:",
3343
3352
with PGPEngine() as pgp:
3344
3353
for client_name, client in clients_data.items():
3371
3380
if client["enabled"]:
3372
3381
if datetime.datetime.utcnow() >= client["expires"]:
3373
3382
if not client["last_checked_ok"]:
3375
"disabling client {} - Client never "
3376
"performed a successful checker".format(
3383
log.warning("disabling client %s - Client"
3384
" never performed a successful"
3385
" checker", client_name)
3378
3386
client["enabled"] = False
3379
3387
elif client["last_checker_status"] != 0:
3381
"disabling client {} - Client last"
3382
" checker failed with error code"
3385
client["last_checker_status"]))
3388
log.warning("disabling client %s - Client"
3389
" last checker failed with error"
3390
" code %s", client_name,
3391
client["last_checker_status"])
3386
3392
client["enabled"] = False
3388
3394
client["expires"] = (
3389
3395
datetime.datetime.utcnow()
3390
3396
+ client["timeout"])
3391
logger.debug("Last checker succeeded,"
3392
" keeping {} enabled".format(
3397
log.debug("Last checker succeeded, keeping %s"
3398
" enabled", client_name)
3395
3400
client["secret"] = pgp.decrypt(
3396
3401
client["encrypted_secret"],
3397
3402
client_settings[client_name]["secret"])
3398
3403
except PGPError:
3399
3404
# If decryption fails, we use secret from new settings
3400
logger.debug("Failed to decrypt {} old secret".format(
3405
log.debug("Failed to decrypt %s old secret",
3402
3407
client["secret"] = (client_settings[client_name]
3600
3605
except NameError:
3602
3607
if e.errno in (errno.ENOENT, errno.EACCES, errno.EEXIST):
3603
logger.warning("Could not save persistent state: {}"
3604
.format(os.strerror(e.errno)))
3608
log.warning("Could not save persistent state: %s",
3609
os.strerror(e.errno))
3606
logger.warning("Could not save persistent state:",
3611
log.warning("Could not save persistent state:",
3610
3615
# Delete all clients, and settings from config
3637
3642
service.port = tcp_server.socket.getsockname()[1]
3639
logger.info("Now listening on address %r, port %d,"
3640
" flowinfo %d, scope_id %d",
3641
*tcp_server.socket.getsockname())
3644
log.info("Now listening on address %r, port %d, flowinfo %d,"
3645
" scope_id %d", *tcp_server.socket.getsockname())
3643
logger.info("Now listening on address %r, port %d",
3644
*tcp_server.socket.getsockname())
3647
log.info("Now listening on address %r, port %d",
3648
*tcp_server.socket.getsockname())
3646
3650
# service.interface = tcp_server.socket.getsockname()[3]
3662
3666
lambda *args, **kwargs: (tcp_server.handle_request
3663
3667
(*args[2:], **kwargs) or True))
3665
logger.debug("Starting main loop")
3669
log.debug("Starting main loop")
3666
3670
main_loop.run()
3667
3671
except AvahiError as error:
3668
logger.critical("Avahi Error", exc_info=error)
3672
log.critical("Avahi Error", exc_info=error)
3671
3675
except KeyboardInterrupt:
3673
3677
print("", file=sys.stderr)
3674
logger.debug("Server received KeyboardInterrupt")
3675
logger.debug("Server exiting")
3678
log.debug("Server received KeyboardInterrupt")
3679
log.debug("Server exiting")
3676
3680
# Must run before the D-Bus bus name gets deregistered
3680
def should_only_run_tests():
3684
def parse_test_args():
3685
# type: () -> argparse.Namespace
3681
3686
parser = argparse.ArgumentParser(add_help=False)
3682
parser.add_argument("--check", action='store_true')
3687
parser.add_argument("--check", action="store_true")
3688
parser.add_argument("--prefix", )
3683
3689
args, unknown_args = parser.parse_known_args()
3684
run_tests = args.check
3686
# Remove --check argument from sys.argv
3691
# Remove test options from sys.argv
3687
3692
sys.argv[1:] = unknown_args
3690
3695
# Add all tests from doctest strings
3691
3696
def load_tests(loader, tests, none):
3693
3698
tests.addTests(doctest.DocTestSuite())
3696
if __name__ == '__main__':
3701
if __name__ == "__main__":
3702
options = parse_test_args()
3698
if should_only_run_tests():
3699
# Call using ./mandos --check [--verbose]
3705
extra_test_prefix = options.prefix
3706
if extra_test_prefix is not None:
3707
if not (unittest.main(argv=[""], exit=False)
3708
.result.wasSuccessful()):
3710
class ExtraTestLoader(unittest.TestLoader):
3711
testMethodPrefix = extra_test_prefix
3712
# Call using ./scriptname --test [--verbose]
3713
unittest.main(argv=[""], testLoader=ExtraTestLoader())
3715
unittest.main(argv=[""])
3704
3719
logging.shutdown()
3723
# (lambda (&optional extra)
3724
# (if (not (funcall run-tests-in-test-buffer default-directory
3726
# (funcall show-test-buffer-in-test-window)
3727
# (funcall remove-test-window)
3728
# (if extra (message "Extra tests run successfully!"))))
3729
# run-tests-in-test-buffer:
3730
# (lambda (dir &optional extra)
3731
# (with-current-buffer (get-buffer-create "*Test*")
3732
# (setq buffer-read-only nil
3733
# default-directory dir)
3735
# (compilation-mode))
3736
# (let ((process-result
3737
# (let ((inhibit-read-only t))
3738
# (process-file-shell-command
3739
# (funcall get-command-line extra) nil "*Test*"))))
3740
# (and (numberp process-result)
3741
# (= process-result 0))))
3743
# (lambda (&optional extra)
3744
# (let ((quoted-script
3745
# (shell-quote-argument (funcall get-script-name))))
3747
# (concat "%s --check" (if extra " --prefix=atest" ""))
3751
# (if (fboundp 'file-local-name)
3752
# (file-local-name (buffer-file-name))
3753
# (or (file-remote-p (buffer-file-name) 'localname)
3754
# (buffer-file-name))))
3755
# remove-test-window:
3757
# (let ((test-window (get-buffer-window "*Test*")))
3758
# (if test-window (delete-window test-window))))
3759
# show-test-buffer-in-test-window:
3761
# (when (not (get-buffer-window-list "*Test*"))
3762
# (setq next-error-last-buffer (get-buffer "*Test*"))
3763
# (let* ((side (if (>= (window-width) 146) 'right 'bottom))
3764
# (display-buffer-overriding-action
3765
# `((display-buffer-in-side-window) (side . ,side)
3766
# (window-height . fit-window-to-buffer)
3767
# (window-width . fit-window-to-buffer))))
3768
# (display-buffer "*Test*"))))
3771
# (let* ((run-extra-tests (lambda () (interactive)
3772
# (funcall run-tests t)))
3773
# (inner-keymap `(keymap (116 . ,run-extra-tests))) ; t
3774
# (outer-keymap `(keymap (3 . ,inner-keymap)))) ; C-c
3775
# (setq minor-mode-overriding-map-alist
3776
# (cons `(run-tests . ,outer-keymap)
3777
# minor-mode-overriding-map-alist)))
3778
# (add-hook 'after-save-hook run-tests 90 t))