send_messages uses NonBlocking P2PConnection
This commit is contained in:
		
							parent
							
								
									81b46bb37f
								
							
						
					
					
						commit
						9c60d30351
					
				
					 2 changed files with 85 additions and 48 deletions
				
			
		| 
						 | 
				
			
			@ -30,7 +30,7 @@ DATA_SENT='DATA SENT'
 | 
			
		|||
TYPE_SERVER, TYPE_CLIENT = range(2)
 | 
			
		||||
 | 
			
		||||
class ZeroconfListener(IdleObject):
 | 
			
		||||
	def __init__(self, port, caller = None):
 | 
			
		||||
	def __init__(self, port, conn_holder):
 | 
			
		||||
		''' handle all incomming connections on ('0.0.0.0', port)'''
 | 
			
		||||
		self.port = port
 | 
			
		||||
		self.queue_idx = -1	
 | 
			
		||||
| 
						 | 
				
			
			@ -38,7 +38,8 @@ class ZeroconfListener(IdleObject):
 | 
			
		|||
		self.started = False
 | 
			
		||||
		self._sock = None
 | 
			
		||||
		self.fd = -1
 | 
			
		||||
		self.caller = caller
 | 
			
		||||
		self.caller = conn_holder.caller
 | 
			
		||||
		self.conn_holder = conn_holder
 | 
			
		||||
		
 | 
			
		||||
	def bind(self):
 | 
			
		||||
		self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
| 
						 | 
				
			
			@ -64,7 +65,7 @@ class ZeroconfListener(IdleObject):
 | 
			
		|||
	def pollin(self):
 | 
			
		||||
		''' accept a new incomming connection and notify queue'''
 | 
			
		||||
		sock = self.accept_conn()
 | 
			
		||||
		P2PClient(sock[0], sock[1][0], sock[1][1], self.caller)
 | 
			
		||||
		P2PClient(sock[0], sock[1][0], sock[1][1], self.conn_holder)
 | 
			
		||||
	
 | 
			
		||||
	def disconnect(self):
 | 
			
		||||
		''' free all resources, we are not listening anymore '''
 | 
			
		||||
| 
						 | 
				
			
			@ -76,7 +77,7 @@ class ZeroconfListener(IdleObject):
 | 
			
		|||
			self._serv.close()
 | 
			
		||||
		except:
 | 
			
		||||
			pass
 | 
			
		||||
		#XXX kill all active connection
 | 
			
		||||
		self.conn_holder.kill_all_connections()
 | 
			
		||||
	
 | 
			
		||||
	def accept_conn(self):
 | 
			
		||||
		''' accepts a new incomming connection '''
 | 
			
		||||
| 
						 | 
				
			
			@ -87,33 +88,42 @@ class ZeroconfListener(IdleObject):
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
class P2PClient(IdleObject):
 | 
			
		||||
	def __init__(self, _sock, host, port, caller, messagequeue = []):
 | 
			
		||||
	def __init__(self, _sock, host, port, conn_holder, messagequeue = []):
 | 
			
		||||
		self._owner = self
 | 
			
		||||
		self.Namespace = 'jabber:client'
 | 
			
		||||
		self.defaultNamespace = self.Namespace
 | 
			
		||||
		self._component = 0
 | 
			
		||||
		self._registered_name = None
 | 
			
		||||
		self._caller = caller
 | 
			
		||||
		self._caller = conn_holder.caller
 | 
			
		||||
		self.conn_holder = conn_holder
 | 
			
		||||
		self.messagequeue = messagequeue
 | 
			
		||||
		self.Server = host
 | 
			
		||||
		self.DBG = 'client'
 | 
			
		||||
		self.Connection = None
 | 
			
		||||
		debug = ['always', 'nodebuilder']
 | 
			
		||||
		self._DEBUG = Debug.Debug(debug)
 | 
			
		||||
		self.DEBUG = self._DEBUG.Show
 | 
			
		||||
		self.debug_flags = self._DEBUG.debug_flags
 | 
			
		||||
		self.debug_flags.append(self.DBG)
 | 
			
		||||
		self.sock_hash = None
 | 
			
		||||
		if _sock:
 | 
			
		||||
			self.sock_type = TYPE_SERVER
 | 
			
		||||
		else:
 | 
			
		||||
			self.sock_type = TYPE_CLIENT
 | 
			
		||||
		P2PConnection('', _sock, host, port, caller, self.on_connect)
 | 
			
		||||
		P2PConnection('', _sock, host, port, self._caller, self.on_connect)
 | 
			
		||||
	
 | 
			
		||||
	def on_connect(self, conn):
 | 
			
		||||
		self.Connection = conn
 | 
			
		||||
		self.Connection.PlugIn(self)
 | 
			
		||||
		dispatcher_nb.Dispatcher().PlugIn(self)
 | 
			
		||||
		self.RegisterHandler('message', self._messageCB)
 | 
			
		||||
	
 | 
			
		||||
		self.sock_hash = conn._sock.__hash__
 | 
			
		||||
		self.conn_holder.add_connection(self)
 | 
			
		||||
		if self.sock_type == TYPE_CLIENT:
 | 
			
		||||
			while self.messagequeue:
 | 
			
		||||
				message = self.messagequeue.pop(0)
 | 
			
		||||
				self.send(message)
 | 
			
		||||
 | 
			
		||||
	def StreamInit(self):
 | 
			
		||||
		''' Send an initial stream header. '''
 | 
			
		||||
		self.Dispatcher.Stream = simplexml.NodeBuilder()
 | 
			
		||||
