424
411
.format(self.name)))
427
def call_pipe(connection, # : multiprocessing.Connection
428
func, *args, **kwargs):
429
"""This function is meant to be called by multiprocessing.Process
431
This function runs func(*args, **kwargs), and writes the resulting
432
return value on the provided multiprocessing.Connection.
434
connection.send(func(*args, **kwargs))
437
415
class Client(object):
438
416
"""A representation of a client host served by this server.
465
443
last_checker_status: integer between 0 and 255 reflecting exit
466
444
status of last checker. -1 reflects crashed
467
445
checker, -2 means no checker completed yet.
468
last_checker_signal: The signal which killed the last checker, if
469
last_checker_status is -1
470
446
last_enabled: datetime.datetime(); (UTC) or None
471
447
name: string; from the config file, used in log messages and
472
448
D-Bus identifiers
646
622
# Also start a new checker *right now*.
647
623
self.start_checker()
649
def checker_callback(self, source, condition, connection,
625
def checker_callback(self, pid, condition, command):
651
626
"""The checker has completed, so take appropriate actions."""
652
627
self.checker_callback_tag = None
653
628
self.checker = None
654
# Read return code from connection (see call_pipe)
655
returncode = connection.recv()
659
self.last_checker_status = returncode
660
self.last_checker_signal = None
629
if os.WIFEXITED(condition):
630
self.last_checker_status = os.WEXITSTATUS(condition)
661
631
if self.last_checker_status == 0:
662
632
logger.info("Checker for %(name)s succeeded",
666
636
logger.info("Checker for %(name)s failed", vars(self))
668
638
self.last_checker_status = -1
669
self.last_checker_signal = -returncode
670
639
logger.warning("Checker for %(name)s crashed?",
674
642
def checked_ok(self):
675
643
"""Assert that the client has been seen, alive and well."""
676
644
self.last_checked_ok = datetime.datetime.utcnow()
677
645
self.last_checker_status = 0
678
self.last_checker_signal = None
679
646
self.bump_timeout()
681
648
def bump_timeout(self, timeout=None):
707
674
# than 'timeout' for the client to be disabled, which is as it
710
if self.checker is not None and not self.checker.is_alive():
711
logger.warning("Checker was not alive; joining")
677
# If a checker exists, make sure it is not a zombie
679
pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
680
except AttributeError:
682
except OSError as error:
683
if error.errno != errno.ECHILD:
687
logger.warning("Checker was a zombie")
688
gobject.source_remove(self.checker_callback_tag)
689
self.checker_callback(pid, status,
690
self.current_checker_command)
714
691
# Start a new checker if needed
715
692
if self.checker is None:
716
693
# Escape attributes for the shell
726
703
return True # Try again later
727
704
self.current_checker_command = command
728
logger.info("Starting checker %r for %s", command,
730
# We don't need to redirect stdout and stderr, since
731
# in normal mode, that is already done by daemon(),
732
# and in debug mode we don't want to. (Stdin is
733
# always replaced by /dev/null.)
734
# The exception is when not debugging but nevertheless
735
# running in the foreground; use the previously
737
popen_args = { "close_fds": True,
740
if (not self.server_settings["debug"]
741
and self.server_settings["foreground"]):
742
popen_args.update({"stdout": wnull,
744
pipe = multiprocessing.Pipe(duplex = False)
745
self.checker = multiprocessing.Process(
747
args = (pipe[1], subprocess.call, command),
750
self.checker_callback_tag = gobject.io_add_watch(
751
pipe[0].fileno(), gobject.IO_IN,
752
self.checker_callback, pipe[0], command)
706
logger.info("Starting checker %r for %s", command,
708
# We don't need to redirect stdout and stderr, since
709
# in normal mode, that is already done by daemon(),
710
# and in debug mode we don't want to. (Stdin is
711
# always replaced by /dev/null.)
712
# The exception is when not debugging but nevertheless
713
# running in the foreground; use the previously
716
if (not self.server_settings["debug"]
717
and self.server_settings["foreground"]):
718
popen_args.update({"stdout": wnull,
720
self.checker = subprocess.Popen(command,
725
except OSError as error:
726
logger.error("Failed to start subprocess",
729
self.checker_callback_tag = gobject.child_watch_add(
730
self.checker.pid, self.checker_callback, data=command)
731
# The checker may have completed before the gobject
732
# watch was added. Check for this.
734
pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
735
except OSError as error:
736
if error.errno == errno.ECHILD:
737
# This should never happen
738
logger.error("Child process vanished",
743
gobject.source_remove(self.checker_callback_tag)
744
self.checker_callback(pid, status, command)
753
745
# Re-run this periodically if run by gobject.timeout_add
1099
1098
interface_names.add(alt_interface)
1100
1099
# Is this a D-Bus signal?
1101
1100
if getattr(attribute, "_dbus_is_signal", False):
1102
if sys.version_info.major == 2:
1103
# Extract the original non-method undecorated
1104
# function by black magic
1105
nonmethod_func = (dict(
1106
zip(attribute.func_code.co_freevars,
1107
attribute.__closure__))
1108
["func"].cell_contents)
1110
nonmethod_func = attribute
1101
# Extract the original non-method undecorated
1102
# function by black magic
1103
nonmethod_func = (dict(
1104
zip(attribute.func_code.co_freevars,
1105
attribute.__closure__))
1106
["func"].cell_contents)
1111
1107
# Create a new, but exactly alike, function
1112
1108
# object, and decorate it to be a new D-Bus signal
1113
1109
# with the alternate D-Bus interface name
1114
if sys.version_info.major == 2:
1115
new_function = types.FunctionType(
1116
nonmethod_func.func_code,
1117
nonmethod_func.func_globals,
1118
nonmethod_func.func_name,
1119
nonmethod_func.func_defaults,
1120
nonmethod_func.func_closure)
1122
new_function = types.FunctionType(
1123
nonmethod_func.__code__,
1124
nonmethod_func.__globals__,
1125
nonmethod_func.__name__,
1126
nonmethod_func.__defaults__,
1127
nonmethod_func.__closure__)
1128
1110
new_function = (dbus.service.signal(
1130
attribute._dbus_signature)(new_function))
1111
alt_interface, attribute._dbus_signature)
1112
(types.FunctionType(
1113
nonmethod_func.func_code,
1114
nonmethod_func.func_globals,
1115
nonmethod_func.func_name,
1116
nonmethod_func.func_defaults,
1117
nonmethod_func.func_closure)))
1131
1118
# Copy annotations, if any
1133
1120
new_function._dbus_annotations = dict(
1356
1343
DBusObjectWithProperties.__del__(self, *args, **kwargs)
1357
1344
Client.__del__(self, *args, **kwargs)
1359
def checker_callback(self, source, condition,
1360
connection, command, *args, **kwargs):
1361
ret = Client.checker_callback(self, source, condition,
1362
connection, command, *args,
1364
exitstatus = self.last_checker_status
1346
def checker_callback(self, pid, condition, command,
1348
self.checker_callback_tag = None
1350
if os.WIFEXITED(condition):
1351
exitstatus = os.WEXITSTATUS(condition)
1366
1352
# Emit D-Bus signal
1367
1353
self.CheckerCompleted(dbus.Int16(exitstatus),
1354
dbus.Int64(condition),
1369
1355
dbus.String(command))
1371
1357
# Emit D-Bus signal
1372
1358
self.CheckerCompleted(dbus.Int16(-1),
1374
self.last_checker_signal),
1359
dbus.Int64(condition),
1375
1360
dbus.String(command))
1362
return Client.checker_callback(self, pid, condition, command,
1378
1365
def start_checker(self, *args, **kwargs):
1379
1366
old_checker_pid = getattr(self.checker, "pid", None)
2183
2169
# avoid excessive use of external libraries.
2185
2171
# New type for defining tokens, syntax, and semantics all-in-one
2172
Token = collections.namedtuple("Token",
2173
("regexp", # To match token; if
2174
# "value" is not None,
2175
# must have a "group"
2177
"value", # datetime.timedelta or
2179
"followers")) # Tokens valid after
2186
2181
Token = collections.namedtuple("Token", (
2187
2182
"regexp", # To match token; if "value" is not None, must have
2188
2183
# a "group" containing digits