/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos

  • Committer: Teddy Hogeborn
  • Date: 2015-05-23 20:18:34 UTC
  • mto: This revision was merged to the branch mainline in revision 756.
  • Revision ID: teddy@recompile.se-20150523201834-e89ex4ito93yni8x
mandos: Use multiprocessing module to run checkers.

For a long time, the Mandos server has occasionally logged the message
"ERROR: Child process vanished".  This was never a fatal error, but it
has been annoying and slightly worrying, since a definite cause was
not found.  One potential cause could be the "multiprocessing" and
"subprocess" modules conflicting w.r.t. SIGCHLD.  To avoid this,
change the running of checkers from using subprocess.Popen
asynchronously to instead first create a multiprocessing.Process()
(which is asynchronous) calling a function, and have that function
then call subprocess.call() (which is synchronous).  In this way, the
only thing using any asynchronous subprocesses is the multiprocessing
module.

This makes it necessary to change one small thing in the D-Bus API,
since the subprocess.call() function does not expose the raw wait(2)
status value.

DBUS-API (CheckerCompleted): Change the second value provided by this
                             D-Bus signal from the raw wait(2) status
                             to the actual terminating signal number.
mandos (subprocess_call_pipe): New function to be called by
                               multiprocessing.Process (starting a
                               separate process).
(Client.last_checker_signal): New attribute for signal which
                              terminated last checker.  Like
                              last_checker_status, only not accessible
                              via D-Bus.
(Client.checker_callback): Take new "connection" argument and use it
                           to get returncode; set last_checker_signal.
                           Return False so gobject does not call this
                           callback again.
(Client.start_checker): Start checker using a multiprocessing.Process
                        instead of a subprocess.Popen.
(ClientDBus.checker_callback): Take new "connection" argument.  Call
                               Client.checker_callback early to have
                               it set last_checker_status and
                               last_checker_signal; use those.  Change
                               second value provided to D-Bus signal
                               CheckerCompleted to use
                               last_checker_signal if checker was
                               terminated by signal.
mandos-monitor: Update to reflect DBus API change.
(MandosClientWidget.checker_completed): Take "signal" instead of
                                        "condition" argument.  Use it
                                        accordingly.  Remove dead code
                                        (os.WCOREDUMP case).

Show diffs side-by-side

added added

removed removed

Lines of Context:
78
78
import tempfile
79
79
import itertools
80
80
import collections
81
 
import codecs
82
81
 
83
82
import dbus
84
83
import dbus.service
424
423
            .format(self.name)))
425
424
        return ret
426
425
 
427
 
def call_pipe(connection,       # : multiprocessing.Connection
428
 
              func, *args, **kwargs):
 
426
def subprocess_call_pipe(connection, # : multiprocessing.Connection
 
427
                         *args, **kwargs):
429
428
    """This function is meant to be called by multiprocessing.Process
430
429
    
431
 
    This function runs func(*args, **kwargs), and writes the resulting
432
 
    return value on the provided multiprocessing.Connection.
 
430
    This function runs a synchronous subprocess.call(), and writes the
 
431
    resulting return code on the provided multiprocessing.Connection.
433
432
    """
434
 
    connection.send(func(*args, **kwargs))
 
433
    connection.send(subprocess.call(*args, **kwargs))
435
434
    connection.close()
436
435
 
437
436
class Client(object):
646
645
        # Also start a new checker *right now*.
647
646
        self.start_checker()
648
647
    
649
 
    def checker_callback(self, source, condition, connection,
650
 
                         command):
 
648
    def checker_callback(self, source, condition,
 
649
                         (connection, command)):
651
650
        """The checker has completed, so take appropriate actions."""
652
651
        self.checker_callback_tag = None
653
652
        self.checker = None
654
 
        # Read return code from connection (see call_pipe)
 
653
        # Read return code from connection (see subprocess_call_pipe)