| 
						 | 
				
			
			@ -136,17 +146,32 @@ class P2PClient(IdleObject):
 | 
			
		|||
	
 | 
			
		||||
	def _check_stream_start(self, ns, tag, attrs):
 | 
			
		||||
		if ns<>NS_STREAMS or tag<>'stream':
 | 
			
		||||
			raise ValueError('Incorrect stream start: (%s,%s). Terminating.' % (tag, ns))
 | 
			
		||||
			self.Connection.DEBUG('Incorrect stream start: (%s,%s).Terminating! ' % (tag, ns), 'error')
 | 
			
		||||
			self.Connection.disconnect()
 | 
			
		||||
			return
 | 
			
		||||
		if self.sock_type == TYPE_SERVER:
 | 
			
		||||
			self.send_stream_header()
 | 
			
		||||
			for message in self.messagequeue:
 | 
			
		||||
			while self.messagequeue:
 | 
			
		||||
				message = self.messagequeue.pop(0)
 | 
			
		||||
				self.send(message)
 | 
			
		||||
 | 
			
		||||
	
 | 
			
		||||
	def disconnected(self):
 | 
			
		||||
	def on_disconnect(self):
 | 
			
		||||
		if self.conn_holder:
 | 
			
		||||
			self.conn_holder.remove_connection(self.sock_hash) 
 | 
			
		||||
		if self.__dict__.has_key('Dispatcher'):
 | 
			
		||||
			self.Dispatcher.PlugOut()
 | 
			
		||||
		if self.__dict__.has_key('P2PConnection'):
 | 
			
		||||
			self.P2PConnection.PlugOut()
 | 
			
		||||
		self.Connection = None
 | 
			
		||||
		self._caller = None
 | 
			
		||||
		self.conn_holder = None
 | 
			
		||||
	
 | 
			
		||||
	def force_disconnect(self):
 | 
			
		||||
		if self.Connection:
 | 
			
		||||
			self.disconnect()
 | 
			
		||||
		else:
 | 
			
		||||
			self.on_disconnect()
 | 
			
		||||
		
 | 
			
		||||
	def _on_receive_document_attrs(self, data):
 | 
			
		||||
		if data:
 | 
			
		||||
| 
						 | 
				
			
			@ -184,32 +209,30 @@ class P2PConnection(IdleObject, PlugIn):
 | 
			
		|||
		self._exported_methods=[self.send, self.disconnect, self.onreceive]
 | 
			
		||||
		self.on_receive = None
 | 
			
		||||
		if _sock:
 | 
			
		||||
			self.connected = True
 | 
			
		||||
			self._sock = _sock
 | 
			
		||||
			self.state = 1 
 | 
			
		||||
			_sock.setblocking(False)
 | 
			
		||||
			self.fd = _sock.fileno()
 | 
			
		||||
			self._sock.setblocking(False)
 | 
			
		||||
			self.fd = self._sock.fileno()
 | 
			
		||||
			self.on_connect(self)
 | 
			
		||||
		else:
 | 
			
		||||
			self.connected = False
 | 
			
		||||
			self.state = 0
 | 
			
		||||
			self.idlequeue.plug_idle(self, True, False)
 | 
			
		||||
			self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
			
		||||
			self._sock.setblocking(False)
 | 
			
		||||
			self.fd = self._sock.fileno()
 | 
			
		||||
			gajim.idlequeue.plug_idle(self, True, False)
 | 
			
		||||
			self.do_connect()
 | 
			
		||||
			
 | 
			
		||||
		
 | 
			
		||||
		
 | 
			
		||||
		
 | 
			
		||||
 | 
			
		||||
	def plugin(self, owner):
 | 
			
		||||
		self.onreceive(owner._on_receive_document_attrs)
 | 
			
		||||
		self._plug_idle()
 | 
			
		||||
		return True
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	def plugout(self):
 | 
			
		||||
		''' Disconnect from the remote server and unregister self.disconnected method from
 | 
			
		||||
			the owner's dispatcher. '''
 | 
			
		||||
		self.disconnect()
 | 
			
		||||
		self._owner.Connection = None
 | 
			
		||||
		self._owner = None
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	def onreceive(self, recv_handler):
 | 
			
		||||
		if not recv_handler:
 | 
			
		||||
			if hasattr(self._owner, 'Dispatcher'):
 | 
			
		||||
