[omega] Don't check the fingerprint of the SSL certificats with a fixed list, but use OpenSSL to check if the certificat is valid.

This commit is contained in:
Yann Leboulanger 2007-06-03 14:15:50 +00:00
parent 583c3e3181
commit ecd995fda3
3 changed files with 2531 additions and 111 deletions

2474
data/other/cacerts.pem Normal file

File diff suppressed because it is too large Load Diff

View File

@ -48,6 +48,40 @@ log = logging.getLogger('gajim.c.connection')
import gtkgui_helpers
ssl_error = {
2: "Unable to get issuer certificate",
3: "Unable to get certificate CRL",
4: "Unable to decrypt certificate's signature",
5: "Unable to decrypt CRL's signature",
6: "Unable to decode issuer public key",
7: "Certificate signature failure",
8: "CRL signature failure",
9: "Certificate is not yet valid",
10: "Certificate has expired",
11: "CRL is not yet valid",
12: "CRL has expired",
13: "Format error in certificate's notBefore field",
14: "Format error in certificate's notAfter field",
15: "Format error in CRL's lastUpdate field",
16: "Format error in CRL's nextUpdate field",
17: "Out of memory",
18: "Self signed certificate in certificate chain",
19: "Unable to get local issuer certificate",
20: "Unable to verify the first certificate",
21: "Unable to verify the first certificate",
22: "Certificate chain too long",
23: "Certificate revoked",
24: "Invalid CA certificate",
25: "Path length constraint exceeded",
26: "Unsupported certificate purpose",
27: "Certificate not trusted",
28: "Certificate rejected",
29: "Subject issuer mismatch",
30: "Authority and subject key identifier mismatch",
31: "Authority and issuer serial number mismatch",
32: "Key usage does not include certificate signing",
50: "Application verification failure"
}
class Connection(ConnectionHandlers):
'''Connection class'''
def __init__(self, name):
@ -440,72 +474,20 @@ class Connection(ConnectionHandlers):
name = gajim.config.get_per('accounts', self.name, 'name')
hostname = gajim.config.get_per('accounts', self.name, 'hostname')
self.connection = con
fpr_good = self._check_fingerprint(con, con_type)
if fpr_good == False:
self.disconnect(on_purpose = True)
self.dispatch('STATUS', 'offline')
self.dispatch('CONNECTION_LOST',
(_('Security error connecting to "%s"') % self._hostname,
_("The server's key has changed, or someone is trying to hack your connection.")))
if self.on_connect_auth:
self.on_connect_auth(None)
self.on_connect_auth = None
return
if fpr_good == None:
log.warning(_("Unable to check fingerprint for %s. Connection could be insecure."), hostname)
if fpr_good == True:
log.info("Fingerprint found and matched for %s.", hostname)
try:
errnum = con.Connection.ssl_errnum
except AttributeError:
errnum = -1 # we don't have an errnum
if errnum > 0:
# FIXME: tell the user that the certificat is untrusted, and ask him what to do
try:
log.warning("The authenticity of the "+hostname+" certificate could be unvalid.\nSSL Error: "+ssl_error[errnum])
except KeyError:
log.warning("Unknown SSL error: %d" % errnum)
con.auth(name, self.password, self.server_resource, 1, self.__on_auth)
return True
def _check_fingerprint(self, con, con_type):
fpr_good = None # None: Unable to check fpr, False: mismatch, True: match
# FIXME: not tidy
if not common.xmpp.transports_nb.USE_PYOPENSSL: return None
# FIXME: find a more permanent place for loading servers.xml
servers_xml = os.path.join(gajim.DATA_DIR, 'other', 'servers.xml')
servers = gtkgui_helpers.parse_server_xml(servers_xml)
servers = dict(map(lambda e: (e[0], e), servers))
hostname = gajim.config.get_per('accounts', self.name, 'hostname')
try:
log.debug("con: %s", con)
log.debug("con.Connection: %s", con.Connection)
log.debug("con.Connection.serverDigestSHA1: %s", con.Connection.serverDigestSHA1)
log.debug("con.Connection.serverDigestMD5: %s", con.Connection.serverDigestMD5)
sha1 = gtkgui_helpers.HashDigest('sha1', con.Connection.serverDigestSHA1)
md5 = gtkgui_helpers.HashDigest('md5', con.Connection.serverDigestMD5)
log.debug("sha1: %s", repr(sha1))
log.debug("md5: %s", repr(md5))
sv = servers.get(hostname)
if sv:
for got in (sha1, md5):
expected = sv[2]['digest'].get(got.algo)
if expected:
fpr_good = (got == expected)
break
except AttributeError:
if con_type in ('ssl', 'tls'):
log.error(_("Missing fingerprint in SSL connection to %s") + ':', hostname, exc_info=True)
# fpr_good = False # FIXME: enable this when sequence is sorted
else:
log.debug("Connection to %s doesn't seem to have a fingerprint:", hostname, exc_info=True)
if fpr_good == False:
log.error(_("Fingerprint mismatch for %s: Got %s, expected %s"), hostname, got, expected)
return fpr_good
def _register_handlers(self, con, con_type):
self.peerhost = con.get_peerhost()
# notify the gui about con_type

