paused and stalled states now work

This commit is contained in:
Dimitur Kirov 2006-02-12 02:25:21 +00:00
parent f84e410e5b
commit 8448be52f5
2 changed files with 69 additions and 29 deletions

View File

@ -44,7 +44,15 @@ STALLED_TIMEOUT = 10
# after foo seconds of waiting to connect, disconnect from # after foo seconds of waiting to connect, disconnect from
# streamhost and try next one # streamhost and try next one
CONNECT_TIMEOUT = 10 CONNECT_TIMEOUT = 30
# nothing received for the last foo seconds - stop transfer
# if it is 0, then transfer will wait forever
READ_TIMEOUT = 180
# nothing sent for the last foo seconds - stop transfer
# if it is 0, then transfer will wait forever
SEND_TIMEOUT = 180
class SocksQueue: class SocksQueue:
''' queue for all file requests objects ''' ''' queue for all file requests objects '''
@ -236,7 +244,6 @@ class SocksQueue:
reader.file_props['received-len'] = 0 reader.file_props['received-len'] = 0
reader.pauses = 0 reader.pauses = 0
# start sending file to proxy # start sending file to proxy
# TODO: add timeout for stalled state
self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT) self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT)
self.idlequeue.plug_idle(reader, True, False) self.idlequeue.plug_idle(reader, True, False)
result = reader.write_next() result = reader.write_next()
@ -304,6 +311,8 @@ class SocksQueue:
return return
if result in (0, -1) and self.complete_transfer_cb is not None: if result in (0, -1) and self.complete_transfer_cb is not None:
account = actor.account account = actor.account
if account is None and actor.file_props.has_key('tt_account'):
account = actor.file_props['tt_account']
self.complete_transfer_cb(account, actor.file_props) self.complete_transfer_cb(account, actor.file_props)
elif self.progress_transfer_cb is not None: elif self.progress_transfer_cb is not None:
self.progress_transfer_cb(actor.account, actor.file_props) self.progress_transfer_cb(actor.account, actor.file_props)
@ -507,18 +516,18 @@ class Socks5:
buff = self._recv(MAX_BUFF_LEN) buff = self._recv(MAX_BUFF_LEN)
except Exception, e: except Exception, e:
buff = '' buff = ''
first_byte = False
if self.file_props['received-len'] == 0:
if len(buff) > 0:
# delimiter between auth and data
if ord(buff[0]) == 0xD:
first_byte = True
buff = buff[1:]
current_time = self.idlequeue.current_time() current_time = self.idlequeue.current_time()
self.file_props['elapsed-time'] += current_time - \ self.file_props['elapsed-time'] += current_time - \
self.file_props['last-time'] self.file_props['last-time']
self.file_props['last-time'] = current_time self.file_props['last-time'] = current_time
self.file_props['received-len'] += len(buff) self.file_props['received-len'] += len(buff)
if len(buff) == 0:
# Transfer stopped somehow:
# reset, paused or network error
self.rem_fd(fd)
self.disconnect(False)
self.file_props['error'] = -1
return 0
try: try:
fd.write(buff) fd.write(buff)
except IOError, e: except IOError, e:
@ -526,13 +535,6 @@ class Socks5:
self.disconnect(False) self.disconnect(False)
self.file_props['error'] = -6 # file system error self.file_props['error'] = -6 # file system error
return 0 return 0
if len(buff) == 0 and first_byte is False:
# Transfer stopped somehow:
# reset, paused or network error
self.rem_fd(fd)
self.disconnect(False)
self.file_props['error'] = -1
return 0
if self.file_props['received-len'] >= int(self.file_props['size']): if self.file_props['received-len'] >= int(self.file_props['size']):
# transfer completed # transfer completed
self.rem_fd(fd) self.rem_fd(fd)
@ -554,14 +556,15 @@ class Socks5:
''' Closes open descriptors and remover socket descr. from idleque ''' ''' Closes open descriptors and remover socket descr. from idleque '''
# be sure that we don't leave open file # be sure that we don't leave open file
self.close_file() self.close_file()
self.idlequeue.remove_timeout(self.fd)
self.idlequeue.unplug_idle(self.fd)
try: try:
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close() self._sock.close()
except: except:
# socket is already closed # socket is already closed
pass pass
self.connected = False self.connected = False
self.idlequeue.remove_timeout(self.fd)
self.idlequeue.unplug_idle(self.fd)
self.fd = -1 self.fd = -1
self.state = -1 self.state = -1
@ -636,6 +639,14 @@ class Socks5:
if version != 0x05 or method == 0xff: if version != 0x05 or method == 0xff:
self.disconnect() self.disconnect()
def continue_paused_transfer(self):
if self.state < 5:
return
if self.file_props['type'] == 'r':
self.idlequeue.plug_idle(self, False, True)
else:
self.idlequeue.plug_idle(self, True, False)
def _get_sha1_auth(self): def _get_sha1_auth(self):
''' get sha of sid + Initiator jid + Target jid ''' ''' get sha of sid + Initiator jid + Target jid '''
if self.file_props.has_key('is_a_proxy'): if self.file_props.has_key('is_a_proxy'):
@ -663,8 +674,16 @@ class Socks5Sender(Socks5, IdleObject):
def read_timeout(self): def read_timeout(self):
self.idlequeue.remove_timeout(self.fd) self.idlequeue.remove_timeout(self.fd)
self.file_props['stalled'] = True if self.state > 5:
self.queue.process_result(None, self) # no activity for foo seconds
if self.file_props['stalled'] == False:
self.file_props['stalled'] = True
self.queue.process_result(-1, self)
if SEND_TIMEOUT > 0:
self.idlequeue.set_read_timeout(self.fd, SEND_TIMEOUT)
else:
# stop transfer, there is no error code for this
self.pollend()
def pollout(self): def pollout(self):
if not self.connected: if not self.connected:
@ -677,12 +696,15 @@ class Socks5Sender(Socks5, IdleObject):
self.send_raw(self._get_request_buff(self.sha_msg, 0x00)) self.send_raw(self._get_request_buff(self.sha_msg, 0x00))
elif self.state == 7: elif self.state == 7:
if self.file_props['paused']: if self.file_props['paused']:
# TODO: better way is to remove it from idlequeue self.file_props['continue_cb'] = self.continue_paused_transfer
self.idlequeue.plug_idle(self, False, False)
return return
result = self.write_next() result = self.write_next()
self.queue.process_result(result, self) self.queue.process_result(result, self)
if result is None or result <= 0: if result is None or result <= 0:
self.disconnect() self.disconnect()
return
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
elif self.state == 8: elif self.state == 8:
self.disconnect() self.disconnect()
return return
@ -696,6 +718,8 @@ class Socks5Sender(Socks5, IdleObject):
def pollend(self): def pollend(self):
self.state = 8 # end connection self.state = 8 # end connection
self.disconnect() self.disconnect()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
def pollin(self): def pollin(self):
if self.connected: if self.connected:
@ -721,6 +745,7 @@ class Socks5Sender(Socks5, IdleObject):
self.file_props['started'] = True self.file_props['started'] = True
self.file_props['completed'] = False self.file_props['completed'] = False
self.file_props['paused'] = False self.file_props['paused'] = False
self.file_props['continue_cb'] = self.continue_paused_transfer
self.file_props['stalled'] = False self.file_props['stalled'] = False
self.file_props['connected'] = True self.file_props['connected'] = True
self.file_props['elapsed-time'] = 0 self.file_props['elapsed-time'] = 0
@ -835,6 +860,7 @@ class Socks5Receiver(Socks5, IdleObject):
self.file_props['started'] = True self.file_props['started'] = True
self.file_props['completed'] = False self.file_props['completed'] = False
self.file_props['paused'] = False self.file_props['paused'] = False
self.file_props['continue_cb'] = self.continue_paused_transfer
self.file_props['stalled'] = False self.file_props['stalled'] = False
Socks5.__init__(self, idlequeue, streamhost['host'], int(streamhost['port']), Socks5.__init__(self, idlequeue, streamhost['host'], int(streamhost['port']),
streamhost['initiator'], streamhost['target'], sid) streamhost['initiator'], streamhost['target'], sid)
@ -843,11 +869,18 @@ class Socks5Receiver(Socks5, IdleObject):
self.idlequeue.remove_timeout(self.fd) self.idlequeue.remove_timeout(self.fd)
if self.state > 5: if self.state > 5:
# no activity for foo seconds # no activity for foo seconds
self.file_props['stalled'] = True if self.file_props['stalled'] == False:
self.queue.process_result(-1, self) self.file_props['stalled'] = True
if not self.file_props.has_key('received-len'):
self.file_props['received-len'] = 0
self.queue.process_result(-1, self)
if READ_TIMEOUT > 0:
self.idlequeue.set_read_timeout(self.fd, READ_TIMEOUT)
else:
# stop transfer, there is no error code for this
self.pollend()
else: else:
self.queue.reconnect_receiver(self, self.streamhost ) self.queue.reconnect_receiver(self, self.streamhost)
self.idlequeue.unplug_idle(self.fd)
def connect(self): def connect(self):
''' create the socket and plug it to the idlequeue ''' ''' create the socket and plug it to the idlequeue '''
@ -865,7 +898,6 @@ class Socks5Receiver(Socks5, IdleObject):
if self.state < 5: if self.state < 5:
return False return False
return True return True
def pollout(self): def pollout(self):
self.idlequeue.remove_timeout(self.fd) self.idlequeue.remove_timeout(self.fd)
if self.state == 0: if self.state == 0:
@ -876,8 +908,8 @@ class Socks5Receiver(Socks5, IdleObject):
elif self.state == 3: # send 'connect' request elif self.state == 3: # send 'connect' request
self.send_raw(self._get_request_buff(self._get_sha1_auth())) self.send_raw(self._get_request_buff(self._get_sha1_auth()))
elif self.file_props['type'] != 'r': elif self.file_props['type'] != 'r':
# TODO: better way to handle paused state
if self.file_props['paused'] == True: if self.file_props['paused'] == True:
self.idlequeue.plug_idle(self, False, False)
return return
result = self.write_next() result = self.write_next()
self.queue.process_result(result, self) self.queue.process_result(result, self)
@ -885,10 +917,14 @@ class Socks5Receiver(Socks5, IdleObject):
self.state += 1 self.state += 1
# unplug and plug for reading # unplug and plug for reading
self.idlequeue.plug_idle(self, False, True) self.idlequeue.plug_idle(self, False, True)
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
def pollend(self): def pollend(self):
if self.state >= 5: if self.state >= 5:
# error during transfer
self.disconnect() self.disconnect()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
else: else:
self.queue.reconnect_receiver(self, self.streamhost) self.queue.reconnect_receiver(self, self.streamhost)
@ -896,15 +932,19 @@ class Socks5Receiver(Socks5, IdleObject):
self.idlequeue.remove_timeout(self.fd) self.idlequeue.remove_timeout(self.fd)
if self.connected: if self.connected:
if self.file_props['paused']: if self.file_props['paused']:
self.idlequeue.plug_idle(self, False, False)
return return
if self.state < 5: if self.state < 5:
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
result = self.main(0) result = self.main(0)
self.queue.process_result(result, self) self.queue.process_result(result, self)
elif self.state == 5: # wait for proxy reply elif self.state == 5: # wait for proxy reply
pass pass
elif self.file_props['type'] == 'r': elif self.file_props['type'] == 'r':
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
result = self.get_file_contents(0) result = self.get_file_contents(0)
self.queue.process_result(result, self) self.queue.process_result(result, self)
else: else:
self.disconnect() self.disconnect()
@ -1000,9 +1040,8 @@ class Socks5Receiver(Socks5, IdleObject):
else: else:
# receiving file contents from socket # receiving file contents from socket
self.idlequeue.plug_idle(self, False, True) self.idlequeue.plug_idle(self, False, True)
self.file_props['continue_cb'] = self.continue_paused_transfer
# we have set up the connection, next - retrieve file # we have set up the connection, next - retrieve file
# TODO: add timeout for stalled state
self.state = 6 self.state = 6
if self.state < 5: if self.state < 5:
self.idlequeue.plug_idle(self, True, False) self.idlequeue.plug_idle(self, True, False)

View File

@ -783,6 +783,7 @@ _('Connection with peer cannot be established.'))
types = {'r' : 'download', 's' : 'upload'} types = {'r' : 'download', 's' : 'upload'}
self.set_status(file_props['type'], file_props['sid'], types[sid[0]]) self.set_status(file_props['type'], file_props['sid'], types[sid[0]])
self.toggle_pause_continue(True) self.toggle_pause_continue(True)
file_props['continue_cb']()
elif self.is_transfer_active(file_props): elif self.is_transfer_active(file_props):
file_props['paused'] = True file_props['paused'] = True
self.set_status(file_props['type'], file_props['sid'], 'pause') self.set_status(file_props['type'], file_props['sid'], 'pause')