423
423
.format(self.name)))
426
def subprocess_call_pipe(connection, # : multiprocessing.Connection
428
"""This function is meant to be called by multiprocessing.Process
430
This function runs a synchronous subprocess.call(), and writes the
431
resulting return code on the provided multiprocessing.Connection.
433
connection.send(subprocess.call(*args, **kwargs))
427
436
class Client(object):
428
437
"""A representation of a client host served by this server.
455
464
last_checker_status: integer between 0 and 255 reflecting exit
456
465
status of last checker. -1 reflects crashed
457
466
checker, -2 means no checker completed yet.
467
last_checker_signal: The signal which killed the last checker, if
468
last_checker_status is -1
458
469
last_enabled: datetime.datetime(); (UTC) or None
459
470
name: string; from the config file, used in log messages and
460
471
D-Bus identifiers
634
645
# Also start a new checker *right now*.
635
646
self.start_checker()
637
def checker_callback(self, pid, condition, command):
648
def checker_callback(self, source, condition,
649
(connection, command)):
638
650
"""The checker has completed, so take appropriate actions."""
639
651
self.checker_callback_tag = None
640
652
self.checker = None
641
if os.WIFEXITED(condition):
642
self.last_checker_status = os.WEXITSTATUS(condition)
653
# Read return code from connection (see subprocess_call_pipe)
654
returncode = connection.recv()
658
self.last_checker_status = returncode
659
self.last_checker_signal = None
643
660
if self.last_checker_status == 0:
644
661
logger.info("Checker for %(name)s succeeded",
648
665
logger.info("Checker for %(name)s failed", vars(self))
650
667
self.last_checker_status = -1
668
self.last_checker_signal = -returncode
651
669
logger.warning("Checker for %(name)s crashed?",
654
673
def checked_ok(self):
655
674
"""Assert that the client has been seen, alive and well."""
656
675
self.last_checked_ok = datetime.datetime.utcnow()
657
676
self.last_checker_status = 0
677
self.last_checker_signal = None
658
678
self.bump_timeout()
660
680
def bump_timeout(self, timeout=None):
686
706
# than 'timeout' for the client to be disabled, which is as it
689
# If a checker exists, make sure it is not a zombie
691
pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
692
except AttributeError:
694
except OSError as error:
695
if error.errno != errno.ECHILD:
699
logger.warning("Checker was a zombie")
700
gobject.source_remove(self.checker_callback_tag)
701
self.checker_callback(pid, status,
702
self.current_checker_command)
709
if self.checker is not None and not self.checker.is_alive():
710
logger.warning("Checker was not alive; joining")
703
713
# Start a new checker if needed
704
714
if self.checker is None:
705
715
# Escape attributes for the shell
715
725
return True # Try again later
716
726
self.current_checker_command = command
718
logger.info("Starting checker %r for %s", command,
720
# We don't need to redirect stdout and stderr, since
721
# in normal mode, that is already done by daemon(),
722
# and in debug mode we don't want to. (Stdin is
723
# always replaced by /dev/null.)
724
# The exception is when not debugging but nevertheless
725
# running in the foreground; use the previously
728
if (not self.server_settings["debug"]
729
and self.server_settings["foreground"]):
730
popen_args.update({"stdout": wnull,
732
self.checker = subprocess.Popen(command,
737
except OSError as error:
738
logger.error("Failed to start subprocess",
741
self.checker_callback_tag = gobject.child_watch_add(
742
self.checker.pid, self.checker_callback, data=command)
743
# The checker may have completed before the gobject
744
# watch was added. Check for this.
746
pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
747
except OSError as error:
748
if error.errno == errno.ECHILD:
749
# This should never happen
750
logger.error("Child process vanished",
755
gobject.source_remove(self.checker_callback_tag)
756
self.checker_callback(pid, status, command)
727
logger.info("Starting checker %r for %s", command,
729
# We don't need to redirect stdout and stderr, since
730
# in normal mode, that is already done by daemon(),
731
# and in debug mode we don't want to. (Stdin is
732
# always replaced by /dev/null.)
733
# The exception is when not debugging but nevertheless
734
# running in the foreground; use the previously
736
popen_args = { "close_fds": True,
739
if (not self.server_settings["debug"]
740
and self.server_settings["foreground"]):
741
popen_args.update({"stdout": wnull,
743
pipe = multiprocessing.Pipe(duplex=False)
744
self.checker = multiprocessing.Process(
745
target=subprocess_call_pipe, args=(pipe[1], command),
748
self.checker_callback_tag = gobject.io_add_watch(
749
pipe[0].fileno(), gobject.IO_IN,
750
self.checker_callback, (pipe[0], command))
757
751
# Re-run this periodically if run by gobject.timeout_add
1355
1342
DBusObjectWithProperties.__del__(self, *args, **kwargs)
1356
1343
Client.__del__(self, *args, **kwargs)
1358
def checker_callback(self, pid, condition, command,
1360
self.checker_callback_tag = None
1362
if os.WIFEXITED(condition):
1363
exitstatus = os.WEXITSTATUS(condition)
1345
def checker_callback(self, source, condition,
1346
(connection, command), *args, **kwargs):
1347
ret = Client.checker_callback(self, source, condition,
1348
(connection, command), *args,
1350
exitstatus = self.last_checker_status
1364
1352
# Emit D-Bus signal
1365
1353
self.CheckerCompleted(dbus.Int16(exitstatus),
1366
dbus.Int64(condition),
1367
1355
dbus.String(command))
1369
1357
# Emit D-Bus signal
1370
1358
self.CheckerCompleted(dbus.Int16(-1),
1371
dbus.Int64(condition),
1360
self.last_checker_signal),
1372
1361
dbus.String(command))
1374
return Client.checker_callback(self, pid, condition, command,
1377
1364
def start_checker(self, *args, **kwargs):
1378
1365
old_checker_pid = getattr(self.checker, "pid", None)