View File

@ -33,6 +33,8 @@ import thread
import logging
log = logging.getLogger('gajim.c.x.transports_nb')
from common import gajim
USE_PYOPENSSL = False
try:
@ -735,7 +737,12 @@ class NonBlockingTLS(PlugIn):
# FIXME: should method be configurable?
tcpsock._sslContext = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
#tcpsock._sslContext = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
tcpsock._sslContext.set_info_callback(self._ssl_info_callback)
tcpsock.ssl_errnum = 0
tcpsock._sslContext.set_verify(OpenSSL.SSL.VERIFY_PEER, self._ssl_verify_callback)
try:
tcpsock._sslContext.load_verify_locations(os.path.join(gajim.DATA_DIR, 'other', 'cacerts.pem'))
except:
log.warning(_("Unable to load SSL certificats from file %s" % os.path.abspath(os.path.join(gajim.DATA_DIR,'other','ca.crt'))))
tcpsock._sslObj = OpenSSL.SSL.Connection(tcpsock._sslContext, tcpsock._sock)
tcpsock._sslObj.set_connect_state() # set to client mode
@ -759,29 +766,6 @@ class NonBlockingTLS(PlugIn):
# fake it, for now
self.starttls='success'
def _on_ssl_handshake_done(self):
log.debug("Handshake done!")
#self.starttls='success'
tcpsock = self._owner.Connection
cert = tcpsock._sslObj.get_peer_certificate()
peer = cert.get_subject()
issuer = cert.get_issuer()
tcpsock._sslIssuer = unicode(issuer)
tcpsock._sslServer = unicode(peer)
tcpsock.serverDigestSHA1 = cert.digest('sha1')
tcpsock.serverDigestMD5 = cert.digest('md5')
if log.getEffectiveLevel() <= logging.DEBUG:
peercert = tcpsock._sslObj.get_peer_certificate()
ciphers = tcpsock._sslObj.get_cipher_list()
print >> sys.stderr, "Ciphers:", ciphers
print >> sys.stderr, "Peer cert:", peercert
self._dumpX509(peercert)
print >> sys.stderr, OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, peercert)
def _startSSL_stdlib(self):
log.debug("_startSSL_stdlib called")
tcpsock=self._owner.Connection
@ -795,37 +779,17 @@ class NonBlockingTLS(PlugIn):
tcpsock._send = wrapper.send
self.starttls='success'
def _ssl_info_callback(self, sslconn, type, st):
def _ssl_verify_callback(self, sslconn, cert, errnum, depth, ok):
# Exceptions can't propagate up through this callback, so print them here.
try:
self._ssl_info_callback_guarded(sslconn, type, st)
if errnum == 0:
return True
self._owner.Connection.ssl_errnum = errnum
return True
except:
log.error("Exception caught in _ssl_info_callback:", exc_info=True)
traceback.print_exc() # Make sure something is printed, even if log is disabled.
def _ssl_info_callback_guarded(self, sslconn, type, st):
b = self.ssl_h_bits
#if type & b['SSL_CB_LOOP']:
# if type & SSL_ST_CONNECT: tls_state = "connect"
# elif type & SSL_ST_ACCEPT: tls_state = "accept"
# else: tls_state = "undefined"
# print "tls_state: %s: %s" % (tls_state, sslconn.state_string())
#if type & b['SSL_CB_ALERT']:
# if type & SSL_CB_READ: rdwr = "read"
# elif type & SSL_CB_WRITE: rdwr = "write"
# else: rdwr = "unknown"
# print "tls_alert: %s:%d: %s" % (rdwr, st, sslconn.state_string())
#mask = ""
#for k, v in b.iteritems():
# if type & v: mask += " " + k
#print "mask:", mask, st
if type & b['SSL_CB_HANDSHAKE_DONE']:
self._on_ssl_handshake_done()
def StartTLSHandler(self, conn, starttls):
''' Handle server reply if TLS is allowed to process. Behaves accordingly.
Used internally.'''