Refactor SSL errors

Dont save fingerprints to config and check certs with our own methods.
We should trust openssl to do all necessary checks.

Self signed certs can be added to cacert.pem and will not show up as
an error until the cert changes.

nbxmpp now saves all ssl errors and passes them, so now we loop over
all errors until all are confirmed or ignored by the user

Also cacerts are now saved in utf-8
This commit is contained in:
Philipp Hörist 2018-05-01 14:03:20 +02:00
parent 779a4d4ce3
commit c534d3a147
5 changed files with 67 additions and 168 deletions

View File

@ -347,8 +347,6 @@ class Config:
'action_when_plaintext_connection': [ opt_str, 'warn', _('Show a warning dialog before sending password on an plaintext connection. Can be \'warn\', \'connect\', \'disconnect\'') ],
'warn_when_insecure_ssl_connection': [ opt_bool, True, _('Show a warning dialog before using standard SSL library.') ],
'warn_when_insecure_password': [ opt_bool, True, _('Show a warning dialog before sending PLAIN password over a plain connection.') ],
'ssl_fingerprint_sha1': [ opt_str, '', '', True ],
'ssl_fingerprint_sha256': [ opt_str, '', '', True ],
'ignore_ssl_errors': [ opt_str, '', _('Space separated list of ssl errors to ignore.') ],
'use_srv': [ opt_bool, True, '', True ],
'use_custom_host': [ opt_bool, False, '', True ],

View File

