Revert r10931. Fixes #4632. See #2634

This commit is contained in:
Stephan Erb 2009-01-03 16:34:58 +00:00
parent 28112d6681
commit ed3de38ce5
4 changed files with 40 additions and 44 deletions

View File

@ -287,8 +287,7 @@ class NonBlockingClient:
if self.__dict__.has_key('Dispatcher'): if self.__dict__.has_key('Dispatcher'):
self.Dispatcher.PlugOut() self.Dispatcher.PlugOut()
self.got_features = False self.got_features = False
# Plugging will send our stream header d = dispatcher_nb.Dispatcher().PlugIn(self)
dispatcher_nb.Dispatcher().PlugIn(self)
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES') on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
elif mode == 'FAILURE': elif mode == 'FAILURE':
@ -299,14 +298,16 @@ class NonBlockingClient:
self.Dispatcher.ProcessNonBlocking(data) self.Dispatcher.ProcessNonBlocking(data)
if not hasattr(self, 'Dispatcher') or \ if not hasattr(self, 'Dispatcher') or \
self.Dispatcher.Stream._document_attrs is None: self.Dispatcher.Stream._document_attrs is None:
# only partial data received so far self._xmpp_connect_machine(
return mode='FAILURE',
data='Error on stream open')
if self.incoming_stream_version() == '1.0': if self.incoming_stream_version() == '1.0':
if not self.got_features: if not self.got_features:
on_next_receive('RECEIVE_STREAM_FEATURES') on_next_receive('RECEIVE_STREAM_FEATURES')
else: else:
log.info('got STREAM FEATURES in first recv') log.info('got STREAM FEATURES in first recv')
self._xmpp_connect_machine(mode='STREAM_STARTED') self._xmpp_connect_machine(mode='STREAM_STARTED')
else: else:
log.info('incoming stream version less than 1.0') log.info('incoming stream version less than 1.0')
self._xmpp_connect_machine(mode='STREAM_STARTED') self._xmpp_connect_machine(mode='STREAM_STARTED')
@ -314,11 +315,14 @@ class NonBlockingClient:
elif mode == 'RECEIVE_STREAM_FEATURES': elif mode == 'RECEIVE_STREAM_FEATURES':
if data: if data:
# sometimes <features> are received together with document # sometimes <features> are received together with document
# attributes and sometimes on next receive... sometimes in # attributes and sometimes on next receive...
# several chunks...
self.Dispatcher.ProcessNonBlocking(data) self.Dispatcher.ProcessNonBlocking(data)
if self.got_features: if not self.got_features:
log.info('got STREAM FEATURES after several recv') self._xmpp_connect_machine(
mode='FAILURE',
data='Missing <features> in 1.0 stream')
else:
log.info('got STREAM FEATURES in second recv')
self._xmpp_connect_machine(mode='STREAM_STARTED') self._xmpp_connect_machine(mode='STREAM_STARTED')
elif mode == 'STREAM_STARTED': elif mode == 'STREAM_STARTED':

View File

