From 284160c5b23096ebec0a12ce46b93e7ad0b4ba42 Mon Sep 17 00:00:00 2001 From: Dimitur Kirov Date: Thu, 4 Aug 2005 07:23:14 +0000 Subject: [PATCH] use non blocking sockets everywhere in FT --- src/common/connection.py | 8 +- src/common/socks5.py | 298 +++++++++++++++++++++++++-------------- src/gajim.py | 2 + 3 files changed, 201 insertions(+), 107 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index 0324c78fd..8cb229335 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -407,16 +407,16 @@ class Connection: sock5 = socks5.Socks5Receiver(host = streamhost['host'], \ port = int(streamhost['port']), initiator = streamhost['jid'], target = target, sid = sid, file_props = file_props) - ret = gajim.socks5queue.add_receiver(self.name, sock5) - if ret is None: - continue iq = common.xmpp.Iq(to = streamhost['jid'], typ = 'result', frm = target) iq.setAttr('id', id) query = iq.setTag('query') query.setNamespace(common.xmpp.NS_BYTESTREAM) stream_tag = query.setTag('streamhost-used') stream_tag.setAttr('jid', streamhost['jid']) - self.to_be_sent.append(iq) + ret = gajim.socks5queue.add_receiver(self.name, sock5, + self.to_be_sent.append, iq) + if ret is None: + continue raise common.xmpp.NodeProcessed def _bytestreamResultCB(self, con, iq_obj): diff --git a/src/common/socks5.py b/src/common/socks5.py index d292f70d0..167d0e8ae 100644 --- a/src/common/socks5.py +++ b/src/common/socks5.py @@ -21,6 +21,7 @@ import socket import select +import os try: import fcntl except: @@ -28,6 +29,8 @@ except: import struct import sha +MAX_BUFF_LEN = 65536 + class SocksQueue: ''' queue for all file requests objects ''' def __init__(self, complete_transfer_cb = None, \ @@ -64,19 +67,26 @@ class SocksQueue: result = sender.send_file(file_props) self.process_result(result, sender) - def add_receiver(self, account, sock5_receiver): + def add_receiver(self, account, sock5_receiver, auth_cb, auth_param): ''' add new file request ''' self.readers[self.idx] = sock5_receiver sock5_receiver.queue_idx = self.idx sock5_receiver.queue = self sock5_receiver.account = account + sock5_receiver.auth_cb = auth_cb + sock5_receiver.auth_param = auth_param self.idx += 1 result = sock5_receiver.connect() - self.connected += 1 + if result != None: + sock5_receiver._sock.setblocking(False) + result = sock5_receiver.main() + self.process_result(result, sock5_receiver) + self.connected += 1 + return 1 # we don't need blocking sockets anymore # this unblocks ui! - sock5_receiver._sock.setblocking(False) - return result + + return None def add_file_props(self, account, file_props): if file_props is None or \ @@ -107,33 +117,51 @@ class SocksQueue: for idx in self.senders.keys(): sender = self.senders[idx] if sender.connected: - if sender.state == 1: + + if sender.state < 5: if sender.pending_data(): - result = sender.get_data() - if result is not None: - sender.state = 2 - self.result_sha(result, idx) - else: + result = sender.main() + if sender.state == 4: + self.result_sha(sender.sha_msg, idx) + if result is None: + continue + if result == -1: sender.disconnect() - elif sender.state == 3: - result = sender.write_next() - self.process_result(result, sender) - elif sender.state == 4: + elif sender.state == 7: + for i in range(10): + if sender.file_props['paused']: + break + if not sender.connected: + self.process_result(-1, sender) + break + if sender.state == 8: + self.remove_sender(idx) + break + result = sender.write_next() + self.process_result(result, sender) + elif sender.state == 8: self.remove_sender(idx) else: - self.remove_sender(idx) + self.remove_sender(idx) for idx in self.readers.keys(): receiver = self.readers[idx] if receiver.connected: if receiver.file_props['paused']: continue - if receiver.pending_data(): + if receiver.state == 5: result = receiver.get_file_contents(timeout) self.process_result(result, receiver) + else: + pd = receiver.pending_data() + if pd: + result = receiver.main(timeout) + self.process_result(result, receiver) else: self.remove_receiver(idx) def process_result(self, result, actor): + if result is None: + return if result in [0, -1] and \ self.complete_transfer_cb is not None: self.complete_transfer_cb(actor.account, @@ -141,7 +169,7 @@ class SocksQueue: elif self.progress_transfer_cb is not None: self.progress_transfer_cb(actor.account, actor.file_props) - + def remove_receiver(self, idx): if idx != -1: if self.readers.has_key(idx): @@ -165,57 +193,74 @@ class Socks5: self.sid = sid self._sock = None self.account = None + self.state = 0 # not connected def connect(self): self._sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self.host, self.port)) - self._sock.setblocking(True) - self._send=self._sock.sendall - self._recv=self._sock.recv + try: + self._sock.connect((self.host, self.port)) + self._sock.setblocking(False) + self._send=self._sock.send + self._recv=self._sock.recv + except Exception, e: + return None + self.buff = '' self.connected = True - return self.send_connect() + self.state = 1 # connected + return 1 def receive(self): - ''' Reads all pending incoming data. + ''' Reads small pending incoming data. Calls owner's disconnected() method if appropriate.''' - try: - received = self._recv(64) - except: + + if self.pending_read(): received = '' - - while self.pending_data(): - try: - add = self._recv(64) - except: - add='' - received +=add - if not add: - break - if len(received) == 0: - self.disconnect() + while self.pending_read(): + try: + add = self._recv(64) + except Exception, e: + add='' + received +=add + if not add: + break + if len(received) == 0: + self.disconnect() + else: + return None return received + + def send_raw(self,raw_data): ''' Writes raw outgoing data. Blocks until done. If supplied data is unicode string, encodes it to utf-8 before send.''' try: - self._send(raw_data) - except: + lenn = self._send(raw_data) + except Exception, e: self.disconnect() - pass + return len(raw_data) def disconnect(self): ''' Closes the socket. ''' self._sock.close() self.connected = False - def pending_data(self,timeout=0): + def pending_read(self,timeout=0): ''' Returns true if there is a data ready to be read. ''' if self._sock is None: return False try: return select.select([self._sock],[],[],timeout)[0] - except: + except Exception, e: + return False + + def pending_write(self,timeout=0): + ''' Returns true if there is a data ready to be read. ''' + if self._sock is None: + return False + try: + return select.select([],[self._sock],[],timeout)[0] + except Exception, e: return False def pending_connection(self,timeout=0): @@ -224,32 +269,9 @@ class Socks5: return False try: return select.select([],[self._sock],[],timeout)[0] - except: + except Exception, e: return False - def send_connect(self): - ''' begin negotiation. on success 'address' != 0 ''' - self.send_raw(self._get_auth_buff()) - buff = self.receive() - version, method = struct.unpack('!BB', buff[:2]) - if version != 0x05 or method == 0xff: - self.disconnect() - return None - self.send_raw(self._get_request_buff(self._get_sha1_auth())) - buff = self.receive() - version, command, rsvd, address_type = struct.unpack('!BBBB', buff[:4]) - addrlen, address, port = 0, 0, 0 - if address_type == 0x03: - addrlen = ord(buff[4]) - address = struct.unpack('!%ds' % addrlen, buff[5:addrlen + 5]) - portlen = len(buff[addrlen + 5]) - if portlen == 1: # Gaim bug :) - port, = struct.unpack('!B', buff[addrlen + 5]) - else: - port, = struct.unpack('!H', buff[addrlen + 5]) - return (version, command, rsvd, address_type, addrlen, address, port) - - def _get_auth_buff(self): ''' Message, that we support 1 one auth mechanism: the 'no auth' mechanism. ''' @@ -308,12 +330,15 @@ class Socks5: def _get_sha1_auth(self): return sha.new("%s%s%s" % (self.sid, self.initiator, self.target)).hexdigest() + class Socks5Sender(Socks5): + ''' class for sending file to socket over socks5 ''' def __init__(self, sock_hash, parent, _sock, host = None, port = None): self.queue_idx = sock_hash self.queue = parent Socks5.__init__(self, host, port, None, None, None) self._sock = _sock + self._sock.setblocking(False) self._recv = _sock.recv self._send = _sock.send self.connected = True @@ -322,19 +347,18 @@ class Socks5Sender(Socks5): self.remaining_buff = '' def write_next(self): - send_size = 65536 if self.remaining_buff != '': buff = self.remaining_buff self.remaining_buff = '' else: - buff = self.fd.read(send_size) + buff = self.fd.read(MAX_BUFF_LEN) if len(buff) > 0: lenn = 0 try: lenn = self._send(buff) except Exception, e: if e.args[0] != 11: - self.state = 4 + self.state = 8 self.fd.close() self.disconnect() self.file_props['error'] = -1 @@ -342,7 +366,7 @@ class Socks5Sender(Socks5): self.size += lenn self.file_props['received-len'] = self.size if self.size == int(self.file_props['size']): - self.state = 4 + self.state = 8 self.file_props['error'] = 0 self.fd.close() self.disconnect() @@ -355,13 +379,16 @@ class Socks5Sender(Socks5): self.pauses +=1 else: self.pauses = 0 - if self.pauses > 20: - self.file_props['paused'] = True + if self.pauses > 24: + self.file_props['stalled'] = True else: - self.file_props['paused'] = False + self.file_props['stalled'] = False + self.state = 7 + if lenn == 0 and self.file_props['stalled'] is False: + return None return lenn else: - self.state = 4 + self.state = 8 self.disconnect() return -1 @@ -372,31 +399,28 @@ class Socks5Sender(Socks5): file_props['started'] = True file_props['completed'] = False file_props['paused'] = False + file_props['stalled'] = False file_props['received-len'] = 0 + self.pauses = 0 self.file_props = file_props self.size = 0 - self.state = 3 - self._sock.setblocking(False) - return self.write_next() - def get_data(self): + def main(self): if self.state == 1: buff = self.receive() if not self.connected: return -1 mechs = self._parse_auth_buff(buff) - self._sock.setblocking(True) + elif self.state == 2: self.send_raw(self._get_auth_response()) - + elif self.state == 3: buff = self.receive() - - (req_type, sha_msg, port) = self._parse_request_buff(buff) - self.send_raw(self._get_request_buff(sha_msg, 0x00)) - self.state = 2 - self._sock.setblocking(False) - return sha_msg + (req_type, self.sha_msg, port) = self._parse_request_buff(buff) + elif self.state == 4: + self.send_raw(self._get_request_buff(self.sha_msg, 0x00)) + self.state += 1 return None def pending_data(self,timeout=0): @@ -404,10 +428,10 @@ class Socks5Sender(Socks5): if self._sock is None: return False try: - if self.state == 1: - return select.select([self._sock],[],[],timeout)[0] - elif self.state == 3: - return select.select([],[self._sock],[],timeout)[0] + if self.state in [1, 3]: + return self.pending_read() + elif self.state in [2, 4, 5]: + return True except Exception, e: return False return False @@ -417,7 +441,8 @@ class Socks5Sender(Socks5): # close connection and remove us from the queue self._sock.close() self.connected = False - self.file_props['disconnect_cb'] = None + if self.file_props is not None: + self.file_props['disconnect_cb'] = None if self.queue is not None: self.queue.remove_sender(self.queue_idx) @@ -446,8 +471,7 @@ class Socks5Listener: def accept_conn(self): self._serv.accept.__doc__ _sock = self._serv.accept() - # block it untill authorization is sent - _sock[0].setblocking(True) + _sock[0].setblocking(False) return _sock def pending_connection(self,timeout=0): @@ -467,23 +491,78 @@ class Socks5Receiver(Socks5): self.queue = None self.file_props = file_props self.file_props['started'] = True - self.connected = False + self.connected = False + self.pauses = 0 if file_props: file_props['disconnect_cb'] = self.disconnect file_props['error'] = 0 self.file_props['started'] = True self.file_props['completed'] = False self.file_props['paused'] = False - self.file_props['started'] = True + self.file_props['stalled'] = False + self.file_props['started'] = True Socks5.__init__(self, host, port, initiator, target, sid) + + def main(self, timeout = 0): + ''' begin negotiation. on success 'address' != 0 ''' + if self.state == 1: + self.send_raw(self._get_auth_buff()) + elif self.state == 2: + buff = self.receive() + if buff is None or len(buff) != 2: + return None + version, method = struct.unpack('!BB', buff[:2]) + if version != 0x05 or method == 0xff: + self.disconnect() + elif self.state == 3: + self.send_raw(self._get_request_buff(self._get_sha1_auth())) + elif self.state == 4: + buff = self.receive() + if buff == None: + return None + version, command, rsvd, address_type = struct.unpack('!BBBB', buff[:4]) + addrlen, address, port = 0, 0, 0 + if address_type == 0x03: + addrlen = ord(buff[4]) + address = struct.unpack('!%ds' % addrlen, buff[5:addrlen + 5]) + portlen = len(buff[addrlen + 5]) + if portlen == 1: # Gaim bug :) + port, = struct.unpack('!B', buff[addrlen + 5]) + else: + port, = struct.unpack('!H', buff[addrlen + 5]) + self.state = 5 + self.auth_cb(self.auth_param) + return None + if self.state < 5: + self.state += 1 + return None + # we have set the connection, retrieve file + + return self.get_file_contents(timeout) + + + def pending_data(self, timeout=0): + ''' Returns true if there is a data ready to be read. ''', self.state + if self._sock is None: + return False + try: + if self.state in [2, 4]: + return self.pending_read() + elif self.state in [1, 3, 5]: + return True + except Exception, e: + return False + return False + def get_file_contents(self, timeout): - ''' read file contents from socket and write them to file "''' + ''' read file contents from socket and write them to file ''' if self.file_props is None or \ self.file_props.has_key('file-name') is False: - return - #TODO error - while self.pending_data(timeout): + self.file_props['error'] = -2 + return None + fd = None + while self.pending_read(timeout): if self.file_props.has_key('fd'): fd = self.file_props['fd'] else: @@ -491,8 +570,8 @@ class Socks5Receiver(Socks5): self.file_props['fd'] = fd self.file_props['received-len'] = 0 try: - buff = self._recv(65536) - except: + buff = self._recv(MAX_BUFF_LEN) + except Exception, e: buff = '' self.file_props['received-len'] += len(buff) fd.write(buff) @@ -503,7 +582,7 @@ class Socks5Receiver(Socks5): try: # file is not complete, remove it os.remove(self.file_props['file-name']) - except: + except Exception, e: # unable to remove the incomplete file pass self.disconnect() @@ -518,7 +597,20 @@ class Socks5Receiver(Socks5): self.file_props['completed'] = True return 0 # return number of read bytes. It can be used in progressbar - return self.file_props['received-len'] + if fd == None: + self.pauses +=1 + else: + self.pauses = 0 + if self.pauses > 24: + self.file_props['stalled'] = True + else: + self.file_props['stalled'] = False + if fd == None and self.file_props['stalled'] is False: + return None + if self.file_props.has_key('received-len'): + if self.file_props['received-len'] != 0: + return self.file_props['received-len'] + return None def disconnect(self): ''' Closes the socket. ''' diff --git a/src/gajim.py b/src/gajim.py index 120e0a130..7955c670c 100755 --- a/src/gajim.py +++ b/src/gajim.py @@ -733,6 +733,8 @@ class Interface: file_props['received-len']) else: ft.set_status(file_props['type'], file_props['sid'], 'stop') + if file_props['stalled'] or file_props['paused']: + return if gajim.config.get('notify_on_file_complete'): if gajim.config.get('autopopupaway') or \ gajim.connections[account].connected in (2, 3): # we're online or chat