From 745434fdab58924e7310ea04a67a8cd3b498b30d Mon Sep 17 00:00:00 2001 From: Stefan Bethge Date: Mon, 6 Nov 2006 21:30:39 +0000 Subject: [PATCH] give error messages when local message could not be sent, see #2586 --- src/common/xmpp/dispatcher_nb.py | 4 +- src/common/zeroconf/client_zeroconf.py | 148 ++++++++++++++------- src/common/zeroconf/connection_zeroconf.py | 21 +-- 3 files changed, 118 insertions(+), 55 deletions(-) diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index 3079ca6f2..c464b6f02 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -398,7 +398,7 @@ class Dispatcher(PlugIn): Additional callback arguments can be specified in args. ''' self.SendAndWaitForResponse(stanza, 0, func, args) - def send(self, stanza): + def send(self, stanza, is_message = False): ''' Serialise stanza and put it on the wire. Assign an unique ID to it before send. Returns assigned ID.''' if type(stanza) in [type(''), type(u'')]: @@ -425,7 +425,7 @@ class Dispatcher(PlugIn): stanza=route stanza.setNamespace(self._owner.Namespace) stanza.setParent(self._metastream) - self._owner.Connection.send(stanza) + self._owner.Connection.send(stanza, is_message) return _ID def disconnect(self): diff --git a/src/common/zeroconf/client_zeroconf.py b/src/common/zeroconf/client_zeroconf.py index 8f16f2cbc..e6109a758 100644 --- a/src/common/zeroconf/client_zeroconf.py +++ b/src/common/zeroconf/client_zeroconf.py @@ -33,10 +33,10 @@ DATA_SENT='DATA SENT' TYPE_SERVER, TYPE_CLIENT = range(2) # wait XX sec to establish a connection -CONNECT_TIMEOUT_SECONDS = 30 +CONNECT_TIMEOUT_SECONDS = 10 # after XX sec with no activity, close the stream -ACTIVITY_TIMEOUT_SECONDS = 180 +ACTIVITY_TIMEOUT_SECONDS = 30 class ZeroconfListener(IdleObject): def __init__(self, port, conn_holder): @@ -68,6 +68,7 @@ class ZeroconfListener(IdleObject): self.started = True def pollend(self): + #print 'pollend' ''' called when we stop listening on (host, port) ''' self.disconnect() @@ -89,13 +90,13 @@ class ZeroconfListener(IdleObject): self.conn_holder.kill_all_connections() def accept_conn(self): - ''' accepts a new incomming connection ''' + ''' accepts a new incoming connection ''' _sock = self._serv.accept() _sock[0].setblocking(False) return _sock class P2PClient(IdleObject): - def __init__(self, _sock, host, port, conn_holder, messagequeue = [], to = None): + def __init__(self, _sock, host, port, conn_holder, stanzaqueue = [], to = None): self._owner = self self.Namespace = 'jabber:client' self.defaultNamespace = self.Namespace @@ -103,7 +104,7 @@ class P2PClient(IdleObject): self._registered_name = None self._caller = conn_holder.caller self.conn_holder = conn_holder - self.messagequeue = messagequeue + self.stanzaqueue = stanzaqueue self.to = to self.Server = host self.DBG = 'client' @@ -121,28 +122,45 @@ class P2PClient(IdleObject): self.sock_type = TYPE_SERVER else: self.sock_type = TYPE_CLIENT - conn = P2PConnection('', _sock, host, port, self._caller, self.on_connect) + conn = P2PConnection('', _sock, host, port, self._caller, self.on_connect, self) self.sock_hash = conn._sock.__hash__ - self.conn_holder.add_connection(self, self.Server, self.to) + self.fd = conn.fd + self.conn_holder.add_connection(self, self.Server, port, self.to) + self.conn_holder.number_of_awaiting_messages[self.fd] = len(self.stanzaqueue) - def add_message(self, message): + def add_stanza(self, stanza, is_message = False): if self.Connection: if self.Connection.state == -1: return False - self.send(message) + self.send(stanza, is_message) else: - self.messagequeue.append(message) + self.stanzaqueue.append((stanza, is_message)) + + if is_message: + #print 'fd: %s' % self.fd + if self.conn_holder.number_of_awaiting_messages.has_key(self.fd): + self.conn_holder.number_of_awaiting_messages[self.fd]+=1 + else: + self.conn_holder.number_of_awaiting_messages[self.fd] = 1 + + #print "number_of_awaiting_messages %s" % self.conn_holder.number_of_awaiting_messages return True + def on_message_sent(self, connection_id): + #print 'message successfully sent' + #print connection_id + self.conn_holder.number_of_awaiting_messages[connection_id]-=1 + #print self.conn_holder.number_of_awaiting_messages + def on_connect(self, conn): self.Connection = conn self.Connection.PlugIn(self) dispatcher_nb.Dispatcher().PlugIn(self) self._register_handlers() if self.sock_type == TYPE_CLIENT: - while self.messagequeue: - message = self.messagequeue.pop(0) - self.send(message) + while self.stanzaqueue: + stanza, is_message = self.stanzaqueue.pop(0) + self.send(stanza, is_message) def StreamInit(self): ''' Send an initial stream header. ''' @@ -171,13 +189,19 @@ class P2PClient(IdleObject): return if self.sock_type == TYPE_SERVER: self.send_stream_header() - while self.messagequeue: - message = self.messagequeue.pop(0) - self.send(message) + while self.stanzaqueue: + stanza, is_message = self.stanzaqueue.pop(0) + self.send(stanza, is_message) def on_disconnect(self): + #print 'on_disconnect, to:%s' % self.to if self.conn_holder: + if self.conn_holder.number_of_awaiting_messages.has_key(self.fd): + if self.conn_holder.number_of_awaiting_messages[self.fd] > 0: + self._caller.dispatch('MSGERROR',[unicode(self.to), -1, \ + _('Connection to host could not be established'), None, None]) + del self.conn_holder.number_of_awaiting_messages[self.fd] self.conn_holder.remove_connection(self.sock_hash) if self.__dict__.has_key('Dispatcher'): self.Dispatcher.PlugOut() @@ -227,16 +251,19 @@ class P2PClient(IdleObject): class P2PConnection(IdleObject, PlugIn): ''' class for sending file to socket over socks5 ''' - def __init__(self, sock_hash, _sock, host = None, port = None, caller = None, on_connect = None): + def __init__(self, sock_hash, _sock, host = None, port = None, caller = None, on_connect = None, client = None): IdleObject.__init__(self) - self._owner = None + self._owner = client PlugIn.__init__(self) self.DBG_LINE='socket' self.sendqueue = [] self.sendbuff = None + self.buff_is_message = False self._sock = _sock + self.sock_hash = None self.host, self.port = host, port self.on_connect = on_connect + self.client = client self.writable = False self.readable = False self._exported_methods=[self.send, self.disconnect, self.onreceive] @@ -283,27 +310,34 @@ class P2PConnection(IdleObject, PlugIn): # make sure this cb is not overriden by recursive calls if not recv_handler(None) and _tmp == self.on_receive: self.on_receive = recv_handler - - - - def send(self, stanza): + + def send(self, packet, is_message = False): '''Append stanza to the queue of messages to be send. If supplied data is unicode string, encode it to utf-8. ''' if self.state <= 0: return - r = stanza + + r = packet + if isinstance(r, unicode): r = r.encode('utf-8') elif not isinstance(r, str): r = ustr(r).encode('utf-8') - self.sendqueue.append(r) + + self.sendqueue.append((r, is_message)) self._plug_idle() def read_timeout(self): + #print 'read_timeout: %s' % self.fd + #print self.client.conn_holder.number_of_awaiting_messages + if self.client.conn_holder.number_of_awaiting_messages[self.fd] > 0: + self.client._caller.dispatch('MSGERROR',[unicode(self.client.to), -1, \ + _('Connection to host could not be established'), None, None]) + #print 'error, set to zero' + self.client.conn_holder.number_of_awaiting_messages[self.fd] = 0 self.pollend() - - + def do_connect(self): errnum = 0 try: @@ -311,7 +345,7 @@ class P2PConnection(IdleObject, PlugIn): self._sock.setblocking(False) except Exception, ee: (errnum, errstr) = ee - if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): + if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): return # win32 needs this elif errnum not in (0, 10056, errno.EISCONN) or self.state != 0: @@ -328,6 +362,7 @@ class P2PConnection(IdleObject, PlugIn): if self.state == 0: self.do_connect() return + #print 'pollout:' gajim.idlequeue.remove_timeout(self.fd) self._do_send() @@ -391,6 +426,7 @@ class P2PConnection(IdleObject, PlugIn): def disconnect(self): ''' Closes the socket. ''' + #print 'disconnect' gajim.idlequeue.remove_timeout(self.fd) gajim.idlequeue.unplug_idle(self.fd) try: @@ -408,7 +444,7 @@ class P2PConnection(IdleObject, PlugIn): if not self.sendbuff: if not self.sendqueue: return None # nothing to send - self.sendbuff = self.sendqueue.pop(0) + self.sendbuff, self.buff_is_message = self.sendqueue.pop(0) self.sent_data = self.sendbuff try: send_count = self._sock.send(self.sendbuff) @@ -423,6 +459,7 @@ class P2PConnection(IdleObject, PlugIn): # we are not waiting for write self._plug_idle() self._on_send() + except socket.error, e: sys.exc_clear() if e[0] == socket.SSL_ERROR_WANT_WRITE: @@ -451,8 +488,11 @@ class P2PConnection(IdleObject, PlugIn): self.DEBUG(self.sent_data,'sent') if hasattr(self._owner, 'Dispatcher'): self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data) - self.sent_data = None - + self.sent_data = None + if self.buff_is_message: + self._owner.on_message_sent(self.fd) + self.buff_is_message = False + def _on_send_failure(self): self.DEBUG("Socket error while sending data",'error') self._owner.disconnected() @@ -468,7 +508,9 @@ class ClientZeroconf: self.connections = {} self.recipient_to_hash = {} self.ip_to_hash = {} + self.hash_to_port = {} self.listener = None + self.number_of_awaiting_messages = {} def test_avahi(self): try: @@ -545,11 +587,12 @@ class ClientZeroconf: for connection in self.connections.values(): connection.force_disconnect() - def add_connection(self, connection, ip, recipient): + def add_connection(self, connection, ip, port, recipient): sock_hash = connection.sock_hash if sock_hash not in self.connections: self.connections[sock_hash] = connection self.ip_to_hash[ip] = sock_hash + self.hash_to_port[sock_hash] = port if recipient: self.recipient_to_hash[recipient] = sock_hash @@ -564,7 +607,9 @@ class ClientZeroconf: if self.ip_to_hash[i] == sock_hash: del self.ip_to_hash[i] break - + if self.hash_to_port.has_key(sock_hash): + del self.hash_to_port[sock_hash] + def start_listener(self, port): for p in range(port, port + 5): self.listener = ZeroconfListener(p, self) @@ -579,21 +624,34 @@ class ClientZeroconf: return self.roster.getRoster() return {} - def send(self, msg_iq): - msg_iq.setFrom(self.roster.zeroconf.name) - to = msg_iq.getTo() - if to in self.recipient_to_hash: - conn = self.connections[self.recipient_to_hash[to]] - if conn.add_message(msg_iq): - return + def send(self, stanza, is_message = False): + #print 'send called, is_message = %s' % is_message + #print stanza + stanza.setFrom(self.roster.zeroconf.name) + to = stanza.getTo() + try: item = self.roster[to] except KeyError: - #XXX invalid recipient, show some error maybe ? - return - if item['address'] in self.ip_to_hash: - conn = self.connections[self.ip_to_hash[item['address']]] - if conn.add_message(msg_iq): + self.caller.dispatch('MSGERROR', [unicode(to), '-1', _('Contact is offline. Your message could not be sent.'), None, None]) + return False + + # look for hashed connections + if to in self.recipient_to_hash: + conn = self.connections[self.recipient_to_hash[to]] + #print 'hashed recipient: %s' % conn.sock_hash + if conn.add_stanza(stanza, is_message): return - P2PClient(None, item['address'], item['port'], self, [msg_iq], to) + #print 'hash_to_port: %s' % self.hash_to_port + #print 'ip_to_hash: %s' % self.ip_to_hash + if item['address'] in self.ip_to_hash: + hash = self.ip_to_hash[item['address']] + if self.hash_to_port[hash] == item['port']: + #print 'hashed recipient by address: %s' % conn.sock_hash + conn = self.connections[hash] + if conn.add_stanza(stanza, is_message): + return + + # otherwise open new connection + P2PClient(None, item['address'], item['port'], self, [(stanza, is_message)], to) diff --git a/src/common/zeroconf/connection_zeroconf.py b/src/common/zeroconf/connection_zeroconf.py index 5815798e1..b5dfd5090 100644 --- a/src/common/zeroconf/connection_zeroconf.py +++ b/src/common/zeroconf/connection_zeroconf.py @@ -208,7 +208,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): def _on_error(self, message): self.dispatch('ERROR', (_('Avahi error'), _("%s\nLink-local messaging might not work properly.") % message)) - + def connect(self, show = 'online', msg = ''): self.get_config_values_or_default() if not self.connection: @@ -302,7 +302,6 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): # 'disconnect' elif show == 'offline' and self.connected: self.disconnect() - self.dispatch('STATUS', 'offline') # update status elif show != 'offline' and self.connected: @@ -311,6 +310,8 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): if show == 'invisible': check = check and self.connection.remove_announce() elif was_invisible: + if not self.connected: + check = check and self.connect(show, msg) check = check and self.connection.announce() if self.connection and not show == 'invisible': check = check and self.connection.set_show_msg(show, msg) @@ -319,7 +320,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): if check: self.dispatch('STATUS', show) else: - # show notification that avahi, or system bus is down + # show notification that avahi or system bus is down self.dispatch('STATUS', 'offline') self.status = 'offline' self.dispatch('CONNECTION_LOST', @@ -333,12 +334,16 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): chatstate = None, msg_id = None, composing_jep = None, resource = None, user_nick = None): fjid = jid - + if not self.connection: return if not msg and chatstate is None: return + if self.status in ('invisible', 'offline'): + self.dispatch('MSGERROR', [unicode(jid), '-1', _('You are not connected or not visible to others. Your message could not be sent.'), None, None]) + return + msgtxt = msg msgenc = '' if keyID and USE_GPG: @@ -384,7 +389,9 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): if chatstate is 'composing' or msgtxt: chatstate_node.addChild(name = 'composing') - self.connection.send(msg_iq) + if not self.connection.send(msg_iq, msg != None): + return + no_log_for = gajim.config.get_per('accounts', self.name, 'no_log_for') ji = gajim.get_jid_without_resource(jid) if self.name not in no_log_for and ji not in no_log_for: @@ -402,11 +409,9 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): def send_stanza(self, stanza): # send a stanza untouched - print 'connection_zeroconf.py: send_stanza' if not self.connection: return - #self.connection.send(stanza) - pass + self.connection.send(stanza) def ack_subscribed(self, jid): gajim.log.debug('This should not happen (ack_subscribed)')