655
654
        returncode = connection.recv()
656
655
        connection.close()
657
656
        
741
740
                and self.server_settings["foreground"]):
742
741
                popen_args.update({"stdout": wnull,
743
742
                                   "stderr": wnull })
744
 
            pipe = multiprocessing.Pipe(duplex = False)
 
743
            pipe = multiprocessing.Pipe(duplex=False)
745
744
            self.checker = multiprocessing.Process(
746
 
                target = call_pipe,
747
 
                args = (pipe[1], subprocess.call, command),
748
 
                kwargs = popen_args)
 
745
                target=subprocess_call_pipe, args=(pipe[1], command),
 
746
                kwargs=popen_args)
749
747
            self.checker.start()
750
748
            self.checker_callback_tag = gobject.io_add_watch(
751
749
                pipe[0].fileno(), gobject.IO_IN,
752
 
                self.checker_callback, pipe[0], command)
 
750
                self.checker_callback, (pipe[0], command))
753
751
        # Re-run this periodically if run by gobject.timeout_add
754
752
        return True
755
753
    
1099
1097
                interface_names.add(alt_interface)
1100
1098
                # Is this a D-Bus signal?
1101
1099
                if getattr(attribute, "_dbus_is_signal", False):
1102
 
                    if sys.version_info.major == 2:
1103
 
                        # Extract the original non-method undecorated
1104
 
                        # function by black magic
1105
 
                        nonmethod_func = (dict(
1106
 
                            zip(attribute.func_code.co_freevars,
1107
 
                                attribute.__closure__))
1108
 
                                          ["func"].cell_contents)
1109
 
                    else:
1110
 
                        nonmethod_func = attribute
 
1100
                    # Extract the original non-method undecorated
 
1101
                    # function by black magic
 
1102
                    nonmethod_func = (dict(
 
1103
                        zip(attribute.func_code.co_freevars,
 
1104
                            attribute.__closure__))
 
1105
                                      ["func"].cell_contents)
1111
1106
                    # Create a new, but exactly alike, function
1112
1107
                    # object, and decorate it to be a new D-Bus signal
1113
1108
                    # with the alternate D-Bus interface name
1114
 
                    if sys.version_info.major == 2:
1115
 
                        new_function = types.FunctionType(
1116
 
                            nonmethod_func.func_code,
1117
 
                            nonmethod_func.func_globals,
1118
 
                            nonmethod_func.func_name,
1119
 
                            nonmethod_func.func_defaults,
1120
 
                            nonmethod_func.func_closure)
1121
 
                    else:
1122
 
                        new_function = types.FunctionType(
1123
 
                            nonmethod_func.__code__,
1124
 
                            nonmethod_func.__globals__,
1125
 
                            nonmethod_func.__name__,
1126
 
                            nonmethod_func.__defaults__,
1127
 
                            nonmethod_func.__closure__)
1128
1109
                    new_function = (dbus.service.signal(
1129
 
                        alt_interface,
1130
 
                        attribute._dbus_signature)(new_function))
 
1110
                        alt_interface, attribute._dbus_signature)
 
1111
                                    (types.FunctionType(
 
1112
                                        nonmethod_func.func_code,
 
1113
                                        nonmethod_func.func_globals,
 
1114
                                        nonmethod_func.func_name,
 
1115
                                        nonmethod_func.func_defaults,
 
1116
                                        nonmethod_func.func_closure)))
1131
1117
                    # Copy annotations, if any
1132
1118
                    try:
1133
1119
                        new_function._dbus_annotations = dict(
1357
1343
        Client.__del__(self, *args, **kwargs)
1358
1344
    
1359
1345
    def checker_callback(self, source, condition,
1360
 
                         connection, command, *args, **kwargs):
 
1346
                         (connection, command), *args, **kwargs):
1361
1347
        ret = Client.checker_callback(self, source, condition,
1362
 
                                      connection, command, *args,
 
1348
                                      (connection, command), *args,
1363
1349
                                      **kwargs)
