proper disconnect routines
This commit is contained in:
parent
8393d9e5fd
commit
73c5eb3dd2
|
@ -523,16 +523,27 @@ class Connection:
|
|||
streamhost = query.getTag('streamhost-used')
|
||||
except: # this bytestream result is not what we need
|
||||
pass
|
||||
if streamhost is None:
|
||||
# proxy approves the activate query
|
||||
raise common.xmpp.NodeProcessed
|
||||
jid = streamhost.getAttr('jid')
|
||||
id = real_id[3:]
|
||||
if self.files_props.has_key(id):
|
||||
file_props = self.files_props[id]
|
||||
else:
|
||||
raise common.xmpp.NodeProcessed
|
||||
file_props['type']
|
||||
if streamhost is None:
|
||||
# proxy approves the activate query
|
||||
if real_id[:3] == 'au_':
|
||||
id = real_id[3:]
|
||||
if not file_props.has_key('streamhost-used') or \
|
||||
file_props['streamhost-used'] is False:
|
||||
raise common.xmpp.NodeProcessed
|
||||
if not file_props.has_key('proxyhosts'):
|
||||
raise common.xmpp.NodeProcessed
|
||||
for host in file_props['proxyhosts']:
|
||||
if host['initiator'] == frm and \
|
||||
str(query.getAttr('sid')) == file_props['sid']:
|
||||
gajim.socks5queue.activate_proxy(host['idx'])
|
||||
break
|
||||
raise common.xmpp.NodeProcessed
|
||||
jid = streamhost.getAttr('jid')
|
||||
if file_props.has_key('streamhost-used') and \
|
||||
file_props['streamhost-used'] is True:
|
||||
raise common.xmpp.NodeProcessed
|
||||
|
@ -569,6 +580,39 @@ class Connection:
|
|||
|
||||
raise common.xmpp.NodeProcessed
|
||||
|
||||
def remove_all_transfers(self):
|
||||
''' stops and removes all active connections from the socks5 pool '''
|
||||
for file_props in self.files_props.values():
|
||||
self.remove_transfer(file_props, remove_from_list = False)
|
||||
del(self.files_props)
|
||||
self.files_props = {}
|
||||
|
||||
def remove_transfer(self, file_props, remove_from_list = True):
|
||||
if file_props.has_key('hash'):
|
||||
gajim.socks5queue.remove_sender(file_props['hash'])
|
||||
|
||||
if file_props.has_key('streamhosts'):
|
||||
for host in file_props['streamhosts']:
|
||||
if host['idx'] > 0:
|
||||
gajim.socks5queue.remove_receiver(host['idx'])
|
||||
gajim.socks5queue.remove_sender(host['idx'])
|
||||
sid = file_props['sid']
|
||||
gajim.socks5queue.remove_file_props(self.name, sid)
|
||||
|
||||
if remove_from_list:
|
||||
if self.files_props.has_key('sid'):
|
||||
del(self.files_props['sid'])
|
||||
|
||||
def disconnect_transfer(self, file_props):
|
||||
if file_props.has_key('hash'):
|
||||
gajim.socks5queue.remove_sender(file_props['hash'])
|
||||
|
||||
if file_props.has_key('streamhosts'):
|
||||
for host in file_props['streamhosts']:
|
||||
if host['idx'] > 0:
|
||||
gajim.socks5queue.remove_receiver(host['idx'])
|
||||
gajim.socks5queue.remove_sender(host['idx'])
|
||||
|
||||
def proxy_auth_ok(self, proxy):
|
||||
'''cb, called after authentication to proxy server '''
|
||||
file_props = self.files_props[proxy['sid']]
|
||||
|
@ -1384,6 +1428,7 @@ class Connection:
|
|||
p = self.add_sha(p)
|
||||
if msg:
|
||||
p.setStatus(msg)
|
||||
self.remove_all_transfers()
|
||||
if self.connection:
|
||||
self.connection.send(p)
|
||||
try:
|
||||
|
|
|
@ -35,7 +35,7 @@ class SocksQueue:
|
|||
self.readers = {}
|
||||
self.files_props = {}
|
||||
self.senders = {}
|
||||
self.idx = 0
|
||||
self.idx = 1
|
||||
self.listener = None
|
||||
self.sha_handlers = {}
|
||||
self.complete_transfer_cb = complete_transfer_cb
|
||||
|
@ -60,16 +60,16 @@ class SocksQueue:
|
|||
for proxy in file_props['proxyhosts']:
|
||||
if proxy == streamhost:
|
||||
self.on_success(streamhost)
|
||||
return True
|
||||
return False
|
||||
return 2
|
||||
return 0
|
||||
if file_props.has_key('streamhosts'):
|
||||
for host in file_props['streamhosts']:
|
||||
if streamhost['state'] == 1 and host != streamhost:
|
||||
return False
|
||||
if streamhost['state'] == 1:
|
||||
return 0
|
||||
streamhost['state'] = 1
|
||||
self.on_success(streamhost)
|
||||
return True
|
||||
return False
|
||||
return 1
|
||||
return 0
|
||||
|
||||
def connect_to_hosts(self, account, sid, on_success = None,
|
||||
on_failure = None):
|
||||
|
@ -144,7 +144,29 @@ class SocksQueue:
|
|||
if self.sha_handlers.has_key(sha_str):
|
||||
props = self.sha_handlers[sha_str]
|
||||
props[0](props[1], idx)
|
||||
|
||||
def activate_proxy(self, idx):
|
||||
if not self.readers.has_key(idx):
|
||||
return
|
||||
reader = self.readers[idx]
|
||||
if reader.file_props['type'] != 's':
|
||||
return
|
||||
if reader.state != 5:
|
||||
return
|
||||
reader.state = 6
|
||||
if reader.connected:
|
||||
reader.fd = open(reader.file_props['file-name'])
|
||||
reader.file_props['error'] = 0
|
||||
reader.file_props['disconnect_cb'] = reader.disconnect
|
||||
reader.file_props['started'] = True
|
||||
reader.file_props['completed'] = False
|
||||
reader.file_props['paused'] = False
|
||||
reader.file_props['stalled'] = False
|
||||
reader.file_props['received-len'] = 0
|
||||
reader.pauses = 0
|
||||
reader.send_raw(reader._get_nl_byte())
|
||||
result = reader.write_next()
|
||||
self.process_result(result, reader)
|
||||
|
||||
def send_file(self, file_props, account):
|
||||
if file_props.has_key('hash') and \
|
||||
self.senders.has_key(file_props['hash']):
|
||||
|
@ -170,6 +192,15 @@ class SocksQueue:
|
|||
if not self.files_props.has_key(account):
|
||||
self.files_props[account] = {}
|
||||
self.files_props[account][_id] = file_props
|
||||
|
||||
def remove_file_props(self, account, sid):
|
||||
if self.files_props.has_key(account):
|
||||
fl_props = self.files_props[account]
|
||||
if fl_props.has_key(sid):
|
||||
del(fl_props[sid])
|
||||
|
||||
if len(self.files_props) == 0:
|
||||
self.connected = 0
|
||||
|
||||
def get_file_props(self, account, sid):
|
||||
''' get fil_prop by account name and session id '''
|
||||
|
@ -240,6 +271,8 @@ class SocksQueue:
|
|||
if pd:
|
||||
result = receiver.main(0)
|
||||
self.process_result(result, receiver)
|
||||
elif receiver.state == 5: # wait for proxy reply
|
||||
pass
|
||||
else:
|
||||
if receiver.file_props['type'] == 'r':
|
||||
result = receiver.get_file_contents(timeout)
|
||||
|
@ -266,23 +299,29 @@ class SocksQueue:
|
|||
self.progress_transfer_cb(actor.account,
|
||||
actor.file_props)
|
||||
|
||||
def remove_receiver(self, idx):
|
||||
def remove_receiver(self, idx, do_disconnect = True):
|
||||
''' Remove reciver from the list and decrease
|
||||
the number of active connections with 1'''
|
||||
if idx != -1:
|
||||
if self.readers.has_key(idx):
|
||||
if self.readers[idx].streamhost is not None:
|
||||
self.readers[idx].streamhost['state'] = -1
|
||||
del(self.readers[idx])
|
||||
if do_disconnect:
|
||||
self.readers[idx].disconnect()
|
||||
else:
|
||||
if self.readers[idx].streamhost is not None:
|
||||
self.readers[idx].streamhost['state'] = -1
|
||||
del(self.readers[idx])
|
||||
|
||||
def remove_sender(self, idx):
|
||||
def remove_sender(self, idx, do_disconnect = True):
|
||||
''' Remove reciver from the list of senders and decrease the
|
||||
number of active connections with 1'''
|
||||
if idx != -1:
|
||||
if self.senders.has_key(idx):
|
||||
del(self.senders[idx])
|
||||
if self.connected > 0:
|
||||
self.connected -= 1
|
||||
if do_disconnect:
|
||||
self.senders[idx].disconnect()
|
||||
else:
|
||||
del(self.senders[idx])
|
||||
if self.connected > 0:
|
||||
self.connected -= 1
|
||||
|
||||
class Socks5:
|
||||
def __init__(self, host, port, initiator, target, sid):
|
||||
|
@ -649,13 +688,16 @@ class Socks5Sender(Socks5):
|
|||
def disconnect(self, cb = True):
|
||||
''' Closes the socket. '''
|
||||
# close connection and remove us from the queue
|
||||
self._sock.close()
|
||||
try:
|
||||
self._sock.close()
|
||||
except:
|
||||
pass
|
||||
self.connected = False
|
||||
if self.file_props is not None:
|
||||
self.file_props['connected'] = False
|
||||
self.file_props['disconnect_cb'] = None
|
||||
if self.queue is not None:
|
||||
self.queue.remove_sender(self.queue_idx)
|
||||
self.queue.remove_sender(self.queue_idx, False)
|
||||
|
||||
class Socks5Listener:
|
||||
def __init__(self, host, port):
|
||||
|
@ -740,12 +782,14 @@ class Socks5Receiver(Socks5):
|
|||
self.buff = ''
|
||||
self.connected = True
|
||||
self.file_props['connected'] = True
|
||||
self.file_props['disconnect_cb'] = self.disconnect
|
||||
self.state = 1 # connected
|
||||
self.queue._socket_connected(self.streamhost, self.file_props)
|
||||
return 1 # we are connected
|
||||
|
||||
def main(self, timeout = 0):
|
||||
''' begin negotiation. on success 'address' != 0 '''
|
||||
result = 1
|
||||
if self.state == 1: # send initially: version and auth types
|
||||
self.send_raw(self._get_auth_buff())
|
||||
elif self.state == 2: # read auth response
|
||||
|
@ -779,11 +823,14 @@ class Socks5Receiver(Socks5):
|
|||
self.remaining_buff = buff[addrlen + 7:]
|
||||
self.state = 5 # for senders: init file_props and send '\n'
|
||||
if self.queue.on_success:
|
||||
if not self.queue.send_success_reply(self.file_props,
|
||||
self.streamhost):
|
||||
result = self.queue.send_success_reply(self.file_props,
|
||||
self.streamhost)
|
||||
if result == 0:
|
||||
self.state = 8
|
||||
self.disconnect()
|
||||
if self.state == 5: # for senders: init file_props and send '\n'
|
||||
|
||||
# for senders: init file_props and send '\n'
|
||||
if result == 1 and self.state == 5:
|
||||
if self.file_props['type'] == 's':
|
||||
self.fd = open(self.file_props['file-name'])
|
||||
self.file_props['error'] = 0
|
||||
|
@ -796,7 +843,7 @@ class Socks5Receiver(Socks5):
|
|||
self.pauses = 0
|
||||
self.send_raw(self._get_nl_byte())
|
||||
self.state = 6 # send/get file contents
|
||||
if self.state < 6:
|
||||
if self.state < 5:
|
||||
self.state += 1
|
||||
return None
|
||||
# we have set up the connection, next - retrieve file
|
||||
|
@ -818,9 +865,12 @@ class Socks5Receiver(Socks5):
|
|||
''' Closes the socket. '''
|
||||
# close connection and remove us from the queue
|
||||
if self._sock:
|
||||
self._sock.close()
|
||||
try:
|
||||
self._sock.close()
|
||||
except:
|
||||
pass
|
||||
self.connected = False
|
||||
if cb is True:
|
||||
self.file_props['disconnect_cb'] = None
|
||||
if self.queue is not None:
|
||||
self.queue.remove_receiver(self.queue_idx)
|
||||
self.queue.remove_receiver(self.queue_idx, False)
|
||||
|
|
Loading…
Reference in New Issue