424
424
.format(self.name)))
427
def call_pipe(connection, # : multiprocessing.Connection
428
func, *args, **kwargs):
429
"""This function is meant to be called by multiprocessing.Process
431
This function runs func(*args, **kwargs), and writes the resulting
432
return value on the provided multiprocessing.Connection.
434
connection.send(func(*args, **kwargs))
428
437
class Client(object):
429
438
"""A representation of a client host served by this server.
456
465
last_checker_status: integer between 0 and 255 reflecting exit
457
466
status of last checker. -1 reflects crashed
458
467
checker, -2 means no checker completed yet.
468
last_checker_signal: The signal which killed the last checker, if
469
last_checker_status is -1
459
470
last_enabled: datetime.datetime(); (UTC) or None
460
471
name: string; from the config file, used in log messages and
461
472
D-Bus identifiers
635
646
# Also start a new checker *right now*.
636
647
self.start_checker()
638
def checker_callback(self, pid, condition, command):
649
def checker_callback(self, source, condition, connection,
639
651
"""The checker has completed, so take appropriate actions."""
640
652
self.checker_callback_tag = None
641
653
self.checker = None
642
if os.WIFEXITED(condition):
643
self.last_checker_status = os.WEXITSTATUS(condition)
654
# Read return code from connection (see call_pipe)
655
returncode = connection.recv()
659
self.last_checker_status = returncode
660
self.last_checker_signal = None
644
661
if self.last_checker_status == 0:
645
662
logger.info("Checker for %(name)s succeeded",
649
666
logger.info("Checker for %(name)s failed", vars(self))
651
668
self.last_checker_status = -1
669
self.last_checker_signal = -returncode
652
670
logger.warning("Checker for %(name)s crashed?",
655
674
def checked_ok(self):
656
675
"""Assert that the client has been seen, alive and well."""
657
676
self.last_checked_ok = datetime.datetime.utcnow()
658
677
self.last_checker_status = 0
678
self.last_checker_signal = None
659
679
self.bump_timeout()
661
681
def bump_timeout(self, timeout=None):
687
707
# than 'timeout' for the client to be disabled, which is as it
690
# If a checker exists, make sure it is not a zombie
692
pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
693
except AttributeError:
695
except OSError as error:
696
if error.errno != errno.ECHILD:
700
logger.warning("Checker was a zombie")
701
gobject.source_remove(self.checker_callback_tag)
702
self.checker_callback(pid, status,
703
self.current_checker_command)
710
if self.checker is not None and not self.checker.is_alive():
711
logger.warning("Checker was not alive; joining")
704
714
# Start a new checker if needed
705
715
if self.checker is None:
706
716
# Escape attributes for the shell
716
726
return True # Try again later
717
727
self.current_checker_command = command
719
logger.info("Starting checker %r for %s", command,
721
# We don't need to redirect stdout and stderr, since
722
# in normal mode, that is already done by daemon(),
723
# and in debug mode we don't want to. (Stdin is
724
# always replaced by /dev/null.)
725
# The exception is when not debugging but nevertheless
726
# running in the foreground; use the previously
729
if (not self.server_settings["debug"]
730
and self.server_settings["foreground"]):
731
popen_args.update({"stdout": wnull,
733
self.checker = subprocess.Popen(command,
738
except OSError as error:
739
logger.error("Failed to start subprocess",
742
self.checker_callback_tag = gobject.child_watch_add(
743
self.checker.pid, self.checker_callback, data=command)
744
# The checker may have completed before the gobject
745
# watch was added. Check for this.
747
pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
748
except OSError as error:
749
if error.errno == errno.ECHILD:
750
# This should never happen
751
logger.error("Child process vanished",
756
gobject.source_remove(self.checker_callback_tag)
757
self.checker_callback(pid, status, command)
728
logger.info("Starting checker %r for %s", command,
730
# We don't need to redirect stdout and stderr, since
731
# in normal mode, that is already done by daemon(),
732
# and in debug mode we don't want to. (Stdin is
733
# always replaced by /dev/null.)
734
# The exception is when not debugging but nevertheless
735
# running in the foreground; use the previously
737
popen_args = { "close_fds": True,
740
if (not self.server_settings["debug"]
741
and self.server_settings["foreground"]):
742
popen_args.update({"stdout": wnull,
744
pipe = multiprocessing.Pipe(duplex = False)
745
self.checker = multiprocessing.Process(
747
args = (pipe[1], subprocess.call, command),
750
self.checker_callback_tag = gobject.io_add_watch(
751
pipe[0].fileno(), gobject.IO_IN,
752
self.checker_callback, pipe[0], command)
758
753
# Re-run this periodically if run by gobject.timeout_add
1123
1111
# Create a new, but exactly alike, function
1124
1112
# object, and decorate it to be a new D-Bus signal
1125
1113
# with the alternate D-Bus interface name
1126
if sys.version == 2:
1114
if sys.version_info.major == 2:
1127
1115
new_function = types.FunctionType(
1128
1116
nonmethod_func.func_code,
1129
1117
nonmethod_func.func_globals,
1368
1356
DBusObjectWithProperties.__del__(self, *args, **kwargs)
1369
1357
Client.__del__(self, *args, **kwargs)
1371
def checker_callback(self, pid, condition, command,
1373
self.checker_callback_tag = None
1375
if os.WIFEXITED(condition):
1376
exitstatus = os.WEXITSTATUS(condition)
1359
def checker_callback(self, source, condition,
1360
connection, command, *args, **kwargs):
1361
ret = Client.checker_callback(self, source, condition,
1362
connection, command, *args,
1364
exitstatus = self.last_checker_status
1377
1366
# Emit D-Bus signal
1378
1367
self.CheckerCompleted(dbus.Int16(exitstatus),
1379
dbus.Int64(condition),
1380
1369
dbus.String(command))
1382
1371
# Emit D-Bus signal
1383
1372
self.CheckerCompleted(dbus.Int16(-1),
1384
dbus.Int64(condition),
1374
self.last_checker_signal),
1385
1375
dbus.String(command))
1387
return Client.checker_callback(self, pid, condition, command,
1390
1378
def start_checker(self, *args, **kwargs):
1391
1379
old_checker_pid = getattr(self.checker, "pid", None)