676
def peer_certificate(session):
    """Return the peer's OpenPGP certificate as a bytestring.

    session -- session object whose ``_c_object`` is passed to the
               GnuTLS C functions.

    Returns ``session.peer_certificate`` unchanged when the session's
    certificate type is not OpenPGP, ``None`` when GnuTLS reports an
    empty peer certificate list, and otherwise the raw bytes of the
    first certificate in that list.

    Raises gnutls.errors.GNUTLSError when GnuTLS hands back a null
    list while still reporting a non-zero list size.
    """
    # If not an OpenPGP certificate...
    if (gnutls.library.functions
        .gnutls_certificate_type_get(session._c_object)
        != gnutls.library.constants.GNUTLS_CRT_OPENPGP):
        # ...do the normal thing
        return session.peer_certificate
    # list_size is filled in by GnuTLS with the number of certificates
    list_size = ctypes.c_uint(1)
    cert_list = (gnutls.library.functions
                 .gnutls_certificate_get_peers
                 (session._c_object, ctypes.byref(list_size)))
    # A null pointer together with a non-zero size is inconsistent
    if not bool(cert_list) and list_size.value != 0:
        # NOTE(review): the tail of this message was rebuilt -- confirm
        raise gnutls.errors.GNUTLSError("error getting peer"
                                        " certificate")
    if list_size.value == 0:
        # NOTE(review): body of this branch was rebuilt; returning
        # None for "no certificate" should be confirmed with callers
        return None
    # Only the first entry of the list is used
    cert = cert_list[0]
    return ctypes.string_at(cert.data, cert.size)
697
def fingerprint(openpgp):
    """Convert an OpenPGP bytestring to a hexdigit fingerprint string.

    openpgp -- OpenPGP public key data as a bytestring; it is imported
               with GNUTLS_OPENPGP_FMT_RAW.

    Returns a unicode string holding two upper-case hexadecimal digits
    for every byte of the fingerprint.

    Raises gnutls.errors.CertificateSecurityError when the key's
    self-signature check reports a non-zero status.
    """
    # New GnuTLS "datum" with the OpenPGP public key
    # NOTE(review): the cast target was rebuilt; POINTER(c_ubyte)
    # matches the C struct's "unsigned char *data" -- confirm against
    # the binding's gnutls_datum_t declaration
    datum = (gnutls.library.types
             .gnutls_datum_t(ctypes.cast(ctypes.c_char_p(openpgp),
                                         ctypes.POINTER
                                         (ctypes.c_ubyte)),
                             ctypes.c_uint(len(openpgp))))
    # New empty GnuTLS certificate
    crt = gnutls.library.types.gnutls_openpgp_crt_t()
    (gnutls.library.functions
     .gnutls_openpgp_crt_init(ctypes.byref(crt)))
    # From here on the certificate must be released on every path,
    # including when one of the calls below raises
    try:
        # Import the OpenPGP public key into the certificate
        (gnutls.library.functions
         .gnutls_openpgp_crt_import(crt, ctypes.byref(datum),
                                    gnutls.library.constants
                                    .GNUTLS_OPENPGP_FMT_RAW))
        # Verify the self signature in the key
        crtverify = ctypes.c_uint()
        (gnutls.library.functions
         .gnutls_openpgp_crt_verify_self(crt, 0,
                                         ctypes.byref(crtverify)))
        if crtverify.value != 0:
            raise (gnutls.errors.CertificateSecurityError
                   ("Verify failed"))
        # New buffer for the fingerprint
        buf = ctypes.create_string_buffer(20)
        buf_len = ctypes.c_size_t()
        # Get the fingerprint from the certificate into the buffer
        (gnutls.library.functions
         .gnutls_openpgp_crt_get_fingerprint(crt, ctypes.byref(buf),
                                             ctypes.byref(buf_len)))
    finally:
        # Deinit the certificate
        gnutls.library.functions.gnutls_openpgp_crt_deinit(crt)
    # Convert the buffer to a Python bytestring
    fpr = ctypes.string_at(buf, buf_len.value)
    # Convert the bytestring to hexadecimal notation
    hex_fpr = u''.join(u"%02X" % ord(char) for char in fpr)
    return hex_fpr
