From 9c60d30351620dbfd6d7bd43547ecfdeefc44919 Mon Sep 17 00:00:00 2001 From: Dimitur Kirov Date: Tue, 19 Sep 2006 19:56:04 +0000 Subject: [PATCH] send_messages uses NonBlocking P2PConnection --- src/common/zeroconf/client_zeroconf.py | 126 ++++++++++++++------- src/common/zeroconf/connection_zeroconf.py | 7 +- 2 files changed, 85 insertions(+), 48 deletions(-) diff --git a/src/common/zeroconf/client_zeroconf.py b/src/common/zeroconf/client_zeroconf.py index 4ed64809c..f139d2f25 100644 --- a/src/common/zeroconf/client_zeroconf.py +++ b/src/common/zeroconf/client_zeroconf.py @@ -30,7 +30,7 @@ DATA_SENT='DATA SENT' TYPE_SERVER, TYPE_CLIENT = range(2) class ZeroconfListener(IdleObject): - def __init__(self, port, caller = None): + def __init__(self, port, conn_holder): ''' handle all incomming connections on ('0.0.0.0', port)''' self.port = port self.queue_idx = -1 @@ -38,7 +38,8 @@ class ZeroconfListener(IdleObject): self.started = False self._sock = None self.fd = -1 - self.caller = caller + self.caller = conn_holder.caller + self.conn_holder = conn_holder def bind(self): self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -64,7 +65,7 @@ class ZeroconfListener(IdleObject): def pollin(self): ''' accept a new incomming connection and notify queue''' sock = self.accept_conn() - P2PClient(sock[0], sock[1][0], sock[1][1], self.caller) + P2PClient(sock[0], sock[1][0], sock[1][1], self.conn_holder) def disconnect(self): ''' free all resources, we are not listening anymore ''' @@ -76,7 +77,7 @@ class ZeroconfListener(IdleObject): self._serv.close() except: pass - #XXX kill all active connection + self.conn_holder.kill_all_connections() def accept_conn(self): ''' accepts a new incomming connection ''' @@ -87,33 +88,42 @@ class ZeroconfListener(IdleObject): class P2PClient(IdleObject): - def __init__(self, _sock, host, port, caller, messagequeue = []): + def __init__(self, _sock, host, port, conn_holder, messagequeue = []): self._owner = self self.Namespace = 'jabber:client' self.defaultNamespace = self.Namespace self._component = 0 self._registered_name = None - self._caller = caller + self._caller = conn_holder.caller + self.conn_holder = conn_holder self.messagequeue = messagequeue self.Server = host self.DBG = 'client' + self.Connection = None debug = ['always', 'nodebuilder'] self._DEBUG = Debug.Debug(debug) self.DEBUG = self._DEBUG.Show self.debug_flags = self._DEBUG.debug_flags self.debug_flags.append(self.DBG) + self.sock_hash = None if _sock: self.sock_type = TYPE_SERVER else: self.sock_type = TYPE_CLIENT - P2PConnection('', _sock, host, port, caller, self.on_connect) + P2PConnection('', _sock, host, port, self._caller, self.on_connect) def on_connect(self, conn): self.Connection = conn self.Connection.PlugIn(self) dispatcher_nb.Dispatcher().PlugIn(self) self.RegisterHandler('message', self._messageCB) - + self.sock_hash = conn._sock.__hash__ + self.conn_holder.add_connection(self) + if self.sock_type == TYPE_CLIENT: + while self.messagequeue: + message = self.messagequeue.pop(0) + self.send(message) + def StreamInit(self): ''' Send an initial stream header. ''' self.Dispatcher.Stream = simplexml.NodeBuilder() @@ -136,17 +146,32 @@ class P2PClient(IdleObject): def _check_stream_start(self, ns, tag, attrs): if ns<>NS_STREAMS or tag<>'stream': - raise ValueError('Incorrect stream start: (%s,%s). Terminating.' % (tag, ns)) + self.Connection.DEBUG('Incorrect stream start: (%s,%s).Terminating! ' % (tag, ns), 'error') + self.Connection.disconnect() + return if self.sock_type == TYPE_SERVER: self.send_stream_header() - for message in self.messagequeue: + while self.messagequeue: + message = self.messagequeue.pop(0) self.send(message) + - def disconnected(self): + def on_disconnect(self): + if self.conn_holder: + self.conn_holder.remove_connection(self.sock_hash) if self.__dict__.has_key('Dispatcher'): self.Dispatcher.PlugOut() if self.__dict__.has_key('P2PConnection'): self.P2PConnection.PlugOut() + self.Connection = None + self._caller = None + self.conn_holder = None + + def force_disconnect(self): + if self.Connection: + self.disconnect() + else: + self.on_disconnect() def _on_receive_document_attrs(self, data): if data: @@ -184,32 +209,30 @@ class P2PConnection(IdleObject, PlugIn): self._exported_methods=[self.send, self.disconnect, self.onreceive] self.on_receive = None if _sock: - self.connected = True + self._sock = _sock self.state = 1 - _sock.setblocking(False) - self.fd = _sock.fileno() + self._sock.setblocking(False) + self.fd = self._sock.fileno() self.on_connect(self) else: - self.connected = False self.state = 0 - self.idlequeue.plug_idle(self, True, False) + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.setblocking(False) + self.fd = self._sock.fileno() + gajim.idlequeue.plug_idle(self, True, False) self.do_connect() - - - - + def plugin(self, owner): self.onreceive(owner._on_receive_document_attrs) self._plug_idle() return True - + def plugout(self): ''' Disconnect from the remote server and unregister self.disconnected method from the owner's dispatcher. ''' self.disconnect() - self._owner.Connection = None self._owner = None - + def onreceive(self, recv_handler): if not recv_handler: if hasattr(self._owner, 'Dispatcher'): @@ -245,29 +268,26 @@ class P2PConnection(IdleObject, PlugIn): def do_connect(self): + errnum = 0 try: self._sock.connect((self.host, self.port)) self._sock.setblocking(False) except Exception, ee: (errnum, errstr) = ee - if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): - return - # win32 needs this - elif errnum not in (10056, errno.EISCONN) or self.state != 0: - self.disconnect() - return None - else: # socket is already connected - self._sock.setblocking(False) - self.connected = True + if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): + return + # win32 needs this + elif errnum not in (0, 10056, errno.EISCONN) or self.state != 0: + self.disconnect() + return None + else: # socket is already connected + self._sock.setblocking(False) self.state = 1 # connected self.on_connect(self) return 1 # we are connected def pollout(self): - if not self.connected: - self.disconnect() - return if self.state == 0: self.do_connect() return @@ -314,10 +334,8 @@ class P2PConnection(IdleObject, PlugIn): self.on_receive(received) else: # This should never happed, so we need the debug - self.DEBUG('Unhandled data received: %s' % received,'got') + self.DEBUG('Unhandled data received: %s' % received,'error') self.disconnect() - if self.on_connect_failure: - self.on_connect_failure() return True def onreceive(self, recv_handler): @@ -342,10 +360,9 @@ class P2PConnection(IdleObject, PlugIn): except: # socket is already closed pass - self.connected = False self.fd = -1 self.state = -1 - self._owner.disconnected() + self._owner.on_disconnect() def _do_send(self): if not self.sendbuff: @@ -405,10 +422,23 @@ class ClientZeroconf: self.roster = roster_zeroconf.Roster(zeroconf) self.caller = caller self.start_listener(zeroconf.port) + self.connections = {} + def kill_all_connections(self): + for connection in self.connections.values(): + connection.force_disconnect() + + def add_connection(self, connection): + sock_hash = connection.sock_hash + if sock_hash not in self.connections: + self.connections[sock_hash] = connection + + def remove_connection(self, sock_hash): + if sock_hash in self.connections: + self.connections[sock_hash] def start_listener(self, port): - self.listener = ZeroconfListener(port, self.caller) + self.listener = ZeroconfListener(port, self) self.listener.bind() if self.listener.started is False: self.listener = None @@ -416,9 +446,17 @@ class ClientZeroconf: # dialog from dialogs.py and fail BindPortError(port) return None - #~ self.connected += 1 + def getRoster(self): return self.roster.getRoster() - def send(self, str): - pass + def send(self, msg_iq): + msg_iq.setFrom(self.roster.zeroconf.name) + to = msg_iq.getTo() + try: + item = self.roster[to] + except KeyError: + #XXX invalid recipient, show some error maybe ? + return + P2PClient(None, item['address'], item['port'], self, [msg_iq]) + diff --git a/src/common/zeroconf/connection_zeroconf.py b/src/common/zeroconf/connection_zeroconf.py index 4a45c462f..c0afab1b9 100644 --- a/src/common/zeroconf/connection_zeroconf.py +++ b/src/common/zeroconf/connection_zeroconf.py @@ -267,7 +267,6 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): chatstate = None, msg_id = None, composing_jep = None, resource = None, user_nick = None): print 'connection_zeroconf.py: send_message' - fjid = jid if not self.connection: @@ -319,7 +318,8 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): # when msgtxt, requests JEP-0022 composing notification if chatstate is 'composing' or msgtxt: chatstate_node.addChild(name = 'composing') - + + self.connection.send(msg_iq) no_log_for = gajim.config.get_per('accounts', self.name, 'no_log_for') ji = gajim.get_jid_without_resource(jid) if self.name not in no_log_for and ji not in no_log_for: @@ -332,8 +332,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf): else: kind = 'single_msg_sent' gajim.logger.write(kind, jid, log_msg) - - self.zeroconf.send_message(jid, msgtxt, type) + #~ self.zeroconf.send_message(jid, msgtxt, type) self.dispatch('MSGSENT', (jid, msg, keyID))