@ -1355,82 +1355,64 @@ class Connection(CommonConnection, ConnectionHandlers):
log.debug('Connected to server %s:%s with %s' % (
self._current_host['host'], self._current_host['port'], con_type))
if app.config.get_per('accounts', self.name, 'anonymous_auth'):
name = None
else:
name = app.config.get_per('accounts', self.name, 'name')
hostname = app.config.get_per('accounts', self.name, 'hostname')
self.connection = con
try:
errnum = con.Connection.ssl_errnum
except AttributeError:
errnum = 0
cert = con.Connection.ssl_certificate
if errnum > 0 and str(errnum) not in app.config.get_per('accounts',
self.name, 'ignore_ssl_errors').split():
text = _('The authenticity of the %s certificate could be invalid'
) % hostname
if errnum in ssl_error:
text += _('\nSSL Error: <b>%s</b>') % ssl_error[errnum]
else:
text += _('\nUnknown SSL error: %d') % errnum
fingerprint_sha1 = cert.digest('sha1').decode('utf-8')
fingerprint_sha256 = cert.digest('sha256').decode('utf-8')
pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
cert).decode('utf-8')
app.nec.push_incoming_event(SSLErrorEvent(None, conn=self,
error_text=text, error_num=errnum, cert=pem,
fingerprint_sha1=fingerprint_sha1,
fingerprint_sha256=fingerprint_sha256, certificate=cert))
return True
if cert:
fingerprint_sha1 = cert.digest('sha1').decode('utf-8')
fingerprint_sha256 = cert.digest('sha256').decode('utf-8')
saved_fingerprint_sha1 = app.config.get_per('accounts', self.name,
'ssl_fingerprint_sha1')
if saved_fingerprint_sha1:
# Check sha1 fingerprint
if fingerprint_sha1 != saved_fingerprint_sha1:
if not check_X509.check_certificate(cert, hostname):
app.nec.push_incoming_event(FingerprintErrorEvent(
None, conn=self, certificate=cert,
new_fingerprint_sha1=fingerprint_sha1,
new_fingerprint_sha256=fingerprint_sha256))
return True
app.config.set_per('accounts', self.name, 'ssl_fingerprint_sha1',
fingerprint_sha1)
saved_fingerprint_sha256 = app.config.get_per('accounts', self.name,
'ssl_fingerprint_sha256')
if saved_fingerprint_sha256:
# Check sha256 fingerprint
if fingerprint_sha256 != saved_fingerprint_sha256:
if not check_X509.check_certificate(cert, hostname):
app.nec.push_incoming_event(FingerprintErrorEvent(
None, conn=self, certificate=cert,
new_fingerprint_sha1=fingerprint_sha1,
new_fingerprint_sha256=fingerprint_sha256))
return True
app.config.set_per('accounts', self.name,
'ssl_fingerprint_sha256', fingerprint_sha256)
ssl_errors = con.Connection.ssl_errors
ignored_ssl_errors = self._get_ignored_ssl_errors()
self._ssl_errors = set(ssl_errors) - set(ignored_ssl_errors)
self.process_ssl_errors()
if not check_X509.check_certificate(cert, hostname) and \
'100' not in app.config.get_per('accounts', self.name,
'ignore_ssl_errors').split():
pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert).decode('utf-8')
txt = _('The authenticity of the %s certificate could be '
'invalid.\nThe certificate does not cover this domain.') %\
hostname
app.nec.push_incoming_event(SSLErrorEvent(None, conn=self,
error_text=txt, error_num=100, cert=pem,
fingerprint_sha1=fingerprint_sha1,
fingerprint_sha256=fingerprint_sha256, certificate=cert))
return True
def _get_ignored_ssl_errors(self):
ignore_ssl_errors = app.config.get_per(
'accounts', self.name, 'ignore_ssl_errors').split()
return [int(err) for err in ignore_ssl_errors]
self._register_handlers(con, con_type)
auth_mechs = app.config.get_per('accounts', self.name, 'authentication_mechanisms')
auth_mechs = auth_mechs.split()
def process_ssl_errors(self):
if not self._ssl_errors:
self.ssl_certificate_accepted()
return
cert = self.connection.Connection.ssl_certificate
errnum = self._ssl_errors.pop()
hostname = app.config.get_per('accounts', self.name, 'hostname')
text = _('The authenticity of the %s '
'certificate could be invalid') % hostname
if errnum in ssl_error:
text += _('\nSSL Error: <b>%s</b>') % ssl_error[errnum]
else:
text += _('\nUnknown SSL error: %d') % errnum
fingerprint_sha1 = cert.digest('sha1').decode('utf-8')
fingerprint_sha256 = cert.digest('sha256').decode('utf-8')
pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert).decode('utf-8')
app.nec.push_incoming_event(
SSLErrorEvent(None, conn=self,
error_text=text,
error_num=errnum,
cert=pem,
fingerprint_sha1=fingerprint_sha1,
fingerprint_sha256=fingerprint_sha256,
certificate=cert))
def ssl_certificate_accepted(self):
if not self.connection:
self.disconnect(on_purpose=True)
app.nec.push_incoming_event(
ConnectionLostEvent(
None, conn=self,
title=_('Could not connect to account %s') % self.name,
msg=_('Connection with account %s has been lost. '
'Retry connecting.') % self.name))
return
name = None
if not app.config.get_per('accounts', self.name, 'anonymous_auth'):
name = app.config.get_per('accounts', self.name, 'name')
self._register_handlers(self.connection, self._current_type)
auth_mechs = app.config.get_per(
'accounts', self.name, 'authentication_mechanisms').split()
for mech in auth_mechs:
if mech not in nbxmpp.auth_nb.SASL_AUTHENTICATION_MECHANISMS | set(['XEP-0078']):
log.warning("Unknown authentication mechanisms %s" % mech)
@ -1438,24 +1420,12 @@ class Connection(CommonConnection, ConnectionHandlers):
auth_mechs = None
else:
auth_mechs = set(auth_mechs)
con.auth(user=name, password=self.password,
resource=self.server_resource, sasl=True, on_auth=self.__on_auth, auth_mechs=auth_mechs)
def ssl_certificate_accepted(self):
if not self.connection:
self.disconnect(on_purpose=True)
app.nec.push_incoming_event(ConnectionLostEvent(None, conn=self,
title=_('Could not connect to account %s') % self.name,
msg=_('Connection with account %s has been lost. Retry '
'connecting.') % self.name))
return
if app.config.get_per('accounts', self.name, 'anonymous_auth'):
name = None
else:
name = app.config.get_per('accounts', self.name, 'name')
self._register_handlers(self.connection, 'ssl')
self.connection.auth(name, self.password, self.server_resource, 1,
self.__on_auth)
self.connection.auth(user=name,
password=self.password,
resource=self.server_resource,
sasl=True,
on_auth=self.__on_auth,
auth_mechs=auth_mechs)
def _register_handlers(self, con, con_type):
self.peerhost = con.get_peerhost()

View File

@ -2188,10 +2188,6 @@ class SSLErrorEvent(nec.NetworkIncomingEvent):
name = 'ssl-error'
base_network_events = []
class FingerprintErrorEvent(nec.NetworkIncomingEvent):
name = 'fingerprint-error'
base_network_events = []
class UniqueRoomIdSupportedEvent(nec.NetworkIncomingEvent):
name = 'unique-room-id-supported'
base_network_events = []

View File

@ -5404,25 +5404,6 @@ SHA-256 Fingerprint: %(sha256)s
self.set_transient_for(parent)
self.set_title(_('Certificate for account %s') % account)
class CheckFingerprintDialog(YesNoDialog):
def __init__(self, pritext='', sectext='', checktext='',
on_response_yes=None, on_response_no=None, account=None, certificate=None):
self.account = account
self.cert = certificate
YesNoDialog.__init__(self, pritext, sectext=sectext,
checktext=checktext, on_response_yes=on_response_yes,
on_response_no=on_response_no)
self.set_title(_('SSL Certificate Verification for %s') % account)
b = Gtk.Button(label=_('View cert…'))
b.connect('clicked', self.on_cert_clicked)
b.show_all()
area = self.get_action_area()
area.pack_start(b, True, True, 0)
def on_cert_clicked(self, button):
CertificatDialog(self, self.account, self.cert)
class SSLErrorDialog(ConfirmationDialogDoubleCheck):
def __init__(self, account, certificate, pritext, sectext, checktext1,
checktext2, on_response_ok=None, on_response_cancel=None):

