add connect timeout and reconnect to

some other streamhosts when already
established connection has failed
This commit is contained in:
Dimitur Kirov 2006-02-11 21:32:48 +00:00
parent 19ee5917bf
commit 29ab60eaa7
1 changed files with 149 additions and 68 deletions

View File

@ -39,6 +39,13 @@ from errno import EINTR
from xmpp.idlequeue import IdleObject
MAX_BUFF_LEN = 65536
# after foo seconds without activity label transfer as 'stalled'
STALLED_TIMEOUT = 10
# after foo seconds of waiting to connect, disconnect from
# streamhost and try next one
CONNECT_TIMEOUT = 10
class SocksQueue:
''' queue for all file requests objects '''
def __init__(self, idlequeue, complete_transfer_cb = None, progress_transfer_cb = None):
@ -58,6 +65,9 @@ class SocksQueue:
self.on_failure = None
def start_listener(self, host, port, sha_str, sha_handler, sid):
''' start waiting for incomming connections on (host, port)
and do a socks5 authentication using sid for generated sha
'''
self.sha_handlers[sha_str] = (sha_handler, sid)
if self.listener == None:
self.listener = Socks5Listener(self.idlequeue, host, port)
@ -97,12 +107,7 @@ class SocksQueue:
on_failure = None):
self.on_success = on_success
self.on_failure = on_failure
if not self.files_props.has_key(account):
pass
# FIXME ---- show error dialog
else:
file_props = self.files_props[account][sid]
file_props['success_cb'] = on_success
file_props = self.files_props[account][sid]
file_props['failure_cb'] = on_failure
# add streamhosts to the queue
@ -112,27 +117,71 @@ class SocksQueue:
streamhost['idx'] = receiver.queue_idx
def _socket_connected(self, streamhost, file_props):
''' called when there is a host connected to one of the
senders's streamhosts. Stop othere attempts for connections '''
for host in file_props['streamhosts']:
if host != streamhost and host.has_key('idx'):
if host['state'] == 1:
# remove current
self.remove_receiver(streamhost['idx'])
return
else:
host['state'] = -1
self.remove_receiver(host['idx'])
# set state -2, meaning that this streamhost is stopped,
# but it may be connectected later
if host['state'] >=0:
self.remove_receiver(host['idx'])
host['idx'] = -1
host['state'] = -2
def reconnect_receiver(self, receiver, streamhost):
''' Check the state of all streamhosts and if all has failed, then
emit connection failure cb. If there are some which are still
not connected try to establish connection to one of them.
'''
self.idlequeue.remove_timeout(receiver.fd)
self.idlequeue.unplug_idle(receiver.fd)
file_props = receiver.file_props
streamhost['state'] = -1
# boolean, indicates that there are hosts, which are not tested yet
unused_hosts = False
for host in file_props['streamhosts']:
if host.has_key('idx'):
if host['state'] >= 0:
return
elif host['state'] == -2:
unused_hosts = True
if unused_hosts:
for host in file_props['streamhosts']:
if host['state'] == -2:
host['state'] = 0
receiver = Socks5Receiver(self.idlequeue, host, host['sid'], file_props)
self.add_receiver(receiver.account, receiver)
host['idx'] = receiver.queue_idx
# we still have chances to connect
return
if not file_props.has_key('received-len') or file_props['received-len'] == 0:
# there are no other streamhosts and transfer hasn't started
self._connection_refused(streamhost, file_props, receiver.queue_idx)
else:
# transfer stopped, it is most likely stopped from sender
receiver.disconnect()
file_props['error'] = -1
self.process_result(-1, receiver)
def _connection_refused(self, streamhost, file_props, idx):
''' cb, called when we loose connection during transfer'''
if file_props is None:
return
streamhost['state'] = -1
self.remove_receiver(idx)
self.remove_receiver(idx, False)
if file_props.has_key('streamhosts'):
for host in file_props['streamhosts']:
if host['state'] != -1:
return
# failure_cb exists - this means that it has never been called
if file_props.has_key('failure_cb') and file_props['failure_cb']:
file_props['failure_cb'](streamhost['initiator'], streamhost['id'],
file_props['sid'], code = 404)
del(file_props['failure_cb'])
def add_receiver(self, account, sock5_receiver):
''' add new file request '''
@ -165,6 +214,7 @@ class SocksQueue:
if self.sha_handlers.has_key(sha_str):
props = self.sha_handlers[sha_str]
props[0](props[1], idx)
def activate_proxy(self, idx):
if not self.readers.has_key(idx):
return
@ -187,6 +237,7 @@ class SocksQueue:
reader.pauses = 0
# start sending file to proxy
# TODO: add timeout for stalled state
self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT)
self.idlequeue.plug_idle(reader, True, False)
result = reader.write_next()
self.process_result(result, reader)
@ -243,7 +294,6 @@ class SocksQueue:
sock_hash, self, sock[0], sock[1][0], sock[1][1])
self.connected += 1
def process_result(self, result, actor):
''' Take appropriate actions upon the result:
[ 0, - 1 ] complete/end transfer
@ -263,11 +313,14 @@ class SocksQueue:
the number of active connections with 1'''
if idx != -1:
if self.readers.has_key(idx):
reader = self.readers[idx]
self.idlequeue.unplug_idle(reader.fd)
self.idlequeue.remove_timeout(reader.fd)
if do_disconnect:
self.readers[idx].disconnect()
reader.disconnect()
else:
if self.readers[idx].streamhost is not None:
self.readers[idx].streamhost['state'] = -1
if reader.streamhost is not None:
reader.streamhost['state'] = -1
del(self.readers[idx])
def remove_sender(self, idx, do_disconnect = True):
@ -414,17 +467,10 @@ class Socks5:
self.remaining_buff = buff[lenn:]
else:
self.remaining_buff = ''
if lenn == 0:
self.pauses +=1
else:
self.pauses = 0
if self.pauses > 24:
self.file_props['stalled'] = True
else:
self.file_props['stalled'] = False
self.state = 7 # continue to write in the socket
if lenn == 0 and self.file_props['stalled'] is False:
if lenn == 0:
return None
self.file_props['stalled'] = False
return lenn
else:
self.state = 8 # end connection
@ -495,13 +541,7 @@ class Socks5:
self.file_props['completed'] = True
return 0
# return number of read bytes. It can be used in progressbar
if fd == None:
self.pauses +=1
else:
self.pauses = 0
if self.pauses > 24:
self.file_props['stalled'] = True
else:
if fd != None:
self.file_props['stalled'] = False
if fd == None and self.file_props['stalled'] is False:
return None
@ -520,8 +560,10 @@ class Socks5:
# socket is already closed
pass
self.connected = False
self.idlequeue.remove_timeout(self.fd)
self.idlequeue.unplug_idle(self.fd)
self.fd = -1
self.state = -1
def _get_auth_buff(self):
''' Message, that we support 1 one auth mechanism:
@ -619,10 +661,16 @@ class Socks5Sender(Socks5, IdleObject):
# start waiting for data
self.idlequeue.plug_idle(self, False, True)
def read_timeout(self):
self.idlequeue.remove_timeout(self.fd)
self.file_props['stalled'] = True
self.queue.process_result(None, self)
def pollout(self):
if not self.connected:
self.queue.remove_sender(self.queue_idx)
self.disconnect()
return
self.idlequeue.remove_timeout(self.fd)
if self.state == 2: # send reply with desired auth type
self.send_raw(self._get_auth_response())
elif self.state == 4: # send positive response to the 'connect'
@ -634,9 +682,9 @@ class Socks5Sender(Socks5, IdleObject):
result = self.write_next()
self.queue.process_result(result, self)
if result is None or result <= 0:
self.queue.remove_sender(self.queue_idx)
self.disconnect()
elif self.state == 8:
self.queue.remove_sender(self.queue_idx)
self.disconnect()
return
else:
self.disconnect()
@ -647,10 +695,7 @@ class Socks5Sender(Socks5, IdleObject):
def pollend(self):
self.state = 8 # end connection
self.close_file()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
self.queue.remove_sender(self.queue_idx)
self.disconnect()
def pollin(self):
if self.connected:
@ -667,7 +712,7 @@ class Socks5Sender(Socks5, IdleObject):
result = self.get_file_contents(0)
self.queue.process_result(result, self)
else:
self.queue.remove_sender(self.queue_idx)
self.disconnect()
def send_file(self):
''' start sending the file over verified connection '''
@ -718,6 +763,10 @@ class Socks5Sender(Socks5, IdleObject):
class Socks5Listener(IdleObject):
def __init__(self, idlequeue, host, port):
''' handle all incomming connections on (host, port)
This class implements IdleObject, but we will expect
only pollin events though
'''
self.host, self.port = host, port
self.queue_idx = -1
self.idlequeue = idlequeue
@ -743,19 +792,29 @@ class Socks5Listener(IdleObject):
self.idlequeue.plug_idle(self, False, True)
self.started = True
def pollend(self):
''' called when we stop listening on (host, port) '''
self.disconnect()
def pollin(self):
''' accept a new incomming connection and notify queue'''
sock = self.accept_conn()
self.queue.on_connection_accepted(sock)
def disconnect(self):
''' free all resources, we are not listening anymore '''
self.idlequeue.remove_timeout(self.fd)
self.idlequeue.unplug_idle(self.fd)
self.fd = -1
self.state = -1
self.started = False
try:
self._serv.close()
except:
pass
def accept_conn(self):
''' accepts a new incomming connection '''
_sock = self._serv.accept()
_sock[0].setblocking(False)
return _sock
@ -780,6 +839,16 @@ class Socks5Receiver(Socks5, IdleObject):
Socks5.__init__(self, idlequeue, streamhost['host'], int(streamhost['port']),
streamhost['initiator'], streamhost['target'], sid)
def read_timeout(self):
self.idlequeue.remove_timeout(self.fd)
if self.state > 5:
# no activity for foo seconds
self.file_props['stalled'] = True
self.queue.process_result(-1, self)
else:
self.queue.reconnect_receiver(self, self.streamhost )
self.idlequeue.unplug_idle(self.fd)
def connect(self):
''' create the socket and plug it to the idlequeue '''
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -789,10 +858,16 @@ class Socks5Receiver(Socks5, IdleObject):
self.state = 0 # about to be connected
self.idlequeue.plug_idle(self, True, False)
self.do_connect()
# TODO: add timeout for establishing connection
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
return None
def _is_connected(self):
if self.state < 5:
return False
return True
def pollout(self):
self.idlequeue.remove_timeout(self.fd)
if self.state == 0:
self.do_connect()
return
@ -812,11 +887,13 @@ class Socks5Receiver(Socks5, IdleObject):
self.idlequeue.plug_idle(self, False, True)
def pollend(self):
self.file_props['error'] = -1
self.queue.process_result(-1, self)
self.queue.remove_receiver(self.queue_idx)
if self.state >= 5:
self.disconnect()
else:
self.queue.reconnect_receiver(self, self.streamhost)
def pollin(self):
self.idlequeue.remove_timeout(self.fd)
if self.connected:
if self.file_props['paused']:
return
@ -829,10 +906,7 @@ class Socks5Receiver(Socks5, IdleObject):
result = self.get_file_contents(0)
self.queue.process_result(result, self)
else:
self.queue.remove_receiver(self.queue_idx)
def read_timeout(self, fd):
self.disconnect()
self.disconnect()
def do_connect(self):
try:
@ -859,6 +933,8 @@ class Socks5Receiver(Socks5, IdleObject):
self.file_props['connected'] = True
self.file_props['disconnect_cb'] = self.disconnect
self.state = 1 # connected
# stop all others connections to sender's streamhosts
self.queue._socket_connected(self.streamhost, self.file_props)
self.idlequeue.plug_idle(self, True, False)
return 1 # we are connected
@ -866,15 +942,19 @@ class Socks5Receiver(Socks5, IdleObject):
def main(self, timeout = 0):
''' begin negotiation. on success 'address' != 0 '''
result = 1
buff = self.receive()
if buff == '':
# end connection
self.pollend()
return
if self.state == 2: # read auth response
buff = self.receive()
if buff is None or len(buff) != 2:
return None
version, method = struct.unpack('!BB', buff[:2])
if version != 0x05 or method == 0xff:
self.disconnect()
elif self.state == 4: # get approve of our request
buff = self.receive()
if buff == None:
return None
sub_buff = buff[:4]
@ -915,6 +995,7 @@ class Socks5Receiver(Socks5, IdleObject):
self.file_props['received-len'] = 0
self.pauses = 0
# start sending file contents to socket
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
self.idlequeue.plug_idle(self, True, False)
else:
# receiving file contents from socket