/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos

  • Committer: Teddy Hogeborn
  • Date: 2015-05-31 20:12:27 UTC
  • mfrom: (749.1.2 multiprocessing-exec)
  • Revision ID: teddy@recompile.se-20150531201227-u3n1z1ito5siqh7c
Merge change to use multiprocessing module to run checkers.

Show diffs side-by-side

added added

removed removed

Lines of Context:
424
424
            .format(self.name)))
425
425
        return ret
426
426
 
 
427
def call_pipe(connection,       # : multiprocessing.Connection
 
428
              func, *args, **kwargs):
 
429
    """This function is meant to be called by multiprocessing.Process
 
430
    
 
431
    This function runs func(*args, **kwargs), and writes the resulting
 
432
    return value on the provided multiprocessing.Connection.
 
433
    """
 
434
    connection.send(func(*args, **kwargs))
 
435
    connection.close()
427
436
 
428
437
class Client(object):
429
438
    """A representation of a client host served by this server.
456
465
    last_checker_status: integer between 0 and 255 reflecting exit
457
466
                         status of last checker. -1 reflects crashed
458
467
                         checker, -2 means no checker completed yet.
 
468
    last_checker_signal: The signal which killed the last checker, if
 
469
                         last_checker_status is -1
459
470
    last_enabled: datetime.datetime(); (UTC) or None
460
471
    name:       string; from the config file, used in log messages and
461
472
                        D-Bus identifiers
635
646
        # Also start a new checker *right now*.
636
647
        self.start_checker()
637
648
    
638
 
    def checker_callback(self, pid, condition, command):
 
649
    def checker_callback(self, source, condition, connection,
 
650
                         command):
639
651
        """The checker has completed, so take appropriate actions."""
640
652
        self.checker_callback_tag = None
641
653
        self.checker = None
642
 
        if os.WIFEXITED(condition):
643
 
            self.last_checker_status = os.WEXITSTATUS(condition)
 
654
        # Read return code from connection (see call_pipe)
 
655
        returncode = connection.recv()
 
656
        connection.close()
 
657
        
 
658
        if returncode >= 0:
 
659
            self.last_checker_status = returncode
 
660
            self.last_checker_signal = None
644
661
            if self.last_checker_status == 0:
645
662
                logger.info("Checker for %(name)s succeeded",
646
663
                            vars(self))
649
666
                logger.info("Checker for %(name)s failed", vars(self))
650
667
        else:
651
668
            self.last_checker_status = -1
 
669
            self.last_checker_signal = -returncode
652
670
            logger.warning("Checker for %(name)s crashed?",
653
671
                           vars(self))
 
672
        return False
654
673
    
655
674
    def checked_ok(self):
656
675
        """Assert that the client has been seen, alive and well."""
657
676
        self.last_checked_ok = datetime.datetime.utcnow()
658
677
        self.last_checker_status = 0
 
678
        self.last_checker_signal = None
659
679
        self.bump_timeout()
660
680
    
661
681
    def bump_timeout(self, timeout=None):
687
707
        # than 'timeout' for the client to be disabled, which is as it
688
708
        # should be.
689
709
        
690
 
        # If a checker exists, make sure it is not a zombie
691
 
        try:
692
 
            pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
693
 
        except AttributeError:
694
 
            pass
695
 
        except OSError as error:
696
 
            if error.errno != errno.ECHILD:
697
 
                raise
698
 
        else:
699
 
            if pid:
700
 
                logger.warning("Checker was a zombie")
701
 
                gobject.source_remove(self.checker_callback_tag)
702
 
                self.checker_callback(pid, status,
703
 
                                      self.current_checker_command)
 
710
        if self.checker is not None and not self.checker.is_alive():
 
711
            logger.warning("Checker was not alive; joining")
 
712
            self.checker.join()
 
713
            self.checker = None
704
714
        # Start a new checker if needed
705
715
        if self.checker is None:
706
716
            # Escape attributes for the shell
715
725
                             exc_info=error)
716
726
                return True     # Try again later
717
727
            self.current_checker_command = command
718
 
            try:
719
 
                logger.info("Starting checker %r for %s", command,
720
 
                            self.name)
721
 
                # We don't need to redirect stdout and stderr, since
722
 
                # in normal mode, that is already done by daemon(),
723
 
                # and in debug mode we don't want to.  (Stdin is
724
 
                # always replaced by /dev/null.)
725
 
                # The exception is when not debugging but nevertheless
726
 
                # running in the foreground; use the previously
727
 
                # created wnull.
728
 
                popen_args = {}
729
 
                if (not self.server_settings["debug"]
730
 
                    and self.server_settings["foreground"]):
731
 
                    popen_args.update({"stdout": wnull,
732
 
                                       "stderr": wnull })
733
 
                self.checker = subprocess.Popen(command,
734
 
                                                close_fds=True,
735
 
                                                shell=True,
736
 
                                                cwd="/",
737
 
                                                **popen_args)
738
 
            except OSError as error:
739
 
                logger.error("Failed to start subprocess",
740
 
                             exc_info=error)
741
 
                return True
742
 
            self.checker_callback_tag = gobject.child_watch_add(
743
 
                self.checker.pid, self.checker_callback, data=command)
744
 
            # The checker may have completed before the gobject
745
 
            # watch was added.  Check for this.
746
 
            try:
747
 
                pid, status = os.waitpid(self.checker.pid, os.WNOHANG)
748
 
            except OSError as error:
749
 
                if error.errno == errno.ECHILD:
750
 
                    # This should never happen
751
 
                    logger.error("Child process vanished",
752
 
                                 exc_info=error)
753
 
                    return True
754
 
                raise
755
 
            if pid:
756
 
                gobject.source_remove(self.checker_callback_tag)
757
 
                self.checker_callback(pid, status, command)
 
728
            logger.info("Starting checker %r for %s", command,
 
729
                        self.name)
 
730
            # We don't need to redirect stdout and stderr, since
 
731
            # in normal mode, that is already done by daemon(),
 
732
            # and in debug mode we don't want to.  (Stdin is
 
733
            # always replaced by /dev/null.)
 
734
            # The exception is when not debugging but nevertheless
 
735
            # running in the foreground; use the previously
 
736
            # created wnull.
 
737
            popen_args = { "close_fds": True,
 
738
                           "shell": True,
 
739
                           "cwd": "/" }
 
740
            if (not self.server_settings["debug"]
 
741
                and self.server_settings["foreground"]):
 
742
                popen_args.update({"stdout": wnull,
 
743
                                   "stderr": wnull })
 
744
            pipe = multiprocessing.Pipe(duplex = False)
 
745
            self.checker = multiprocessing.Process(
 
746
                target = call_pipe,
 
747
                args = (pipe[1], subprocess.call, command),
 
748
                kwargs = popen_args)
 
749
            self.checker.start()
 
750
            self.checker_callback_tag = gobject.io_add_watch(
 
751
                pipe[0].fileno(), gobject.IO_IN,
 
752
                self.checker_callback, pipe[0], command)
758
753
        # Re-run this periodically if run by gobject.timeout_add
759
754
        return True
760
755
    
766
761
        if getattr(self, "checker", None) is None:
767
762
            return
768
763
        logger.debug("Stopping checker for %(name)s", vars(self))
769
 
        try:
770
 
            self.checker.terminate()
771
 
            #time.sleep(0.5)
772
 
            #if self.checker.poll() is None:
773
 
            #    self.checker.kill()
774
 
        except OSError as error:
775
 
            if error.errno != errno.ESRCH: # No such process
776
 
                raise
 
764
        self.checker.terminate()
777
765
        self.checker = None
778
766
 
779
767
 
1111
1099
                interface_names.add(alt_interface)
1112
1100
                # Is this a D-Bus signal?
1113
1101
                if getattr(attribute, "_dbus_is_signal", False):
1114
 
                    if sys.version == 2:
 
1102
                    if sys.version_info.major == 2:
1115
1103
                        # Extract the original non-method undecorated
1116
1104
                        # function by black magic
1117
1105
                        nonmethod_func = (dict(
1123
1111
                    # Create a new, but exactly alike, function
1124
1112
                    # object, and decorate it to be a new D-Bus signal
1125
1113
                    # with the alternate D-Bus interface name
1126
 
                    if sys.version == 2:
 
1114
                    if sys.version_info.major == 2:
1127
1115
                        new_function = types.FunctionType(
1128
1116
                            nonmethod_func.func_code,
1129
1117
                            nonmethod_func.func_globals,
1368
1356
            DBusObjectWithProperties.__del__(self, *args, **kwargs)
1369
1357
        Client.__del__(self, *args, **kwargs)
1370
1358
    
1371
 
    def checker_callback(self, pid, condition, command,
1372
 
                         *args, **kwargs):
1373
 
        self.checker_callback_tag = None
1374
 
        self.checker = None
1375
 
        if os.WIFEXITED(condition):
1376
 
            exitstatus = os.WEXITSTATUS(condition)
 
1359
    def checker_callback(self, source, condition,
 
1360
                         connection, command, *args, **kwargs):
 
1361
        ret = Client.checker_callback(self, source, condition,
 
1362
                                      connection, command, *args,
 
1363
                                      **kwargs)
 
1364
        exitstatus = self.last_checker_status
 
1365
        if exitstatus >= 0:
1377
1366
            # Emit D-Bus signal
1378
1367
            self.CheckerCompleted(dbus.Int16(exitstatus),
1379
 
                                  dbus.Int64(condition),
 
1368
                                  dbus.Int64(0),
1380
1369
                                  dbus.String(command))
1381
1370
        else:
1382
1371
            # Emit D-Bus signal
1383
1372
            self.CheckerCompleted(dbus.Int16(-1),
1384
 
                                  dbus.Int64(condition),
 
1373
                                  dbus.Int64(
 
1374
                                      self.last_checker_signal),
1385
1375
                                  dbus.String(command))
1386
 
        
1387
 
        return Client.checker_callback(self, pid, condition, command,
1388
 
                                       *args, **kwargs)
 
1376
        return ret
1389
1377
    
1390
1378
    def start_checker(self, *args, **kwargs):
1391
1379
        old_checker_pid = getattr(self.checker, "pid", None)
2660
2648
                    pass
2661
2649
            
2662
2650
            # Clients who has passed its expire date can still be
2663
 
            # enabled if its last checker was successful.  Clients
 
2651
            # enabled if its last checker was successful.  A Client
2664
2652
            # whose checker succeeded before we stored its state is
2665
2653
            # assumed to have successfully run all checkers during
2666
2654
            # downtime.