View File

@ -1354,29 +1354,24 @@ class Interface:
certs = ''
my_ca_certs = configpaths.get('MY_CACERTS')
if os.path.isfile(my_ca_certs):
f = open(my_ca_certs)
certs = f.read()
f.close()
with open(my_ca_certs, encoding='utf-8') as f:
certs = f.read()
if obj.cert in certs:
dialogs.ErrorDialog(_('Certificate Already in File'),
_('This certificate is already in file %s, so it\'s '
'not added again.') % my_ca_certs)
else:
f = open(my_ca_certs, 'a')
f.write(server + '\n')
f.write(obj.cert + '\n\n')
f.close()
app.config.set_per('accounts', account, 'ssl_fingerprint_sha1',
obj.fingerprint_sha1)
app.config.set_per('accounts', account, 'ssl_fingerprint_sha256',
obj.fingerprint_sha256)
with open(my_ca_certs, 'a', encoding='utf-8') as f:
f.write(server + '\n')
f.write(obj.cert + '\n\n')
if is_checked[1]:
ignore_ssl_errors = app.config.get_per('accounts', account,
'ignore_ssl_errors').split()
ignore_ssl_errors.append(str(obj.error_num))
app.config.set_per('accounts', account, 'ignore_ssl_errors',
' '.join(ignore_ssl_errors))
obj.conn.ssl_certificate_accepted()
obj.conn.process_ssl_errors()
def on_cancel():
del self.instances[account]['online_dialog']['ssl_error']
@ -1412,46 +1407,6 @@ class Interface:
'does not support anonymous connection' % server,
transient_for=self.roster.window)
def handle_event_fingerprint_error(self, obj):
# ('FINGERPRINT_ERROR', account, (new_fingerprint_sha1,new_fingerprint_sha256,))
account = obj.conn.name
def on_yes(is_checked):
del self.instances[account]['online_dialog']['fingerprint_error']
app.config.set_per('accounts', account, 'ssl_fingerprint_sha1',
obj.new_fingerprint_sha1)
app.config.set_per('accounts', account, 'ssl_fingerprint_sha256',
obj.new_fingerprint_sha256)
# Reset the ignored ssl errors
app.config.set_per('accounts', account, 'ignore_ssl_errors', '')
obj.conn.ssl_certificate_accepted()
def on_no():
del self.instances[account]['online_dialog']['fingerprint_error']
obj.conn.disconnect(on_purpose=True)
app.nec.push_incoming_event(OurShowEvent(None, conn=obj.conn,
show='offline'))
pritext = _('SSL certificate error')
sectext = _('It seems the SSL certificate of account %(account)s has '
'changed and is not valid or your connection is being compromised.\n\n'
'Old SHA-1 fingerprint: '
'%(old_sha1)s\nOld SHA-256 fingerprint: %(old_sha256)s\n\n'
'New SHA-1 fingerprint: %(new_sha1)s\nNew SHA-256 fingerprint: '
'%(new_sha256)s\n\nDo you still want to connect '
'and update the fingerprint of the certificate?') % \
{'account': account,
'old_sha1': app.config.get_per('accounts', account, 'ssl_fingerprint_sha1'),
'old_sha256': app.config.get_per('accounts', account, 'ssl_fingerprint_sha256'),
'new_sha1': obj.new_fingerprint_sha1,
'new_sha256': obj.new_fingerprint_sha256}
if 'fingerprint_error' in self.instances[account]['online_dialog']:
self.instances[account]['online_dialog']['fingerprint_error'].\
destroy()
self.instances[account]['online_dialog']['fingerprint_error'] = \
dialogs.CheckFingerprintDialog(pritext, sectext, on_response_yes=on_yes,
on_response_no=on_no, account=obj.conn.name,
certificate=obj.certificate)
def handle_event_plain_connection(self, obj):
# ('PLAIN_CONNECTION', account, (connection))
def on_ok(is_checked):
@ -1578,7 +1533,6 @@ class Interface:
'failed-decrypt': [(self.handle_event_failed_decrypt, ged.GUI2)],
'file-request-error': [self.handle_event_file_request_error],
'file-request-received': [self.handle_event_file_request],
'fingerprint-error': [self.handle_event_fingerprint_error],
'gc-invitation-received': [self.handle_event_gc_invitation],
'gc-decline-received': [self.handle_event_gc_decline],
'gc-presence-received': [self.handle_event_gc_presence],