737
676
class TCP_handler(SocketServer.BaseRequestHandler, object):
738
677
"""A TCP request handler class.
739
678
Instantiated by IPv6_TCPServer for each request to handle it.
818
753
- (sent_size + sent))
819
754
sent_size += sent
758
def peer_certificate(session):
    """Return the peer's OpenPGP certificate as a bytestring.

    session -- session object whose ``_c_object`` is passed to the
               GnuTLS C functions.

    Returns ``session.peer_certificate`` unchanged when the session's
    certificate type is not OpenPGP, ``None`` when GnuTLS reports an
    empty peer certificate list, and otherwise the raw bytes of the
    first certificate in that list.

    Raises gnutls.errors.GNUTLSError when GnuTLS hands back a null
    list while still reporting a non-zero list size.
    """
    # If not an OpenPGP certificate...
    if (gnutls.library.functions
        .gnutls_certificate_type_get(session._c_object)
        != gnutls.library.constants.GNUTLS_CRT_OPENPGP):
        # ...do the normal thing
        return session.peer_certificate
    # list_size is filled in by GnuTLS with the number of certificates
    list_size = ctypes.c_uint(1)
    cert_list = (gnutls.library.functions
                 .gnutls_certificate_get_peers
                 (session._c_object, ctypes.byref(list_size)))
    # A null pointer together with a non-zero size is inconsistent
    if not bool(cert_list) and list_size.value != 0:
        # NOTE(review): the tail of this message was rebuilt -- confirm
        raise gnutls.errors.GNUTLSError("error getting peer"
                                        " certificate")
    if list_size.value == 0:
        # NOTE(review): body of this branch was rebuilt; returning
        # None for "no certificate" should be confirmed with callers
        return None
    # Only the first entry of the list is used
    cert = cert_list[0]
    return ctypes.string_at(cert.data, cert.size)
779
def fingerprint(openpgp):
    """Convert an OpenPGP bytestring to a hexdigit fingerprint.

    openpgp -- OpenPGP public key data as a bytestring; it is imported
               with GNUTLS_OPENPGP_FMT_RAW.

    Returns a unicode string holding two upper-case hexadecimal digits
    for every byte of the fingerprint.

    Raises gnutls.errors.CertificateSecurityError when the key's
    self-signature check reports a non-zero status.
    """
    # New GnuTLS "datum" with the OpenPGP public key
    # NOTE(review): the cast target was rebuilt; POINTER(c_ubyte)
    # matches the C struct's "unsigned char *data" -- confirm against
    # the binding's gnutls_datum_t declaration
    datum = (gnutls.library.types
             .gnutls_datum_t(ctypes.cast(ctypes.c_char_p(openpgp),
                                         ctypes.POINTER
                                         (ctypes.c_ubyte)),
                             ctypes.c_uint(len(openpgp))))
    # New empty GnuTLS certificate
    crt = gnutls.library.types.gnutls_openpgp_crt_t()
    (gnutls.library.functions
     .gnutls_openpgp_crt_init(ctypes.byref(crt)))
    # From here on the certificate must be released on every path,
    # including when one of the calls below raises
    try:
        # Import the OpenPGP public key into the certificate
        (gnutls.library.functions
         .gnutls_openpgp_crt_import(crt, ctypes.byref(datum),
                                    gnutls.library.constants
                                    .GNUTLS_OPENPGP_FMT_RAW))
        # Verify the self signature in the key
        crtverify = ctypes.c_uint()
        (gnutls.library.functions
         .gnutls_openpgp_crt_verify_self(crt, 0,
                                         ctypes.byref(crtverify)))
        if crtverify.value != 0:
            raise (gnutls.errors.CertificateSecurityError
                   ("Verify failed"))
        # New buffer for the fingerprint
        buf = ctypes.create_string_buffer(20)
        buf_len = ctypes.c_size_t()
        # Get the fingerprint from the certificate into the buffer
        (gnutls.library.functions
         .gnutls_openpgp_crt_get_fingerprint(crt, ctypes.byref(buf),
                                             ctypes.byref(buf_len)))
    finally:
        # Deinit the certificate
        gnutls.library.functions.gnutls_openpgp_crt_deinit(crt)
    # Convert the buffer to a Python bytestring
    fpr = ctypes.string_at(buf, buf_len.value)
    # Convert the bytestring to hexadecimal notation
    hex_fpr = u''.join(u"%02X" % ord(char) for char in fpr)
    return hex_fpr
823
821
class ForkingMixInWithPipe(SocketServer.ForkingMixIn, object):
939
937
# Stop calling this function
942
logger.debug("IPC command: %r\n" % cmdline)
940
logger.debug("IPC command: %r", cmdline)
944
942
# Parse and act on command
945
cmd, args = cmdline.split(None, 1)
943
cmd, args = cmdline.rstrip("\r\n").split(None, 1)
946
945
if cmd == "NOTFOUND":
946
logger.warning(u"Client not found for fingerprint: %s",
947
948
if self.settings["use_dbus"]:
948
949
# Emit D-Bus signal
949
950
mandos_dbus_service.ClientNotFound(args)
950
951
elif cmd == "INVALID":
951
if self.settings["use_dbus"]:
952
for client in self.clients:
953
if client.name == args:
952
for client in self.clients:
953
if client.name == args:
954
logger.warning(u"Client %s is invalid", args)
955
if self.settings["use_dbus"]:
954
956
# Emit D-Bus signal
955
957
client.Rejected()
960
logger.error(u"Unknown client %s is invalid", args)
957
961
elif cmd == "SENDING":
958
962
for client in self.clients:
959
963
if client.name == args:
964
logger.info(u"Sending secret to %s", client.name)
960
965
client.checked_ok()
961
966
if self.settings["use_dbus"]:
962
967
# Emit D-Bus signal
963
968
client.ReceivedSecret()
971
logger.error(u"Sending secret to unknown client %s",
966
974
logger.error("Unknown IPC command: %r", cmdline)