@ -19,6 +19,7 @@ idle objects and be informed about possible IO.
import select import select
import logging import logging
import gobject
log = logging.getLogger('gajim.c.x.idlequeue') log = logging.getLogger('gajim.c.x.idlequeue')
FLAG_WRITE = 20 # write only FLAG_WRITE = 20 # write only
@ -114,7 +115,7 @@ class IdleQueue:
def remove_timeout(self, fd): def remove_timeout(self, fd):
''' Removes the read timeout ''' ''' Removes the read timeout '''
log.debug('read timeout removed for fd %s' % fd) log.info('read timeout removed for fd %s' % fd)
if fd in self.read_timeouts: if fd in self.read_timeouts:
del(self.read_timeouts[fd]) del(self.read_timeouts[fd])
@ -125,7 +126,7 @@ class IdleQueue:
A filedescriptor fd can have only one timeout. A filedescriptor fd can have only one timeout.
''' '''
log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds)) log.info('read timeout set for fd %s on %s seconds' % (fd, seconds))
timeout = self.current_time() + seconds timeout = self.current_time() + seconds
self.read_timeouts[fd] = timeout self.read_timeouts[fd] = timeout
@ -134,14 +135,14 @@ class IdleQueue:
Execute and remove alarm callbacks and execute read_timeout() for plugged Execute and remove alarm callbacks and execute read_timeout() for plugged
objects if specified time has ellapsed. objects if specified time has ellapsed.
''' '''
log.debug('check time evs') log.info('check time evs')
current_time = self.current_time() current_time = self.current_time()
for fd, timeout in self.read_timeouts.items(): for fd, timeout in self.read_timeouts.items():
if timeout > current_time: if timeout > current_time:
continue continue
if fd in self.queue: if fd in self.queue:
log.info('Calling read_timeout for fd %s' % fd) log.debug('Calling read_timeout for fd %s' % fd)
self.queue[fd].read_timeout() self.queue[fd].read_timeout()
else: else:
self.remove_timeout(fd) self.remove_timeout(fd)
@ -166,7 +167,6 @@ class IdleQueue:
''' '''
if obj.fd == -1: if obj.fd == -1:
return return
log.info('Plug object fd %s as w:%s r:%s' % (obj.fd, writable, readable))
if obj.fd in self.queue: if obj.fd in self.queue:
self.unplug_idle(obj.fd) self.unplug_idle(obj.fd)
self.queue[obj.fd] = obj self.queue[obj.fd] = obj
@ -208,6 +208,7 @@ class IdleQueue:
return False return False
if flags & PENDING_READ: if flags & PENDING_READ:
#print 'waiting read on %d, flags are %d' % (fd, flags)
obj.pollin() obj.pollin()
return True return True
@ -240,14 +241,6 @@ class IdleQueue:
waiting_descriptors = [] waiting_descriptors = []
if e[0] != 4: # interrupt if e[0] != 4: # interrupt
raise raise
# Maybe there is still data in ssl buffer:
# Add all sslWrappers where we have pending data. poll doesn't work here
# as it can only check sockets but the data may already be read into
# a ssl internal buffer
descriptors = (fd for fd, flag in waiting_descriptors)
waiting_descriptors.extend((fd, FLAG_READ) for (fd, obj) in
self.queue.iteritems() if not fd in descriptors and
hasattr(obj, '_sslObj') and obj._sslObj.pending())
for fd, flags in waiting_descriptors: for fd, flags in waiting_descriptors:
self._process_events(fd, flags) self._process_events(fd, flags)
self._check_time_events() self._check_time_events()
@ -297,16 +290,6 @@ class SelectIdleQueue(IdleQueue):
try: try:
waiting_descriptors = select.select(self.read_fds.keys(), waiting_descriptors = select.select(self.read_fds.keys(),
self.write_fds.keys(), self.error_fds.keys(), 0) self.write_fds.keys(), self.error_fds.keys(), 0)
# Maybe there is still data in ssl buffer:
# Add all sslWrappers where we have pending data. select doesn't work
# here as it can only check sockets but the data may already be read
# into a ssl internal buffer
waiting_descriptors[0].extend(fd for (fd, obj)
in self.queue.iteritems() if not fd in waiting_descriptors[0] and
hasattr(obj, '_sslObj') and obj._sslObj.pending())
waiting_descriptors = (waiting_descriptors[0], waiting_descriptors[1],
waiting_descriptors[2])
except select.error, e: except select.error, e:
waiting_descriptors = ((),(),()) waiting_descriptors = ((),(),())
if e[0] != 4: # interrupt if e[0] != 4: # interrupt
@ -327,13 +310,12 @@ class SelectIdleQueue(IdleQueue):
return True return True
#import gobject
# FIXME: Does not work well with SSL, data may be "forgotten" in SSL buffer
class GlibIdleQueue(IdleQueue): class GlibIdleQueue(IdleQueue):
''' '''
Extends IdleQueue to use glib io_add_wath, instead of select/poll Extends IdleQueue to use glib io_add_wath, instead of select/poll
In another 'non gui' implementation of Gajim IdleQueue can be used safetly. In another 'non gui' implementation of Gajim IdleQueue can be used safetly.
''' '''
def _init_idle(self): def _init_idle(self):
''' '''
Creates a dict, which maps file/pipe/sock descriptor to glib event id Creates a dict, which maps file/pipe/sock descriptor to glib event id

View File

