diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py index bd1b10124..7721c38cc 100644 --- a/src/common/xmpp/client_nb.py +++ b/src/common/xmpp/client_nb.py @@ -287,7 +287,8 @@ class NonBlockingClient: if self.__dict__.has_key('Dispatcher'): self.Dispatcher.PlugOut() self.got_features = False - d = dispatcher_nb.Dispatcher().PlugIn(self) + # Plugging will send our stream header + dispatcher_nb.Dispatcher().PlugIn(self) on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES') elif mode == 'FAILURE': @@ -298,16 +299,14 @@ class NonBlockingClient: self.Dispatcher.ProcessNonBlocking(data) if not hasattr(self, 'Dispatcher') or \ self.Dispatcher.Stream._document_attrs is None: - self._xmpp_connect_machine( - mode='FAILURE', - data='Error on stream open') + # only partial data received so far + return if self.incoming_stream_version() == '1.0': if not self.got_features: on_next_receive('RECEIVE_STREAM_FEATURES') else: log.info('got STREAM FEATURES in first recv') self._xmpp_connect_machine(mode='STREAM_STARTED') - else: log.info('incoming stream version less than 1.0') self._xmpp_connect_machine(mode='STREAM_STARTED') @@ -315,14 +314,11 @@ class NonBlockingClient: elif mode == 'RECEIVE_STREAM_FEATURES': if data: # sometimes are received together with document - # attributes and sometimes on next receive... + # attributes and sometimes on next receive... sometimes in + # several chunks... self.Dispatcher.ProcessNonBlocking(data) - if not self.got_features: - self._xmpp_connect_machine( - mode='FAILURE', - data='Missing in 1.0 stream') - else: - log.info('got STREAM FEATURES in second recv') + if self.got_features: + log.info('got STREAM FEATURES after several recv') self._xmpp_connect_machine(mode='STREAM_STARTED') elif mode == 'STREAM_STARTED': diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py index c6e484641..b493d5ee2 100644 --- a/src/common/xmpp/idlequeue.py +++ b/src/common/xmpp/idlequeue.py @@ -19,7 +19,6 @@ idle objects and be informed about possible IO. import select import logging -import gobject log = logging.getLogger('gajim.c.x.idlequeue') FLAG_WRITE = 20 # write only @@ -115,7 +114,7 @@ class IdleQueue: def remove_timeout(self, fd): ''' Removes the read timeout ''' - log.info('read timeout removed for fd %s' % fd) + log.debug('read timeout removed for fd %s' % fd) if fd in self.read_timeouts: del(self.read_timeouts[fd]) @@ -126,7 +125,7 @@ class IdleQueue: A filedescriptor fd can have only one timeout. ''' - log.info('read timeout set for fd %s on %s seconds' % (fd, seconds)) + log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds)) timeout = self.current_time() + seconds self.read_timeouts[fd] = timeout @@ -135,14 +134,14 @@ class IdleQueue: Execute and remove alarm callbacks and execute read_timeout() for plugged objects if specified time has ellapsed. ''' - log.info('check time evs') + log.debug('check time evs') current_time = self.current_time() for fd, timeout in self.read_timeouts.items(): if timeout > current_time: continue if fd in self.queue: - log.debug('Calling read_timeout for fd %s' % fd) + log.info('Calling read_timeout for fd %s' % fd) self.queue[fd].read_timeout() else: self.remove_timeout(fd) @@ -167,6 +166,7 @@ class IdleQueue: ''' if obj.fd == -1: return + log.info('Plug object fd %s as w:%s r:%s' % (obj.fd, writable, readable)) if obj.fd in self.queue: self.unplug_idle(obj.fd) self.queue[obj.fd] = obj @@ -208,7 +208,6 @@ class IdleQueue: return False if flags & PENDING_READ: - #print 'waiting read on %d, flags are %d' % (fd, flags) obj.pollin() return True @@ -241,6 +240,14 @@ class IdleQueue: waiting_descriptors = [] if e[0] != 4: # interrupt raise + # Maybe there is still data in ssl buffer: + # Add all sslWrappers where we have pending data. poll doesn't work here + # as it can only check sockets but the data may already be read into + # a ssl internal buffer + descriptors = (fd for fd, flag in waiting_descriptors) + waiting_descriptors.extend((fd, FLAG_READ) for (fd, obj) in + self.queue.iteritems() if not fd in descriptors and + hasattr(obj, '_sslObj') and obj._sslObj.pending()) for fd, flags in waiting_descriptors: self._process_events(fd, flags) self._check_time_events() @@ -288,8 +295,19 @@ class SelectIdleQueue(IdleQueue): self._check_time_events() return True try: + # Maybe there is still data in ssl buffer waiting_descriptors = select.select(self.read_fds.keys(), self.write_fds.keys(), self.error_fds.keys(), 0) + + # Maybe there is still data in ssl buffer: + # Add all sslWrappers where we have pending data. select doesn't work + # here as it can only check sockets but the data may already be read + # into a ssl internal buffer + waiting_descriptors[0].extend(fd for (fd, obj) + in self.queue.iteritems() if not fd in waiting_descriptors[0] and + hasattr(obj, '_sslObj') and obj._sslObj.pending()) + waiting_descriptors = (waiting_descriptors[0], waiting_descriptors[1], + waiting_descriptors[2]) except select.error, e: waiting_descriptors = ((),(),()) if e[0] != 4: # interrupt @@ -310,12 +328,13 @@ class SelectIdleQueue(IdleQueue): return True +#import gobject +# FIXME: Does not work well with SSL, data may be "forgotten" in SSL buffer class GlibIdleQueue(IdleQueue): ''' Extends IdleQueue to use glib io_add_wath, instead of select/poll In another 'non gui' implementation of Gajim IdleQueue can be used safetly. ''' - def _init_idle(self): ''' Creates a dict, which maps file/pipe/sock descriptor to glib event id diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index 4ef7a72e0..8c64ac692 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -73,8 +73,8 @@ DISCONNECT_TIMEOUT_SECONDS = 5 #: size of the buffer which reads data from server # if lower, more stanzas will be fragmented and processed twice -RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty -#RECV_BUFSIZE = 16 # FIXME: (#2634) gajim breaks with this setting: it's inefficient but should work. +# but GUI will freeze less +RECV_BUFSIZE = 8192 DATA_RECEIVED='DATA RECEIVED' DATA_SENT='DATA SENT' @@ -201,6 +201,8 @@ class NonBlockingTransport(PlugIn): self.on_receive = None return self.on_receive = recv_handler + log.info('on_receive callback set to %s' % + self.on_receive.__name__ if self.on_receive else None) def tcp_connecting_started(self): self.set_state(CONNECTING) @@ -369,6 +371,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): if self.get_state() == CONNECTING: log.info('%s socket wrapper connected' % id(self)) self.idlequeue.remove_timeout(self.fd) + # We are only interested in stream errors right now + # _xmpp_connect_machine will take care of further plugging self._plug_idle(writable=False, readable=False) self.peerhost = self._sock.getsockname() if self.proxy_dict: @@ -439,7 +443,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._do_send() else: self.sendqueue.append(r) - self._plug_idle(writable=True, readable=True) def encode_stanza(self, stanza): diff --git a/src/gajim.py b/src/gajim.py index 08ed00a92..f9a519f08 100755 --- a/src/gajim.py +++ b/src/gajim.py @@ -2862,11 +2862,7 @@ class Interface: try: gajim.idlequeue.process() except Exception: - # Otherwise, an exception will stop our loop - if gajim.idlequeue.__class__ == idlequeue.GlibIdleQueue: - gobject.timeout_add_seconds(2, self.process_connections) - else: - gobject.timeout_add(200, self.process_connections) + gobject.timeout_add(200, self.process_connections) raise return True # renew timeout (loop for ever) @@ -3111,15 +3107,9 @@ class Interface: else: gajim.log.setLevel(None) - # pygtk2.8+ on win, breaks io_add_watch. - # We use good old select.select() - if os.name == 'nt': + # GObjectIdleQueue is currently broken with ssl. Use good old select gajim.idlequeue = idlequeue.SelectIdleQueue() - else: - # in a nongui implementation, just call: - # gajim.idlequeue = IdleQueue() , and - # gajim.idlequeue.process() each foo miliseconds - gajim.idlequeue = idlequeue.GlibIdleQueue() + # resolve and keep current record of resolved hosts gajim.resolver = resolver.get_resolver(gajim.idlequeue) gajim.socks5queue = socks5.SocksQueue(gajim.idlequeue, @@ -3271,10 +3261,7 @@ class Interface: self.last_ftwindow_update = 0 gobject.timeout_add(100, self.autoconnect) - if gajim.idlequeue.__class__ == idlequeue.GlibIdleQueue: - gobject.timeout_add_seconds(2, self.process_connections) - else: - gobject.timeout_add(200, self.process_connections) + gobject.timeout_add(200, self.process_connections) gobject.timeout_add_seconds(gajim.config.get( 'check_idle_every_foo_seconds'), self.read_sleepy)