| 
						 | 
				
			
			@ -245,29 +268,26 @@ class P2PConnection(IdleObject, PlugIn):
 | 
			
		|||
	
 | 
			
		||||
	
 | 
			
		||||
	def do_connect(self):
 | 
			
		||||
		errnum = 0
 | 
			
		||||
		try:
 | 
			
		||||
			self._sock.connect((self.host, self.port))
 | 
			
		||||
			self._sock.setblocking(False)
 | 
			
		||||
		except Exception, ee:
 | 
			
		||||
			(errnum, errstr) = ee
 | 
			
		||||
			if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): 
 | 
			
		||||
				return
 | 
			
		||||
			# win32 needs this
 | 
			
		||||
			elif errnum not in  (10056, errno.EISCONN) or self.state != 0:
 | 
			
		||||
				self.disconnect()
 | 
			
		||||
				return None
 | 
			
		||||
			else: # socket is already connected
 | 
			
		||||
				self._sock.setblocking(False)
 | 
			
		||||
		self.connected = True
 | 
			
		||||
		if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): 
 | 
			
		||||
			return
 | 
			
		||||
		# win32 needs this
 | 
			
		||||
		elif errnum not in (0, 10056, errno.EISCONN) or self.state != 0:
 | 
			
		||||
			self.disconnect()
 | 
			
		||||
			return None
 | 
			
		||||
		else: # socket is already connected
 | 
			
		||||
			self._sock.setblocking(False)
 | 
			
		||||
		self.state = 1 # connected
 | 
			
		||||
		self.on_connect(self)
 | 
			
		||||
		return 1 # we are connected
 | 
			
		||||
	
 | 
			
		||||
	
 | 
			
		||||
	def pollout(self):
 | 
			
		||||
		if not self.connected:
 | 
			
		||||
			self.disconnect()
 | 
			
		||||
			return
 | 
			
		||||
		if self.state == 0:
 | 
			
		||||
			self.do_connect()
 | 
			
		||||
			return
 | 
			
		||||
| 
						 | 
				
			
			@ -314,10 +334,8 @@ class P2PConnection(IdleObject, PlugIn):
 | 
			
		|||
			self.on_receive(received)
 | 
			
		||||
		else:
 | 
			
		||||
			# This should never happed, so we need the debug
 | 
			
		||||
			self.DEBUG('Unhandled data received: %s' % received,'got')
 | 
			
		||||
			self.DEBUG('Unhandled data received: %s' % received,'error')
 | 
			
		||||
			self.disconnect()
 | 
			
		||||
			if self.on_connect_failure:
 | 
			
		||||
				self.on_connect_failure()
 | 
			
		||||
		return True
 | 
			
		||||
	
 | 
			
		||||
	def onreceive(self, recv_handler):
 | 
			
		||||
| 
						 | 
				
			
			@ -342,10 +360,9 @@ class P2PConnection(IdleObject, PlugIn):
 | 
			
		|||
		except:
 | 
			
		||||
			# socket is already closed
 | 
			
		||||
			pass
 | 
			
		||||
		self.connected = False
 | 
			
		||||
		self.fd = -1
 | 
			
		||||
		self.state = -1
 | 
			
		||||
		self._owner.disconnected()
 | 
			
		||||
		self._owner.on_disconnect()
 | 
			
		||||
 | 
			
		||||
	def _do_send(self):
 | 
			
		||||
		if not self.sendbuff:
 | 
			
		||||