1364
1350
        exitstatus = self.last_checker_status
1365
1351
        if exitstatus >= 0:
1672
1658
        self._pipe = child_pipe
1673
1659
        self._pipe.send(('init', fpr, address))
1674
1660
        if not self._pipe.recv():
1675
 
            raise KeyError(fpr)
 
1661
            raise KeyError()
1676
1662
    
1677
1663
    def __getattribute__(self, name):
1678
1664
        if name == '_pipe':
2141
2127
        
2142
2128
        if command == 'getattr':
2143
2129
            attrname = request[1]
2144
 
            if isinstance(client_object.__getattribute__(attrname),
2145
 
                          collections.Callable):
 
2130
            if callable(client_object.__getattribute__(attrname)):
2146
2131
                parent_pipe.send(('function', ))
2147
2132
            else:
2148
2133
                parent_pipe.send((
2183
2168
    # avoid excessive use of external libraries.
2184
2169
    
2185
2170
    # New type for defining tokens, syntax, and semantics all-in-one
 
2171
    Token = collections.namedtuple("Token",
 
2172
                                   ("regexp", # To match token; if
 
2173
                                              # "value" is not None,
 
2174
                                              # must have a "group"
 
2175
                                              # containing digits
 
2176
                                    "value",  # datetime.timedelta or
 
2177
                                              # None
 
2178
                                    "followers")) # Tokens valid after
 
2179
                                                  # this token
2186
2180
    Token = collections.namedtuple("Token", (
2187
2181
        "regexp",  # To match token; if "value" is not None, must have
2188
2182
                   # a "group" containing digits
2223
2217
    # Define starting values
2224
2218
    value = datetime.timedelta() # Value so far
2225
2219
    found_token = None
2226
 
    followers = frozenset((token_duration, )) # Following valid tokens
 
2220
    followers = frozenset((token_duration,)) # Following valid tokens
2227
2221
    s = duration                # String left to parse
2228
2222
    # Loop until end token is found
2229
2223
    while found_token is not token_end:
2246
2240
                break
2247
2241
        else:
2248
2242
            # No currently valid tokens were found
2249
 
            raise ValueError("Invalid RFC 3339 duration: {!r}"
2250
 
                             .format(duration))
 
2243
            raise ValueError("Invalid RFC 3339 duration")
2251
2244
    # End token found
2252
2245
    return value
2253
2246
 
2504
2497
            pidfilename = "/var/run/mandos.pid"
2505
2498
        pidfile = None
2506
2499
        try:
2507
 
            pidfile = codecs.open(pidfilename, "w", encoding="utf-8")
 
2500
            pidfile = open(pidfilename, "w")
2508
2501
        except IOError as e:
2509
2502
            logger.error("Could not open file %r", pidfilename,
2510
2503
                         exc_info=e)
2569
2562
            old_bus_name = dbus.service.BusName(
2570
2563
                "se.bsnet.fukt.Mandos", bus,
2571
2564
                do_not_queue=True)
2572
 
        except dbus.exceptions.DBusException as e:
 
2565
        except dbus.exceptions.NameExistsException as e:
2573
2566
            logger.error("Disabling D-Bus:", exc_info=e)
2574
2567
            use_dbus = False
2575
2568
            server_settings["use_dbus"] = False
2706
2699
    
2707
2700
    if not foreground:
2708
2701
        if pidfile is not None:
2709
 
            pid = os.getpid()
2710
2702
            try:
2711
2703
                with pidfile:
2712
 
                    print(pid, file=pidfile)
 
2704
                    pid = os.getpid()
 
2705
                    pidfile.write("{}\n".format(pid).encode("utf-8"))
2713
2706
            except IOError:
2714
2707
                logger.error("Could not write to file %r with PID %d",
2715
2708
                             pidfilename, pid)