=== modified file 'server.py' --- server.py 2008-06-21 00:53:32 +0000 +++ server.py 2008-06-30 01:43:39 +0000 @@ -19,9 +19,50 @@ from sets import Set import subprocess +import dbus +import gobject +import avahi +from dbus.mainloop.glib import DBusGMainLoop + +# This variable is used to optionally bind to a specified +# interface. +serviceInterface = avahi.IF_UNSPEC +# It is a global variable to fit in with the rest of the +# variables from the Avahi server example code: +serviceName = "Mandos" +serviceType = "_mandos._tcp" # http://www.dns-sd.org/ServiceTypes.html +servicePort = None # Not known at startup +serviceTXT = [] # TXT record for the service +domain = "" # Domain to publish on, default to .local +host = "" # Host to publish records for, default to localhost +group = None #our entry group +rename_count = 12 # Counter so we only rename after collisions a + # sensible number of times +# End of Avahi example code + + class Client(object): - def __init__(self, name=None, options=None, dn=None, - password=None, passfile=None, fqdn=None, + """A representation of a client host served by this server. + Attributes: + password: string + fqdn: string, FQDN (used by the checker) + created: datetime.datetime() + last_seen: datetime.datetime() or None if not yet seen + timeout: datetime.timedelta(); How long from last_seen until + this client is invalid + interval: datetime.timedelta(); How often to start a new checker + timeout_milliseconds: Used by gobject.timeout_add() + interval_milliseconds: - '' - + stop_hook: If set, called by stop() as stop_hook(self) + checker: subprocess.Popen(); a running checker process used + to see if the client lives. + Is None if no process is running. + checker_initiator_tag: a gobject event source tag, or None + stop_initiator_tag: - '' - + checker_callback_tag: - '' - + """ + def __init__(self, name=None, options=None, stop_hook=None, + dn=None, password=None, passfile=None, fqdn=None, timeout=None, interval=-1): self.name = name self.dn = dn @@ -30,169 +71,231 @@ elif passfile: self.password = open(passfile).readall() else: - print "No Password or Passfile in client config file" - # raise RuntimeError XXX - self.password = "gazonk" + raise RuntimeError(u"No Password or Passfile for client %s" + % self.name) self.fqdn = fqdn # string self.created = datetime.datetime.now() - self.last_seen = None # datetime.datetime() + self.last_seen = None if timeout is None: timeout = options.timeout - self.timeout = timeout # datetime.timedelta() + self.timeout = timeout + self.timeout_milliseconds = ((self.timeout.days + * 24 * 60 * 60 * 1000) + + (self.timeout.seconds * 1000) + + (self.timeout.microseconds + // 1000)) if interval == -1: interval = options.interval else: interval = string_to_delta(interval) - self.interval = interval # datetime.timedelta() - self.next_check = datetime.datetime.now() # datetime.datetime() - # Note: next_check may be in the past if checker is not None - self.checker = None # or a subprocess.Popen() - def check_action(self): - """The checker said something and might have completed. - Check if is has, and take appropriate actions.""" - if self.checker.poll() is None: - # False alarm, no result yet - #self.checker.read() - #print "Checker for %(name)s said nothing?" % vars(self) - return + self.interval = interval + self.interval_milliseconds = ((self.interval.days + * 24 * 60 * 60 * 1000) + + (self.interval.seconds * 1000) + + (self.interval.microseconds + // 1000)) + self.stop_hook = stop_hook + self.checker = None + self.checker_initiator_tag = None + self.stop_initiator_tag = None + self.checker_callback_tag = None + def start(self): + """Start this clients checker and timeout hooks""" + # Schedule a new checker to be started an 'interval' from now, + # and every interval from then on. + self.checker_initiator_tag = gobject.\ + timeout_add(self.interval_milliseconds, + self.start_checker) + # Also start a new checker *right now*. + self.start_checker() + # Schedule a stop() when 'timeout' has passed + self.stop_initiator_tag = gobject.\ + timeout_add(self.timeout_milliseconds, + self.stop) + def stop(self): + """Stop this client. + The possibility that this client might be restarted is left + open, but not currently used.""" + # print "Stopping client", self.name + self.password = None + if self.stop_initiator_tag: + gobject.source_remove(self.stop_initiator_tag) + self.stop_initiator_tag = None + if self.checker_initiator_tag: + gobject.source_remove(self.checker_initiator_tag) + self.checker_initiator_tag = None + self.stop_checker() + if self.stop_hook: + self.stop_hook(self) + # Do not run this again if called by a gobject.timeout_add + return False + def __del__(self): + # Some code duplication here and in stop() + if hasattr(self, "stop_initiator_tag") \ + and self.stop_initiator_tag: + gobject.source_remove(self.stop_initiator_tag) + self.stop_initiator_tag = None + if hasattr(self, "checker_initiator_tag") \ + and self.checker_initiator_tag: + gobject.source_remove(self.checker_initiator_tag) + self.checker_initiator_tag = None + self.stop_checker() + def checker_callback(self, pid, condition): + """The checker has completed, so take appropriate actions.""" now = datetime.datetime.now() - if self.checker.returncode == 0: - print "Checker for %(name)s succeeded" % vars(self) + if os.WIFEXITED(condition) \ + and (os.WEXITSTATUS(condition) == 0): + #print "Checker for %(name)s succeeded" % vars(self) self.last_seen = now - else: - print "Checker for %(name)s failed" % vars(self) - while self.next_check <= now: - self.next_check += self.interval + gobject.source_remove(self.stop_initiator_tag) + self.stop_initiator_tag = gobject.\ + timeout_add(self.timeout_milliseconds, + self.stop) + #else: + # if not os.WIFEXITED(condition): + # print "Checker for %(name)s crashed?" % vars(self) + # else: + # print "Checker for %(name)s failed" % vars(self) self.checker = None - handle_request = check_action + self.checker_callback_tag = None def start_checker(self): - self.stop_checker() - try: - self.checker = subprocess.Popen("sleep 10; fping -q -- %s" - % re.escape(self.fqdn), - stdout=subprocess.PIPE, - close_fds=True, - shell=True, cwd="/") - except subprocess.OSError, e: - print "Failed to start subprocess:", e + """Start a new checker subprocess if one is not running. + If a checker already exists, leave it running and do + nothing.""" + if self.checker is None: + #print "Starting checker for", self.name + try: + self.checker = subprocess.\ + Popen("sleep 1; fping -q -- %s" + % re.escape(self.fqdn), + stdout=subprocess.PIPE, + close_fds=True, shell=True, + cwd="/") + self.checker_callback_tag = gobject.\ + child_watch_add(self.checker.pid, + self.\ + checker_callback) + except subprocess.OSError, error: + sys.stderr.write(u"Failed to start subprocess: %s\n" + % error) + # Re-run this periodically if run by gobject.timeout_add + return True def stop_checker(self): - if self.checker is None: + """Force the checker process, if any, to stop.""" + if not hasattr(self, "checker") or self.checker is None: return + gobject.source_remove(self.checker_callback_tag) + self.checker_callback_tag = None os.kill(self.checker.pid, signal.SIGTERM) if self.checker.poll() is None: os.kill(self.checker.pid, signal.SIGKILL) self.checker = None - __del__ = stop_checker - def fileno(self): - if self.checker is None: - return None - return self.checker.stdout.fileno() - def next_stop(self): - """The time when something must be done about this client - May be in the past.""" - if self.last_seen is None: - # This client has never been seen - next_timeout = self.created + self.timeout - else: - next_timeout = self.last_seen + self.timeout - if self.checker is None: - return min(next_timeout, self.next_check) - else: - return next_timeout def still_valid(self, now=None): - """Has this client's timeout not passed?""" + """Has the timeout not yet passed for this client?""" if now is None: now = datetime.datetime.now() if self.last_seen is None: return now < (self.created + self.timeout) else: return now < (self.last_seen + self.timeout) - def it_is_time_to_check(self, now=None): - if now is None: - now = datetime.datetime.now() - return self.next_check <= now - - -class server_metaclass(type): - "Common behavior for the UDP and TCP server classes" - def __new__(cls, name, bases, attrs): - attrs["address_family"] = socket.AF_INET6 - attrs["allow_reuse_address"] = True - def server_bind(self): - if self.options.interface: - if not hasattr(socket, "SO_BINDTODEVICE"): - # From /usr/include/asm-i486/socket.h - socket.SO_BINDTODEVICE = 25 - try: - self.socket.setsockopt(socket.SOL_SOCKET, - socket.SO_BINDTODEVICE, - self.options.interface) - except socket.error, error: - if error[0] == errno.EPERM: - print "Warning: No permission to bind to interface", \ - self.options.interface - else: - raise error - return super(type(self), self).server_bind() - attrs["server_bind"] = server_bind - def init(self, *args, **kwargs): - if "options" in kwargs: - self.options = kwargs["options"] - del kwargs["options"] - if "clients" in kwargs: - self.clients = kwargs["clients"] - del kwargs["clients"] - if "credentials" in kwargs: - self.credentials = kwargs["credentials"] - del kwargs["credentials"] - return super(type(self), self).__init__(*args, **kwargs) - attrs["__init__"] = init - return type.__new__(cls, name, bases, attrs) - - -class udp_handler(SocketServer.DatagramRequestHandler, object): - def handle(self): - self.wfile.write("Polo") - print "UDP request answered" - - -class IPv6_UDPServer(SocketServer.UDPServer, object): - __metaclass__ = server_metaclass - def verify_request(self, request, client_address): - print "UDP request came" - return request[0] == "Marco" class tcp_handler(SocketServer.BaseRequestHandler, object): + """A TCP request handler class. + Instantiated by IPv6_TCPServer for each request to handle it. + Note: This will run in its own forked process.""" def handle(self): - print "TCP request came" - print "Request:", self.request - print "Client Address:", self.client_address - print "Server:", self.server + #print u"TCP request came" + #print u"Request:", self.request + #print u"Client Address:", self.client_address + #print u"Server:", self.server session = gnutls.connection.ServerSession(self.request, - self.server.credentials) - session.handshake() - if session.peer_certificate: - print "DN:", session.peer_certificate.subject + self.server\ + .credentials) + try: + session.handshake() + except gnutls.errors.GNUTLSError, error: + #sys.stderr.write(u"Handshake failed: %s\n" % error) + # Do not run session.bye() here: the session is not + # established. Just abandon the request. + return + #if session.peer_certificate: + # print "DN:", session.peer_certificate.subject try: session.verify_peer() except gnutls.errors.CertificateError, error: - print "Verify failed", error + #sys.stderr.write(u"Verify failed: %s\n" % error) session.bye() return - try: - session.send([client.password - for client in self.server.clients - if (client.dn == - session.peer_certificate.subject)][0]) - except IndexError: - session.send("gazonk") - # Log maybe? XXX + client = None + for c in clients: + if c.dn == session.peer_certificate.subject: + client = c + break + # Have to check if client.still_valid(), since it is possible + # that the client timed out while establishing the GnuTLS + # session. + if client and client.still_valid(): + session.send(client.password) + else: + #if client: + # sys.stderr.write(u"Client %(name)s is invalid\n" + # % vars(client)) + #else: + # sys.stderr.write(u"Client not found for DN: %s\n" + # % session.peer_certificate.subject) + #session.send("gazonk") + pass session.bye() class IPv6_TCPServer(SocketServer.ForkingTCPServer, object): - __metaclass__ = server_metaclass + """IPv6 TCP server. Accepts 'None' as address and/or port. + Attributes: + options: Command line options + clients: Set() of Client objects + credentials: GnuTLS X.509 credentials + """ + address_family = socket.AF_INET6 + def __init__(self, *args, **kwargs): + if "options" in kwargs: + self.options = kwargs["options"] + del kwargs["options"] + if "clients" in kwargs: + self.clients = kwargs["clients"] + del kwargs["clients"] + if "credentials" in kwargs: + self.credentials = kwargs["credentials"] + del kwargs["credentials"] + return super(type(self), self).__init__(*args, **kwargs) + def server_bind(self): + """This overrides the normal server_bind() function + to bind to an interface if one was specified, and also NOT to + bind to an address or port if they were not specified.""" + if self.options.interface: + if not hasattr(socket, "SO_BINDTODEVICE"): + # From /usr/include/asm-i486/socket.h + socket.SO_BINDTODEVICE = 25 + try: + self.socket.setsockopt(socket.SOL_SOCKET, + socket.SO_BINDTODEVICE, + self.options.interface) + except socket.error, error: + if error[0] == errno.EPERM: + sys.stderr.write(u"Warning: No permission to bind to interface %s\n" + % self.options.interface) + else: + raise error + # Only bind(2) the socket if we really need to. + if self.server_address[0] or self.server_address[1]: + if not self.server_address[0]: + in6addr_any = "::" + self.server_address = (in6addr_any, + self.server_address[1]) + elif self.server_address[1] is None: + self.server_address = (self.server_address[0], + 0) + return super(type(self), self).server_bind() def string_to_delta(interval): @@ -229,29 +332,118 @@ return delta -def main(): +def add_service(): + """From the Avahi server example code""" + global group, serviceName, serviceType, servicePort, serviceTXT, \ + domain, host + if group is None: + group = dbus.Interface( + bus.get_object( avahi.DBUS_NAME, + server.EntryGroupNew()), + avahi.DBUS_INTERFACE_ENTRY_GROUP) + group.connect_to_signal('StateChanged', + entry_group_state_changed) + + # print "Adding service '%s' of type '%s' ..." % (serviceName, + # serviceType) + + group.AddService( + serviceInterface, # interface + avahi.PROTO_INET6, # protocol + dbus.UInt32(0), # flags + serviceName, serviceType, + domain, host, + dbus.UInt16(servicePort), + avahi.string_array_to_txt_array(serviceTXT)) + group.Commit() + + +def remove_service(): + """From the Avahi server example code""" + global group + + if not group is None: + group.Reset() + + +def server_state_changed(state): + """From the Avahi server example code""" + if state == avahi.SERVER_COLLISION: + print "WARNING: Server name collision" + remove_service() + elif state == avahi.SERVER_RUNNING: + add_service() + + +def entry_group_state_changed(state, error): + """From the Avahi server example code""" + global serviceName, server, rename_count + + # print "state change: %i" % state + + if state == avahi.ENTRY_GROUP_ESTABLISHED: + pass + # print "Service established." + elif state == avahi.ENTRY_GROUP_COLLISION: + + rename_count = rename_count - 1 + if rename_count > 0: + name = server.GetAlternativeServiceName(name) + print "WARNING: Service name collision, changing name to '%s' ..." % name + remove_service() + add_service() + + else: + print "ERROR: No suitable service name found after %i retries, exiting." % n_rename + main_loop.quit() + elif state == avahi.ENTRY_GROUP_FAILURE: + print "Error in group state changed", error + main_loop.quit() + return + + +def if_nametoindex(interface): + """Call the C function if_nametoindex()""" + try: + if "ctypes" not in sys.modules: + import ctypes + libc = ctypes.cdll.LoadLibrary("libc.so.6") + return libc.if_nametoindex(interface) + except (ImportError, OSError, AttributeError): + if "struct" not in sys.modules: + import struct + if "fcntl" not in sys.modules: + import fcntl + SIOCGIFINDEX = 0x8933 # From /usr/include/linux/sockios.h + s = socket.socket() + ifreq = fcntl.ioctl(s, SIOCGIFINDEX, + struct.pack("16s16x", interface)) + s.close() + interface_index = struct.unpack("I", ifreq[16:20])[0] + return interface_index + + +if __name__ == '__main__': parser = OptionParser() parser.add_option("-i", "--interface", type="string", - default="eth0", metavar="IF", - help="Interface to bind to") + default=None, metavar="IF", + help="Bind to interface IF") parser.add_option("--cert", type="string", default="cert.pem", metavar="FILE", - help="Public key certificate to use") + help="Public key certificate PEM file to use") parser.add_option("--key", type="string", default="key.pem", metavar="FILE", - help="Private key to use") + help="Private key PEM file to use") parser.add_option("--ca", type="string", default="ca.pem", metavar="FILE", - help="Certificate Authority certificate to use") + help="Certificate Authority certificate PEM file to use") parser.add_option("--crl", type="string", default="crl.pem", metavar="FILE", - help="Certificate Revokation List to use") - parser.add_option("-p", "--port", type="int", default=49001, + help="Certificate Revokation List PEM file to use") + parser.add_option("-p", "--port", type="int", default=None, help="Port number to receive requests on") - parser.add_option("--dh", type="int", metavar="BITS", - help="DH group to use") - parser.add_option("-t", "--timeout", type="string", # Parsed later - default="15m", + parser.add_option("--timeout", type="string", # Parsed later + default="1h", help="Amount of downtime allowed for clients") parser.add_option("--interval", type="string", # Parsed later default="5m", @@ -259,7 +451,7 @@ parser.add_option("--check", action="store_true", default=False, help="Run self-test") (options, args) = parser.parse_args() - + if options.check: import doctest doctest.testmod() @@ -270,7 +462,6 @@ options.timeout = string_to_delta(options.timeout) except ValueError: parser.error("option --timeout: Unparseable time") - try: options.interval = string_to_delta(options.interval) except ValueError: @@ -284,72 +475,67 @@ # Parse config file defaults = {} - client_config_object = ConfigParser.SafeConfigParser(defaults) - client_config_object.read("mandos-clients.conf") - clients = Set(Client(name=section, options=options, - **(dict(client_config_object\ - .items(section)))) - for section in client_config_object.sections()) - - in6addr_any = "::" - udp_server = IPv6_UDPServer((in6addr_any, options.port), - udp_handler, - options=options) - - tcp_server = IPv6_TCPServer((in6addr_any, options.port), + client_config = ConfigParser.SafeConfigParser(defaults) + #client_config.readfp(open("secrets.conf"), "secrets.conf") + client_config.read("mandos-clients.conf") + + # From the Avahi server example code + DBusGMainLoop(set_as_default=True ) + main_loop = gobject.MainLoop() + bus = dbus.SystemBus() + server = dbus.Interface( + bus.get_object( avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER ), + avahi.DBUS_INTERFACE_SERVER ) + # End of Avahi example code + + clients = Set() + def remove_from_clients(client): + clients.remove(client) + if not clients: + print "No clients left, exiting" + main_loop.quit() + + clients.update(Set(Client(name=section, options=options, + stop_hook = remove_from_clients, + **(dict(client_config\ + .items(section)))) + for section in client_config.sections())) + for client in clients: + client.start() + + tcp_server = IPv6_TCPServer((None, options.port), tcp_handler, options=options, clients=clients, credentials=cred) - - while True: - if not clients: - break - try: - next_stop = min(client.next_stop() for client in clients) - now = datetime.datetime.now() - if next_stop > now: - delay = next_stop - now - delay_seconds = (delay.days * 24 * 60 * 60 - + delay.seconds - + delay.microseconds / 1000000) - clients_with_checkers = tuple(client for client in - clients - if client.checker - is not None) - input_checks = (udp_server, tcp_server) \ - + clients_with_checkers - print "Waiting for network", - if clients_with_checkers: - print "and checkers for:", - for client in clients_with_checkers: - print client.name, - print - input, out, err = select.select(input_checks, (), (), - delay_seconds) - for obj in input: - obj.handle_request() - # start new checkers - for client in clients: - if client.it_is_time_to_check(now=now) and \ - client.checker is None: - print "Starting checker for client %(name)s" \ - % vars(client) - client.start_checker() - # delete timed-out clients - for client in clients.copy(): - if not client.still_valid(now=now): - # log xxx - print "Removing client %(name)s" % vars(client) - clients.remove(client) - except KeyboardInterrupt: - break + # Find out what random port we got + servicePort = tcp_server.socket.getsockname()[1] + #sys.stderr.write("Now listening on port %d\n" % servicePort) + + if options.interface is not None: + serviceInterface = if_nametoindex(options.interface) + + # From the Avahi server example code + server.connect_to_signal("StateChanged", server_state_changed) + server_state_changed(server.GetState()) + # End of Avahi example code + + gobject.io_add_watch(tcp_server.fileno(), gobject.IO_IN, + lambda *args, **kwargs: + tcp_server.handle_request(*args[2:], + **kwargs) or True) + try: + main_loop.run() + except KeyboardInterrupt: + print # Cleanup here + + # From the Avahi server example code + if not group is None: + group.Free() + # End of Avahi example code + for client in clients: - client.stop_checker() - - -if __name__ == "__main__": - main() - + client.stop_hook = None + client.stop()