| 
						 | 
				
			
			@ -405,10 +422,23 @@ class ClientZeroconf:
 | 
			
		|||
		self.roster = roster_zeroconf.Roster(zeroconf)
 | 
			
		||||
		self.caller = caller
 | 
			
		||||
		self.start_listener(zeroconf.port)
 | 
			
		||||
		self.connections = {}
 | 
			
		||||
		
 | 
			
		||||
	def kill_all_connections(self):
 | 
			
		||||
		for connection in self.connections.values():
 | 
			
		||||
			connection.force_disconnect()
 | 
			
		||||
	
 | 
			
		||||
	def add_connection(self, connection):
 | 
			
		||||
		sock_hash = connection.sock_hash
 | 
			
		||||
		if sock_hash not in self.connections:
 | 
			
		||||
			self.connections[sock_hash] = connection
 | 
			
		||||
		
 | 
			
		||||
	def remove_connection(self, sock_hash):
 | 
			
		||||
		if sock_hash in self.connections:
 | 
			
		||||
			self.connections[sock_hash]
 | 
			
		||||
		
 | 
			
		||||
	def start_listener(self, port):
 | 
			
		||||
		self.listener = ZeroconfListener(port, self.caller)
 | 
			
		||||
		self.listener = ZeroconfListener(port, self)
 | 
			
		||||
		self.listener.bind()
 | 
			
		||||
		if self.listener.started is False:
 | 
			
		||||
			self.listener = None
 | 
			
		||||
| 
						 | 
				
			
			@ -416,9 +446,17 @@ class ClientZeroconf:
 | 
			
		|||
			# dialog from dialogs.py and fail
 | 
			
		||||
			BindPortError(port)
 | 
			
		||||
			return None
 | 
			
		||||
		#~ self.connected += 1
 | 
			
		||||
	
 | 
			
		||||
	def getRoster(self):
 | 
			
		||||
		return self.roster.getRoster()
 | 
			
		||||
 | 
			
		||||
	def send(self, str):
 | 
			
		||||
		pass
 | 
			
		||||
	def send(self, msg_iq):
 | 
			
		||||
		msg_iq.setFrom(self.roster.zeroconf.name)
 | 
			
		||||
		to = msg_iq.getTo()
 | 
			
		||||
		try:
 | 
			
		||||
			item = self.roster[to]
 | 
			
		||||
		except KeyError:
 | 
			
		||||
			#XXX invalid recipient, show some error maybe ?
 | 
			
		||||
			return
 | 
			
		||||
		P2PClient(None, item['address'], item['port'], self, [msg_iq])
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -267,7 +267,6 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
 | 
			
		|||
	chatstate = None, msg_id = None, composing_jep = None, resource = None, 
 | 
			
		||||
	user_nick = None):
 | 
			
		||||
		print 'connection_zeroconf.py: send_message'
 | 
			
		||||
 | 
			
		||||
		fjid = jid
 | 
			
		||||
 | 
			
		||||
		if not self.connection:
 | 
			
		||||
| 
						 | 
				
			
			@ -319,7 +318,8 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
 | 
			
		|||
				# when msgtxt, requests JEP-0022 composing notification
 | 
			
		||||
				if chatstate is 'composing' or msgtxt: 
 | 
			
		||||
					chatstate_node.addChild(name = 'composing') 
 | 
			
		||||
 | 
			
		||||
		
 | 
			
		||||
		self.connection.send(msg_iq)
 | 
			
		||||
		no_log_for = gajim.config.get_per('accounts', self.name, 'no_log_for')
 | 
			
		||||
		ji = gajim.get_jid_without_resource(jid)
 | 
			
		||||
		if self.name not in no_log_for and ji not in no_log_for:
 | 
			
		||||
| 
						 | 
				
			
			@ -332,8 +332,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
 | 
			
		|||
				else:
 | 
			
		||||
					kind = 'single_msg_sent'
 | 
			
		||||
				gajim.logger.write(kind, jid, log_msg)
 | 
			
		||||
		
 | 
			
		||||
		self.zeroconf.send_message(jid, msgtxt, type)
 | 
			
		||||
		#~ self.zeroconf.send_message(jid, msgtxt, type)
 | 
			
		||||
		
 | 
			
		||||
		self.dispatch('MSGSENT', (jid, msg, keyID))
 | 
			
		||||
		
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		
		Reference in a new issue