Proposed Fix. Make sure we read all data from our SSL socket wrappers and don't just rely on gobject.io_add_watch, select or poll.
This isn't really a clean solution, but the less intrusives fix that I can think of. Fixes #2634.
This commit is contained in:
parent
be92286bed
commit
8a76efb703
|
@ -287,7 +287,8 @@ class NonBlockingClient:
|
||||||
if self.__dict__.has_key('Dispatcher'):
|
if self.__dict__.has_key('Dispatcher'):
|
||||||
self.Dispatcher.PlugOut()
|
self.Dispatcher.PlugOut()
|
||||||
self.got_features = False
|
self.got_features = False
|
||||||
d = dispatcher_nb.Dispatcher().PlugIn(self)
|
# Plugging will send our stream header
|
||||||
|
dispatcher_nb.Dispatcher().PlugIn(self)
|
||||||
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
|
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
|
||||||
|
|
||||||
elif mode == 'FAILURE':
|
elif mode == 'FAILURE':
|
||||||
|
@ -298,16 +299,14 @@ class NonBlockingClient:
|
||||||
self.Dispatcher.ProcessNonBlocking(data)
|
self.Dispatcher.ProcessNonBlocking(data)
|
||||||
if not hasattr(self, 'Dispatcher') or \
|
if not hasattr(self, 'Dispatcher') or \
|
||||||
self.Dispatcher.Stream._document_attrs is None:
|
self.Dispatcher.Stream._document_attrs is None:
|
||||||
self._xmpp_connect_machine(
|
# only partial data received so far
|
||||||
mode='FAILURE',
|
return
|
||||||
data='Error on stream open')
|
|
||||||
if self.incoming_stream_version() == '1.0':
|
if self.incoming_stream_version() == '1.0':
|
||||||
if not self.got_features:
|
if not self.got_features:
|
||||||
on_next_receive('RECEIVE_STREAM_FEATURES')
|
on_next_receive('RECEIVE_STREAM_FEATURES')
|
||||||
else:
|
else:
|
||||||
log.info('got STREAM FEATURES in first recv')
|
log.info('got STREAM FEATURES in first recv')
|
||||||
self._xmpp_connect_machine(mode='STREAM_STARTED')
|
self._xmpp_connect_machine(mode='STREAM_STARTED')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.info('incoming stream version less than 1.0')
|
log.info('incoming stream version less than 1.0')
|
||||||
self._xmpp_connect_machine(mode='STREAM_STARTED')
|
self._xmpp_connect_machine(mode='STREAM_STARTED')
|
||||||
|
@ -315,14 +314,11 @@ class NonBlockingClient:
|
||||||
elif mode == 'RECEIVE_STREAM_FEATURES':
|
elif mode == 'RECEIVE_STREAM_FEATURES':
|
||||||
if data:
|
if data:
|
||||||
# sometimes <features> are received together with document
|
# sometimes <features> are received together with document
|
||||||
# attributes and sometimes on next receive...
|
# attributes and sometimes on next receive... sometimes in
|
||||||
|
# several chunks...
|
||||||
self.Dispatcher.ProcessNonBlocking(data)
|
self.Dispatcher.ProcessNonBlocking(data)
|
||||||
if not self.got_features:
|
if self.got_features:
|
||||||
self._xmpp_connect_machine(
|
log.info('got STREAM FEATURES after several recv')
|
||||||
mode='FAILURE',
|
|
||||||
data='Missing <features> in 1.0 stream')
|
|
||||||
else:
|
|
||||||
log.info('got STREAM FEATURES in second recv')
|
|
||||||
self._xmpp_connect_machine(mode='STREAM_STARTED')
|
self._xmpp_connect_machine(mode='STREAM_STARTED')
|
||||||
|
|
||||||
elif mode == 'STREAM_STARTED':
|
elif mode == 'STREAM_STARTED':
|
||||||
|
|
|
@ -19,7 +19,6 @@ idle objects and be informed about possible IO.
|
||||||
|
|
||||||
import select
|
import select
|
||||||
import logging
|
import logging
|
||||||
import gobject
|
|
||||||
log = logging.getLogger('gajim.c.x.idlequeue')
|
log = logging.getLogger('gajim.c.x.idlequeue')
|
||||||
|
|
||||||
FLAG_WRITE = 20 # write only
|
FLAG_WRITE = 20 # write only
|
||||||
|
@ -115,7 +114,7 @@ class IdleQueue:
|
||||||
|
|
||||||
def remove_timeout(self, fd):
|
def remove_timeout(self, fd):
|
||||||
''' Removes the read timeout '''
|
''' Removes the read timeout '''
|
||||||
log.info('read timeout removed for fd %s' % fd)
|
log.debug('read timeout removed for fd %s' % fd)
|
||||||
if fd in self.read_timeouts:
|
if fd in self.read_timeouts:
|
||||||
del(self.read_timeouts[fd])
|
del(self.read_timeouts[fd])
|
||||||
|
|
||||||
|
@ -126,7 +125,7 @@ class IdleQueue:
|
||||||
|
|
||||||
A filedescriptor fd can have only one timeout.
|
A filedescriptor fd can have only one timeout.
|
||||||
'''
|
'''
|
||||||
log.info('read timeout set for fd %s on %s seconds' % (fd, seconds))
|
log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds))
|
||||||
timeout = self.current_time() + seconds
|
timeout = self.current_time() + seconds
|
||||||
self.read_timeouts[fd] = timeout
|
self.read_timeouts[fd] = timeout
|
||||||
|
|
||||||
|
@ -135,14 +134,14 @@ class IdleQueue:
|
||||||
Execute and remove alarm callbacks and execute read_timeout() for plugged
|
Execute and remove alarm callbacks and execute read_timeout() for plugged
|
||||||
objects if specified time has ellapsed.
|
objects if specified time has ellapsed.
|
||||||
'''
|
'''
|
||||||
log.info('check time evs')
|
log.debug('check time evs')
|
||||||
current_time = self.current_time()
|
current_time = self.current_time()
|
||||||
|
|
||||||
for fd, timeout in self.read_timeouts.items():
|
for fd, timeout in self.read_timeouts.items():
|
||||||
if timeout > current_time:
|
if timeout > current_time:
|
||||||
continue
|
continue
|
||||||
if fd in self.queue:
|
if fd in self.queue:
|
||||||
log.debug('Calling read_timeout for fd %s' % fd)
|
log.info('Calling read_timeout for fd %s' % fd)
|
||||||
self.queue[fd].read_timeout()
|
self.queue[fd].read_timeout()
|
||||||
else:
|
else:
|
||||||
self.remove_timeout(fd)
|
self.remove_timeout(fd)
|
||||||
|
@ -167,6 +166,7 @@ class IdleQueue:
|
||||||
'''
|
'''
|
||||||
if obj.fd == -1:
|
if obj.fd == -1:
|
||||||
return
|
return
|
||||||
|
log.info('Plug object fd %s as w:%s r:%s' % (obj.fd, writable, readable))
|
||||||
if obj.fd in self.queue:
|
if obj.fd in self.queue:
|
||||||
self.unplug_idle(obj.fd)
|
self.unplug_idle(obj.fd)
|
||||||
self.queue[obj.fd] = obj
|
self.queue[obj.fd] = obj
|
||||||
|
@ -208,7 +208,6 @@ class IdleQueue:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if flags & PENDING_READ:
|
if flags & PENDING_READ:
|
||||||
#print 'waiting read on %d, flags are %d' % (fd, flags)
|
|
||||||
obj.pollin()
|
obj.pollin()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -241,6 +240,14 @@ class IdleQueue:
|
||||||
waiting_descriptors = []
|
waiting_descriptors = []
|
||||||
if e[0] != 4: # interrupt
|
if e[0] != 4: # interrupt
|
||||||
raise
|
raise
|
||||||
|
# Maybe there is still data in ssl buffer:
|
||||||
|
# Add all sslWrappers where we have pending data. poll doesn't work here
|
||||||
|
# as it can only check sockets but the data may already be read into
|
||||||
|
# a ssl internal buffer
|
||||||
|
descriptors = (fd for fd, flag in waiting_descriptors)
|
||||||
|
waiting_descriptors.extend((fd, FLAG_READ) for (fd, obj) in
|
||||||
|
self.queue.iteritems() if not fd in descriptors and
|
||||||
|
hasattr(obj, '_sslObj') and obj._sslObj.pending())
|
||||||
for fd, flags in waiting_descriptors:
|
for fd, flags in waiting_descriptors:
|
||||||
self._process_events(fd, flags)
|
self._process_events(fd, flags)
|
||||||
self._check_time_events()
|
self._check_time_events()
|
||||||
|
@ -288,8 +295,19 @@ class SelectIdleQueue(IdleQueue):
|
||||||
self._check_time_events()
|
self._check_time_events()
|
||||||
return True
|
return True
|
||||||
try:
|
try:
|
||||||
|
# Maybe there is still data in ssl buffer
|
||||||
waiting_descriptors = select.select(self.read_fds.keys(),
|
waiting_descriptors = select.select(self.read_fds.keys(),
|
||||||
self.write_fds.keys(), self.error_fds.keys(), 0)
|
self.write_fds.keys(), self.error_fds.keys(), 0)
|
||||||
|
|
||||||
|
# Maybe there is still data in ssl buffer:
|
||||||
|
# Add all sslWrappers where we have pending data. select doesn't work
|
||||||
|
# here as it can only check sockets but the data may already be read
|
||||||
|
# into a ssl internal buffer
|
||||||
|
waiting_descriptors[0].extend(fd for (fd, obj)
|
||||||
|
in self.queue.iteritems() if not fd in waiting_descriptors[0] and
|
||||||
|
hasattr(obj, '_sslObj') and obj._sslObj.pending())
|
||||||
|
waiting_descriptors = (waiting_descriptors[0], waiting_descriptors[1],
|
||||||
|
waiting_descriptors[2])
|
||||||
except select.error, e:
|
except select.error, e:
|
||||||
waiting_descriptors = ((),(),())
|
waiting_descriptors = ((),(),())
|
||||||
if e[0] != 4: # interrupt
|
if e[0] != 4: # interrupt
|
||||||
|
@ -310,12 +328,13 @@ class SelectIdleQueue(IdleQueue):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
#import gobject
|
||||||
|
# FIXME: Does not work well with SSL, data may be "forgotten" in SSL buffer
|
||||||
class GlibIdleQueue(IdleQueue):
|
class GlibIdleQueue(IdleQueue):
|
||||||
'''
|
'''
|
||||||
Extends IdleQueue to use glib io_add_wath, instead of select/poll
|
Extends IdleQueue to use glib io_add_wath, instead of select/poll
|
||||||
In another 'non gui' implementation of Gajim IdleQueue can be used safetly.
|
In another 'non gui' implementation of Gajim IdleQueue can be used safetly.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
def _init_idle(self):
|
def _init_idle(self):
|
||||||
'''
|
'''
|
||||||
Creates a dict, which maps file/pipe/sock descriptor to glib event id
|
Creates a dict, which maps file/pipe/sock descriptor to glib event id
|
||||||
|
|
|
@ -73,8 +73,8 @@ DISCONNECT_TIMEOUT_SECONDS = 5
|
||||||
|
|
||||||
#: size of the buffer which reads data from server
|
#: size of the buffer which reads data from server
|
||||||
# if lower, more stanzas will be fragmented and processed twice
|
# if lower, more stanzas will be fragmented and processed twice
|
||||||
RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty
|
# but GUI will freeze less
|
||||||
#RECV_BUFSIZE = 16 # FIXME: (#2634) gajim breaks with this setting: it's inefficient but should work.
|
RECV_BUFSIZE = 8192
|
||||||
|
|
||||||
DATA_RECEIVED='DATA RECEIVED'
|
DATA_RECEIVED='DATA RECEIVED'
|
||||||
DATA_SENT='DATA SENT'
|
DATA_SENT='DATA SENT'
|
||||||
|
@ -201,6 +201,8 @@ class NonBlockingTransport(PlugIn):
|
||||||
self.on_receive = None
|
self.on_receive = None
|
||||||
return
|
return
|
||||||
self.on_receive = recv_handler
|
self.on_receive = recv_handler
|
||||||
|
log.info('on_receive callback set to %s' %
|
||||||
|
self.on_receive.__name__ if self.on_receive else None)
|
||||||
|
|
||||||
def tcp_connecting_started(self):
|
def tcp_connecting_started(self):
|
||||||
self.set_state(CONNECTING)
|
self.set_state(CONNECTING)
|
||||||
|
@ -369,6 +371,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
|
||||||
if self.get_state() == CONNECTING:
|
if self.get_state() == CONNECTING:
|
||||||
log.info('%s socket wrapper connected' % id(self))
|
log.info('%s socket wrapper connected' % id(self))
|
||||||
self.idlequeue.remove_timeout(self.fd)
|
self.idlequeue.remove_timeout(self.fd)
|
||||||
|
# We are only interested in stream errors right now
|
||||||
|
# _xmpp_connect_machine will take care of further plugging
|
||||||
self._plug_idle(writable=False, readable=False)
|
self._plug_idle(writable=False, readable=False)
|
||||||
self.peerhost = self._sock.getsockname()
|
self.peerhost = self._sock.getsockname()
|
||||||
if self.proxy_dict:
|
if self.proxy_dict:
|
||||||
|
@ -439,7 +443,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
|
||||||
self._do_send()
|
self._do_send()
|
||||||
else:
|
else:
|
||||||
self.sendqueue.append(r)
|
self.sendqueue.append(r)
|
||||||
|
|
||||||
self._plug_idle(writable=True, readable=True)
|
self._plug_idle(writable=True, readable=True)
|
||||||
|
|
||||||
def encode_stanza(self, stanza):
|
def encode_stanza(self, stanza):
|
||||||
|
|
21
src/gajim.py
21
src/gajim.py
|
@ -2862,11 +2862,7 @@ class Interface:
|
||||||
try:
|
try:
|
||||||
gajim.idlequeue.process()
|
gajim.idlequeue.process()
|
||||||
except Exception:
|
except Exception:
|
||||||
# Otherwise, an exception will stop our loop
|
gobject.timeout_add(200, self.process_connections)
|
||||||
if gajim.idlequeue.__class__ == idlequeue.GlibIdleQueue:
|
|
||||||
gobject.timeout_add_seconds(2, self.process_connections)
|
|
||||||
else:
|
|
||||||
gobject.timeout_add(200, self.process_connections)
|
|
||||||
raise
|
raise
|
||||||
return True # renew timeout (loop for ever)
|
return True # renew timeout (loop for ever)
|
||||||
|
|
||||||
|
@ -3111,15 +3107,9 @@ class Interface:
|
||||||
else:
|
else:
|
||||||
gajim.log.setLevel(None)
|
gajim.log.setLevel(None)
|
||||||
|
|
||||||
# pygtk2.8+ on win, breaks io_add_watch.
|
# GObjectIdleQueue is currently broken with ssl. Use good old select
|
||||||
# We use good old select.select()
|
|
||||||
if os.name == 'nt':
|
|
||||||
gajim.idlequeue = idlequeue.SelectIdleQueue()
|
gajim.idlequeue = idlequeue.SelectIdleQueue()
|
||||||
else:
|
|
||||||
# in a nongui implementation, just call:
|
|
||||||
# gajim.idlequeue = IdleQueue() , and
|
|
||||||
# gajim.idlequeue.process() each foo miliseconds
|
|
||||||
gajim.idlequeue = idlequeue.GlibIdleQueue()
|
|
||||||
# resolve and keep current record of resolved hosts
|
# resolve and keep current record of resolved hosts
|
||||||
gajim.resolver = resolver.get_resolver(gajim.idlequeue)
|
gajim.resolver = resolver.get_resolver(gajim.idlequeue)
|
||||||
gajim.socks5queue = socks5.SocksQueue(gajim.idlequeue,
|
gajim.socks5queue = socks5.SocksQueue(gajim.idlequeue,
|
||||||
|
@ -3271,10 +3261,7 @@ class Interface:
|
||||||
self.last_ftwindow_update = 0
|
self.last_ftwindow_update = 0
|
||||||
|
|
||||||
gobject.timeout_add(100, self.autoconnect)
|
gobject.timeout_add(100, self.autoconnect)
|
||||||
if gajim.idlequeue.__class__ == idlequeue.GlibIdleQueue:
|
gobject.timeout_add(200, self.process_connections)
|
||||||
gobject.timeout_add_seconds(2, self.process_connections)
|
|
||||||
else:
|
|
||||||
gobject.timeout_add(200, self.process_connections)
|
|
||||||
gobject.timeout_add_seconds(gajim.config.get(
|
gobject.timeout_add_seconds(gajim.config.get(
|
||||||
'check_idle_every_foo_seconds'), self.read_sleepy)
|
'check_idle_every_foo_seconds'), self.read_sleepy)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue