diff --git a/src/common/socks5.py b/src/common/socks5.py index da80a2c66..15139897a 100644 --- a/src/common/socks5.py +++ b/src/common/socks5.py @@ -39,6 +39,13 @@ from errno import EINTR from xmpp.idlequeue import IdleObject MAX_BUFF_LEN = 65536 +# after foo seconds without activity label transfer as 'stalled' +STALLED_TIMEOUT = 10 + +# after foo seconds of waiting to connect, disconnect from +# streamhost and try next one +CONNECT_TIMEOUT = 10 + class SocksQueue: ''' queue for all file requests objects ''' def __init__(self, idlequeue, complete_transfer_cb = None, progress_transfer_cb = None): @@ -56,8 +63,11 @@ class SocksQueue: self.progress_transfer_cb = progress_transfer_cb self.on_success = None self.on_failure = None - + def start_listener(self, host, port, sha_str, sha_handler, sid): + ''' start waiting for incomming connections on (host, port) + and do a socks5 authentication using sid for generated sha + ''' self.sha_handlers[sha_str] = (sha_handler, sid) if self.listener == None: self.listener = Socks5Listener(self.idlequeue, host, port) @@ -74,7 +84,7 @@ class SocksQueue: return None self.connected += 1 return self.listener - + def send_success_reply(self, file_props, streamhost): if file_props.has_key('streamhost-used') and \ file_props['streamhost-used'] is True: @@ -92,17 +102,12 @@ class SocksQueue: self.on_success(streamhost) return 1 return 0 - + def connect_to_hosts(self, account, sid, on_success = None, on_failure = None): self.on_success = on_success self.on_failure = on_failure - if not self.files_props.has_key(account): - pass - # FIXME ---- show error dialog - else: - file_props = self.files_props[account][sid] - file_props['success_cb'] = on_success + file_props = self.files_props[account][sid] file_props['failure_cb'] = on_failure # add streamhosts to the queue @@ -110,30 +115,74 @@ class SocksQueue: receiver = Socks5Receiver(self.idlequeue, streamhost, sid, file_props) self.add_receiver(account, receiver) streamhost['idx'] = receiver.queue_idx - + def _socket_connected(self, streamhost, file_props): + ''' called when there is a host connected to one of the + senders's streamhosts. Stop othere attempts for connections ''' for host in file_props['streamhosts']: if host != streamhost and host.has_key('idx'): if host['state'] == 1: + # remove current self.remove_receiver(streamhost['idx']) return - else: - host['state'] = -1 - self.remove_receiver(host['idx']) - + # set state -2, meaning that this streamhost is stopped, + # but it may be connectected later + if host['state'] >=0: + self.remove_receiver(host['idx']) + host['idx'] = -1 + host['state'] = -2 + + def reconnect_receiver(self, receiver, streamhost): + ''' Check the state of all streamhosts and if all has failed, then + emit connection failure cb. If there are some which are still + not connected try to establish connection to one of them. + ''' + self.idlequeue.remove_timeout(receiver.fd) + self.idlequeue.unplug_idle(receiver.fd) + file_props = receiver.file_props + streamhost['state'] = -1 + # boolean, indicates that there are hosts, which are not tested yet + unused_hosts = False + for host in file_props['streamhosts']: + if host.has_key('idx'): + if host['state'] >= 0: + return + elif host['state'] == -2: + unused_hosts = True + if unused_hosts: + for host in file_props['streamhosts']: + if host['state'] == -2: + host['state'] = 0 + receiver = Socks5Receiver(self.idlequeue, host, host['sid'], file_props) + self.add_receiver(receiver.account, receiver) + host['idx'] = receiver.queue_idx + # we still have chances to connect + return + if not file_props.has_key('received-len') or file_props['received-len'] == 0: + # there are no other streamhosts and transfer hasn't started + self._connection_refused(streamhost, file_props, receiver.queue_idx) + else: + # transfer stopped, it is most likely stopped from sender + receiver.disconnect() + file_props['error'] = -1 + self.process_result(-1, receiver) + def _connection_refused(self, streamhost, file_props, idx): + ''' cb, called when we loose connection during transfer''' if file_props is None: return streamhost['state'] = -1 - self.remove_receiver(idx) + self.remove_receiver(idx, False) if file_props.has_key('streamhosts'): for host in file_props['streamhosts']: if host['state'] != -1: return + # failure_cb exists - this means that it has never been called if file_props.has_key('failure_cb') and file_props['failure_cb']: file_props['failure_cb'](streamhost['initiator'], streamhost['id'], file_props['sid'], code = 404) - + del(file_props['failure_cb']) + def add_receiver(self, account, sock5_receiver): ''' add new file request ''' self.readers[self.idx] = sock5_receiver @@ -148,7 +197,7 @@ class SocksQueue: self.process_result(result, sock5_receiver) return 1 return None - + def get_file_from_sender(self, file_props, account): if file_props is None: return @@ -160,11 +209,12 @@ class SocksQueue: sender.account = account result = get_file_contents(0) self.process_result(result, sender) - + def result_sha(self, sha_str, idx): if self.sha_handlers.has_key(sha_str): props = self.sha_handlers[sha_str] props[0](props[1], idx) + def activate_proxy(self, idx): if not self.readers.has_key(idx): return @@ -187,6 +237,7 @@ class SocksQueue: reader.pauses = 0 # start sending file to proxy # TODO: add timeout for stalled state + self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT) self.idlequeue.plug_idle(reader, True, False) result = reader.write_next() self.process_result(result, reader) @@ -206,7 +257,7 @@ class SocksQueue: file_props['last-time'] = self.idlequeue.current_time() file_props['received-len'] = 0 sender.file_props = file_props - + def add_file_props(self, account, file_props): ''' file_prop to the dict of current file_props. It is identified by account name and sid @@ -243,7 +294,6 @@ class SocksQueue: sock_hash, self, sock[0], sock[1][0], sock[1][1]) self.connected += 1 - def process_result(self, result, actor): ''' Take appropriate actions upon the result: [ 0, - 1 ] complete/end transfer @@ -263,11 +313,14 @@ class SocksQueue: the number of active connections with 1''' if idx != -1: if self.readers.has_key(idx): + reader = self.readers[idx] + self.idlequeue.unplug_idle(reader.fd) + self.idlequeue.remove_timeout(reader.fd) if do_disconnect: - self.readers[idx].disconnect() + reader.disconnect() else: - if self.readers[idx].streamhost is not None: - self.readers[idx].streamhost['state'] = -1 + if reader.streamhost is not None: + reader.streamhost['state'] = -1 del(self.readers[idx]) def remove_sender(self, idx, do_disconnect = True): @@ -304,7 +357,7 @@ class Socks5: self.size = 0 self.remaining_buff = '' self.file = None - + def open_file_for_reading(self): if self.file == None: try: @@ -316,7 +369,7 @@ class Socks5: except IOError, e: self.close_file() raise IOError, e - + def close_file(self): if self.file: if not self.file.closed: @@ -325,7 +378,7 @@ class Socks5: except: pass self.file = None - + def get_fd(self): ''' Test if file is already open and return its fd, or just open the file and return the fd. @@ -344,7 +397,7 @@ class Socks5: self.file_props['last-time'] = self.idlequeue.current_time() self.file_props['received-len'] = offset return fd - + def rem_fd(self, fd): if self.file_props.has_key('fd'): del(self.file_props['fd']) @@ -353,7 +406,7 @@ class Socks5: except: pass - + def receive(self): ''' Reads small chunks of data. Calls owner's disconnected() method if appropriate.''' @@ -414,17 +467,10 @@ class Socks5: self.remaining_buff = buff[lenn:] else: self.remaining_buff = '' - if lenn == 0: - self.pauses +=1 - else: - self.pauses = 0 - if self.pauses > 24: - self.file_props['stalled'] = True - else: - self.file_props['stalled'] = False self.state = 7 # continue to write in the socket - if lenn == 0 and self.file_props['stalled'] is False: + if lenn == 0: return None + self.file_props['stalled'] = False return lenn else: self.state = 8 # end connection @@ -495,13 +541,7 @@ class Socks5: self.file_props['completed'] = True return 0 # return number of read bytes. It can be used in progressbar - if fd == None: - self.pauses +=1 - else: - self.pauses = 0 - if self.pauses > 24: - self.file_props['stalled'] = True - else: + if fd != None: self.file_props['stalled'] = False if fd == None and self.file_props['stalled'] is False: return None @@ -520,14 +560,16 @@ class Socks5: # socket is already closed pass self.connected = False + self.idlequeue.remove_timeout(self.fd) self.idlequeue.unplug_idle(self.fd) self.fd = -1 + self.state = -1 def _get_auth_buff(self): ''' Message, that we support 1 one auth mechanism: the 'no auth' mechanism. ''' return struct.pack('!BBB', 0x05, 0x01, 0x00) - + def _parse_auth_buff(self, buff): ''' Parse the initial message and create a list of auth mechanisms ''' @@ -545,7 +587,7 @@ class Socks5: 0x00 - no auth ) ''' return struct.pack('!BB', 0x05, 0x00) - + def _get_connect_buff(self): ''' Connect request by domain name ''' buff = struct.pack('!BBBBB%dsBB' % len(self.host), @@ -559,7 +601,7 @@ class Socks5: buff = struct.pack('!BBBBB%dsBB' % len(msg), 0x05, command, 0x00, 0x03, len(msg), msg, 0, 0) return buff - + def _parse_request_buff(self, buff): try: # don't trust on what comes from the outside version, req_type, reserved, host_type, = \ @@ -583,7 +625,7 @@ class Socks5: except: return (None, None, None) return (req_type, host, port) - + def read_connect(self): ''' connect responce: version, auth method ''' buff = self._recv() @@ -593,7 +635,7 @@ class Socks5: version, method = None, None if version != 0x05 or method == 0xff: self.disconnect() - + def _get_sha1_auth(self): ''' get sha of sid + Initiator jid + Target jid ''' if self.file_props.has_key('is_a_proxy'): @@ -601,7 +643,7 @@ class Socks5: return sha.new('%s%s%s' % (self.sid, self.file_props['proxy_sender'], self.file_props['proxy_receiver'])).hexdigest() return sha.new('%s%s%s' % (self.sid, self.initiator, self.target)).hexdigest() - + class Socks5Sender(Socks5, IdleObject): ''' class for sending file to socket over socks5 ''' def __init__(self, idlequeue, sock_hash, parent, _sock, host = None, port = None): @@ -619,10 +661,16 @@ class Socks5Sender(Socks5, IdleObject): # start waiting for data self.idlequeue.plug_idle(self, False, True) + def read_timeout(self): + self.idlequeue.remove_timeout(self.fd) + self.file_props['stalled'] = True + self.queue.process_result(None, self) + def pollout(self): if not self.connected: - self.queue.remove_sender(self.queue_idx) + self.disconnect() return + self.idlequeue.remove_timeout(self.fd) if self.state == 2: # send reply with desired auth type self.send_raw(self._get_auth_response()) elif self.state == 4: # send positive response to the 'connect' @@ -634,9 +682,9 @@ class Socks5Sender(Socks5, IdleObject): result = self.write_next() self.queue.process_result(result, self) if result is None or result <= 0: - self.queue.remove_sender(self.queue_idx) + self.disconnect() elif self.state == 8: - self.queue.remove_sender(self.queue_idx) + self.disconnect() return else: self.disconnect() @@ -647,10 +695,7 @@ class Socks5Sender(Socks5, IdleObject): def pollend(self): self.state = 8 # end connection - self.close_file() - self.file_props['error'] = -1 - self.queue.process_result(-1, self) - self.queue.remove_sender(self.queue_idx) + self.disconnect() def pollin(self): if self.connected: @@ -667,7 +712,7 @@ class Socks5Sender(Socks5, IdleObject): result = self.get_file_contents(0) self.queue.process_result(result, self) else: - self.queue.remove_sender(self.queue_idx) + self.disconnect() def send_file(self): ''' start sending the file over verified connection ''' @@ -718,6 +763,10 @@ class Socks5Sender(Socks5, IdleObject): class Socks5Listener(IdleObject): def __init__(self, idlequeue, host, port): + ''' handle all incomming connections on (host, port) + This class implements IdleObject, but we will expect + only pollin events though + ''' self.host, self.port = host, port self.queue_idx = -1 self.idlequeue = idlequeue @@ -743,19 +792,29 @@ class Socks5Listener(IdleObject): self.idlequeue.plug_idle(self, False, True) self.started = True + def pollend(self): + ''' called when we stop listening on (host, port) ''' + self.disconnect() + def pollin(self): + ''' accept a new incomming connection and notify queue''' sock = self.accept_conn() self.queue.on_connection_accepted(sock) def disconnect(self): + ''' free all resources, we are not listening anymore ''' + self.idlequeue.remove_timeout(self.fd) self.idlequeue.unplug_idle(self.fd) self.fd = -1 + self.state = -1 + self.started = False try: self._serv.close() except: pass def accept_conn(self): + ''' accepts a new incomming connection ''' _sock = self._serv.accept() _sock[0].setblocking(False) return _sock @@ -780,6 +839,16 @@ class Socks5Receiver(Socks5, IdleObject): Socks5.__init__(self, idlequeue, streamhost['host'], int(streamhost['port']), streamhost['initiator'], streamhost['target'], sid) + def read_timeout(self): + self.idlequeue.remove_timeout(self.fd) + if self.state > 5: + # no activity for foo seconds + self.file_props['stalled'] = True + self.queue.process_result(-1, self) + else: + self.queue.reconnect_receiver(self, self.streamhost ) + self.idlequeue.unplug_idle(self.fd) + def connect(self): ''' create the socket and plug it to the idlequeue ''' self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -789,10 +858,16 @@ class Socks5Receiver(Socks5, IdleObject): self.state = 0 # about to be connected self.idlequeue.plug_idle(self, True, False) self.do_connect() - # TODO: add timeout for establishing connection + self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT) return None + def _is_connected(self): + if self.state < 5: + return False + return True + def pollout(self): + self.idlequeue.remove_timeout(self.fd) if self.state == 0: self.do_connect() return @@ -812,11 +887,13 @@ class Socks5Receiver(Socks5, IdleObject): self.idlequeue.plug_idle(self, False, True) def pollend(self): - self.file_props['error'] = -1 - self.queue.process_result(-1, self) - self.queue.remove_receiver(self.queue_idx) + if self.state >= 5: + self.disconnect() + else: + self.queue.reconnect_receiver(self, self.streamhost) def pollin(self): + self.idlequeue.remove_timeout(self.fd) if self.connected: if self.file_props['paused']: return @@ -829,10 +906,7 @@ class Socks5Receiver(Socks5, IdleObject): result = self.get_file_contents(0) self.queue.process_result(result, self) else: - self.queue.remove_receiver(self.queue_idx) - - def read_timeout(self, fd): - self.disconnect() + self.disconnect() def do_connect(self): try: @@ -859,6 +933,8 @@ class Socks5Receiver(Socks5, IdleObject): self.file_props['connected'] = True self.file_props['disconnect_cb'] = self.disconnect self.state = 1 # connected + + # stop all others connections to sender's streamhosts self.queue._socket_connected(self.streamhost, self.file_props) self.idlequeue.plug_idle(self, True, False) return 1 # we are connected @@ -866,15 +942,19 @@ class Socks5Receiver(Socks5, IdleObject): def main(self, timeout = 0): ''' begin negotiation. on success 'address' != 0 ''' result = 1 + buff = self.receive() + if buff == '': + # end connection + self.pollend() + return + if self.state == 2: # read auth response - buff = self.receive() if buff is None or len(buff) != 2: return None version, method = struct.unpack('!BB', buff[:2]) if version != 0x05 or method == 0xff: self.disconnect() elif self.state == 4: # get approve of our request - buff = self.receive() if buff == None: return None sub_buff = buff[:4] @@ -915,6 +995,7 @@ class Socks5Receiver(Socks5, IdleObject): self.file_props['received-len'] = 0 self.pauses = 0 # start sending file contents to socket + self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) self.idlequeue.plug_idle(self, True, False) else: # receiving file contents from socket