=== modified file 'TODO' --- TODO 2009-03-31 01:32:12 +0000 +++ TODO 2009-04-03 03:36:08 +0000 @@ -24,9 +24,6 @@ [[info:standards:Option%20Table][Table of Long Options]] ** TODO Date+time on console log messages :BUGS: Is this the default? -** DBusClient inheriting from Client -** fingerprint as a member of TCP_handler -** peer_certificate as a member of TCP_handler ** TCP_handler needs a better name! ** move handle_ipc out of IPv6_TCPServer ** DBusServiceObjectUsingSuper === modified file 'mandos' --- mandos 2009-04-01 03:37:45 +0000 +++ mandos 2009-04-03 03:36:08 +0000 @@ -673,67 +673,6 @@ del _interface -def peer_certificate(session): - "Return the peer's OpenPGP certificate as a bytestring" - # If not an OpenPGP certificate... - if (gnutls.library.functions - .gnutls_certificate_type_get(session._c_object) - != gnutls.library.constants.GNUTLS_CRT_OPENPGP): - # ...do the normal thing - return session.peer_certificate - list_size = ctypes.c_uint(1) - cert_list = (gnutls.library.functions - .gnutls_certificate_get_peers - (session._c_object, ctypes.byref(list_size))) - if not bool(cert_list) and list_size.value != 0: - raise gnutls.errors.GNUTLSError("error getting peer" - " certificate") - if list_size.value == 0: - return None - cert = cert_list[0] - return ctypes.string_at(cert.data, cert.size) - - -def fingerprint(openpgp): - "Convert an OpenPGP bytestring to a hexdigit fingerprint string" - # New GnuTLS "datum" with the OpenPGP public key - datum = (gnutls.library.types - .gnutls_datum_t(ctypes.cast(ctypes.c_char_p(openpgp), - ctypes.POINTER - (ctypes.c_ubyte)), - ctypes.c_uint(len(openpgp)))) - # New empty GnuTLS certificate - crt = gnutls.library.types.gnutls_openpgp_crt_t() - (gnutls.library.functions - .gnutls_openpgp_crt_init(ctypes.byref(crt))) - # Import the OpenPGP public key into the certificate - (gnutls.library.functions - .gnutls_openpgp_crt_import(crt, ctypes.byref(datum), - gnutls.library.constants - .GNUTLS_OPENPGP_FMT_RAW)) - # Verify the self signature in the key - crtverify = ctypes.c_uint() - (gnutls.library.functions - .gnutls_openpgp_crt_verify_self(crt, 0, ctypes.byref(crtverify))) - if crtverify.value != 0: - gnutls.library.functions.gnutls_openpgp_crt_deinit(crt) - raise gnutls.errors.CertificateSecurityError("Verify failed") - # New buffer for the fingerprint - buf = ctypes.create_string_buffer(20) - buf_len = ctypes.c_size_t() - # Get the fingerprint from the certificate into the buffer - (gnutls.library.functions - .gnutls_openpgp_crt_get_fingerprint(crt, ctypes.byref(buf), - ctypes.byref(buf_len))) - # Deinit the certificate - gnutls.library.functions.gnutls_openpgp_crt_deinit(crt) - # Convert the buffer to a Python bytestring - fpr = ctypes.string_at(buf, buf_len.value) - # Convert the bytestring to hexadecimal notation - hex_fpr = u''.join(u"%02X" % ord(char) for char in fpr) - return hex_fpr - - class TCP_handler(SocketServer.BaseRequestHandler, object): """A TCP request handler class. Instantiated by IPv6_TCPServer for each request to handle it. @@ -783,7 +722,7 @@ return logger.debug(u"Handshake succeeded") try: - fpr = fingerprint(peer_certificate(session)) + fpr = self.fingerprint(self.peer_certificate(session)) except (TypeError, gnutls.errors.GNUTLSError), error: logger.warning(u"Bad certificate: %s", error) session.bye() @@ -795,8 +734,6 @@ client = c break else: - logger.warning(u"Client not found for fingerprint: %s", - fpr) ipc.write("NOTFOUND %s\n" % fpr) session.bye() return @@ -804,8 +741,6 @@ # possible that the client timed out while establishing # the GnuTLS session. if not client.still_valid(): - logger.warning(u"Client %(name)s is invalid", - vars(client)) ipc.write("INVALID %s\n" % client.name) session.bye() return @@ -818,6 +753,69 @@ - (sent_size + sent)) sent_size += sent session.bye() + + @staticmethod + def peer_certificate(session): + "Return the peer's OpenPGP certificate as a bytestring" + # If not an OpenPGP certificate... + if (gnutls.library.functions + .gnutls_certificate_type_get(session._c_object) + != gnutls.library.constants.GNUTLS_CRT_OPENPGP): + # ...do the normal thing + return session.peer_certificate + list_size = ctypes.c_uint(1) + cert_list = (gnutls.library.functions + .gnutls_certificate_get_peers + (session._c_object, ctypes.byref(list_size))) + if not bool(cert_list) and list_size.value != 0: + raise gnutls.errors.GNUTLSError("error getting peer" + " certificate") + if list_size.value == 0: + return None + cert = cert_list[0] + return ctypes.string_at(cert.data, cert.size) + + @staticmethod + def fingerprint(openpgp): + "Convert an OpenPGP bytestring to a hexdigit fingerprint" + # New GnuTLS "datum" with the OpenPGP public key + datum = (gnutls.library.types + .gnutls_datum_t(ctypes.cast(ctypes.c_char_p(openpgp), + ctypes.POINTER + (ctypes.c_ubyte)), + ctypes.c_uint(len(openpgp)))) + # New empty GnuTLS certificate + crt = gnutls.library.types.gnutls_openpgp_crt_t() + (gnutls.library.functions + .gnutls_openpgp_crt_init(ctypes.byref(crt))) + # Import the OpenPGP public key into the certificate + (gnutls.library.functions + .gnutls_openpgp_crt_import(crt, ctypes.byref(datum), + gnutls.library.constants + .GNUTLS_OPENPGP_FMT_RAW)) + # Verify the self signature in the key + crtverify = ctypes.c_uint() + (gnutls.library.functions + .gnutls_openpgp_crt_verify_self(crt, 0, + ctypes.byref(crtverify))) + if crtverify.value != 0: + gnutls.library.functions.gnutls_openpgp_crt_deinit(crt) + raise (gnutls.errors.CertificateSecurityError + ("Verify failed")) + # New buffer for the fingerprint + buf = ctypes.create_string_buffer(20) + buf_len = ctypes.c_size_t() + # Get the fingerprint from the certificate into the buffer + (gnutls.library.functions + .gnutls_openpgp_crt_get_fingerprint(crt, ctypes.byref(buf), + ctypes.byref(buf_len))) + # Deinit the certificate + gnutls.library.functions.gnutls_openpgp_crt_deinit(crt) + # Convert the buffer to a Python bytestring + fpr = ctypes.string_at(buf, buf_len.value) + # Convert the bytestring to hexadecimal notation + hex_fpr = u''.join(u"%02X" % ord(char) for char in fpr) + return hex_fpr class ForkingMixInWithPipe(SocketServer.ForkingMixIn, object): @@ -939,29 +937,39 @@ # Stop calling this function return False - logger.debug("IPC command: %r\n" % cmdline) + logger.debug("IPC command: %r", cmdline) # Parse and act on command - cmd, args = cmdline.split(None, 1) + cmd, args = cmdline.rstrip("\r\n").split(None, 1) + if cmd == "NOTFOUND": + logger.warning(u"Client not found for fingerprint: %s", + args) if self.settings["use_dbus"]: # Emit D-Bus signal mandos_dbus_service.ClientNotFound(args) elif cmd == "INVALID": - if self.settings["use_dbus"]: - for client in self.clients: - if client.name == args: + for client in self.clients: + if client.name == args: + logger.warning(u"Client %s is invalid", args) + if self.settings["use_dbus"]: # Emit D-Bus signal client.Rejected() - break + break + else: + logger.error(u"Unknown client %s is invalid", args) elif cmd == "SENDING": for client in self.clients: if client.name == args: + logger.info(u"Sending secret to %s", client.name) client.checked_ok() if self.settings["use_dbus"]: # Emit D-Bus signal client.ReceivedSecret() break + else: + logger.error(u"Sending secret to unknown client %s", + args) else: logger.error("Unknown IPC command: %r", cmdline)