@ -73,8 +73,8 @@ DISCONNECT_TIMEOUT_SECONDS = 5
#: size of the buffer which reads data from server #: size of the buffer which reads data from server
# if lower, more stanzas will be fragmented and processed twice # if lower, more stanzas will be fragmented and processed twice
# but GUI will freeze less RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty
RECV_BUFSIZE = 8192 #RECV_BUFSIZE = 16 # FIXME: (#2634) gajim breaks with this setting: it's inefficient but should work.
DATA_RECEIVED='DATA RECEIVED' DATA_RECEIVED='DATA RECEIVED'
DATA_SENT='DATA SENT' DATA_SENT='DATA SENT'
@ -201,8 +201,6 @@ class NonBlockingTransport(PlugIn):
self.on_receive = None self.on_receive = None
return return
self.on_receive = recv_handler self.on_receive = recv_handler
log.info('on_receive callback set to %s' %
self.on_receive.__name__ if self.on_receive else None)
def tcp_connecting_started(self): def tcp_connecting_started(self):
self.set_state(CONNECTING) self.set_state(CONNECTING)
@ -371,8 +369,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
if self.get_state() == CONNECTING: if self.get_state() == CONNECTING:
log.info('%s socket wrapper connected' % id(self)) log.info('%s socket wrapper connected' % id(self))
self.idlequeue.remove_timeout(self.fd) self.idlequeue.remove_timeout(self.fd)
# We are only interested in stream errors right now
# _xmpp_connect_machine will take care of further plugging
self._plug_idle(writable=False, readable=False) self._plug_idle(writable=False, readable=False)
self.peerhost = self._sock.getsockname() self.peerhost = self._sock.getsockname()
if self.proxy_dict: if self.proxy_dict:
@ -443,6 +439,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._do_send() self._do_send()
else: else:
self.sendqueue.append(r) self.sendqueue.append(r)
self._plug_idle(writable=True, readable=True) self._plug_idle(writable=True, readable=True)
def encode_stanza(self, stanza): def encode_stanza(self, stanza):

View File

@ -2862,7 +2862,11 @@ class Interface:
try: try:
gajim.idlequeue.process() gajim.idlequeue.process()
except Exception: except Exception:
gobject.timeout_add(200, self.process_connections) # Otherwise, an exception will stop our loop
if gajim.idlequeue.__class__ == idlequeue.GlibIdleQueue:
gobject.timeout_add_seconds(2, self.process_connections)
else:
gobject.timeout_add(200, self.process_connections)
raise raise
return True # renew timeout (loop for ever) return True # renew timeout (loop for ever)
@ -3107,9 +3111,15 @@ class Interface:
else: else:
gajim.log.setLevel(None) gajim.log.setLevel(None)
# GObjectIdleQueue is currently broken with ssl. Use good old select # pygtk2.8+ on win, breaks io_add_watch.
gajim.idlequeue = idlequeue.SelectIdleQueue() # We use good old select.select()
if os.name == 'nt':
gajim.idlequeue = idlequeue.SelectIdleQueue()
else:
# in a nongui implementation, just call:
# gajim.idlequeue = IdleQueue() , and
# gajim.idlequeue.process() each foo miliseconds
gajim.idlequeue = idlequeue.GlibIdleQueue()
# resolve and keep current record of resolved hosts # resolve and keep current record of resolved hosts
gajim.resolver = resolver.get_resolver(gajim.idlequeue) gajim.resolver = resolver.get_resolver(gajim.idlequeue)
gajim.socks5queue = socks5.SocksQueue(gajim.idlequeue, gajim.socks5queue = socks5.SocksQueue(gajim.idlequeue,
@ -3261,7 +3271,10 @@ class Interface:
self.last_ftwindow_update = 0 self.last_ftwindow_update = 0
gobject.timeout_add(100, self.autoconnect) gobject.timeout_add(100, self.autoconnect)
gobject.timeout_add(200, self.process_connections) if gajim.idlequeue.__class__ == idlequeue.GlibIdleQueue:
gobject.timeout_add_seconds(2, self.process_connections)
else:
gobject.timeout_add(200, self.process_connections)
gobject.timeout_add_seconds(gajim.config.get( gobject.timeout_add_seconds(gajim.config.get(
'check_idle_every_foo_seconds'), self.read_sleepy) 'check_idle_every_foo_seconds'), self.read_sleepy)