connect to candidate with highest priority
This commit is contained in:
parent
f4cc439a7e
commit
609ee79791
2 changed files with 137 additions and 103 deletions
|
@ -342,6 +342,10 @@ class JingleFileTransfer(JingleContent):
|
|||
# send error message, notify the user
|
||||
return
|
||||
def isOurCandUsed(self):
|
||||
'''
|
||||
If this method returns true then the candidate we nominated will be
|
||||
used, if false, the candidate nominated by peer will be used
|
||||
'''
|
||||
|
||||
if self.nominated_cand['peer-cand'] == False:
|
||||
return True
|
||||
|
@ -352,32 +356,24 @@ class JingleFileTransfer(JingleContent):
|
|||
our_pr = int(self.nominated_cand['our-cand']['priority'])
|
||||
|
||||
if peer_pr != our_pr:
|
||||
if peer_pr > our_pr:
|
||||
# Choose peer host
|
||||
return False
|
||||
else:
|
||||
# Choose our host
|
||||
return True
|
||||
return peer_pr > our_pr
|
||||
else:
|
||||
if self.weinitiate:
|
||||
# Choose our host
|
||||
return True
|
||||
else:
|
||||
# Choose peer host
|
||||
return False
|
||||
|
||||
return self.weinitiate
|
||||
|
||||
|
||||
def start_transfer(self, streamhost_used):
|
||||
|
||||
self.state = STATE_TRANSFERING
|
||||
|
||||
# It tells wether we start the transfer as client or server
|
||||
type = None
|
||||
|
||||
if self.isOurCandUsed():
|
||||
print 'our'
|
||||
type = 'client'
|
||||
else:
|
||||
print 'peer'
|
||||
|
||||
type = 'server'
|
||||
|
||||
print type
|
||||
# FIXME if streamhost_used is none where do we get the proxy host
|
||||
if streamhost_used and streamhost_used['type'] == 'proxy':
|
||||
self.file_props['streamhost-used'] = True
|
||||
|
@ -401,7 +397,7 @@ class JingleFileTransfer(JingleContent):
|
|||
else:
|
||||
jid = gajim.get_jid_without_resource(self.session.ourjid)
|
||||
gajim.socks5queue.send_file(self.file_props,
|
||||
self.session.connection.name)
|
||||
self.session.connection.name, type)
|
||||
|
||||
def get_content(desc):
|
||||
return JingleFileTransfer
|
||||
|
|
|
@ -152,11 +152,9 @@ class SocksQueue:
|
|||
_sock=None, host=str(streamhost['host']),
|
||||
port=int(streamhost['port']), fingerprint=fp,
|
||||
connected=False, file_props=file_props)
|
||||
#socks5obj.file_props = file_props
|
||||
socks5obj.streamhost = streamhost
|
||||
self.add_sockobj(account, socks5obj, type='sender')
|
||||
|
||||
socks5obj.file_props = file_props
|
||||
streamhost['idx'] = socks5obj.queue_idx
|
||||
|
||||
def _socket_connected(self, streamhost, file_props):
|
||||
|
@ -242,12 +240,13 @@ class SocksQueue:
|
|||
|
||||
def add_sockobj(self, account, sockobj, type='receiver'):
|
||||
"""
|
||||
Add new file a sockobj type receiver or sendder
|
||||
Add new file a sockobj type receiver or sender, and use it to connect
|
||||
to server
|
||||
"""
|
||||
if type == 'receiver':
|
||||
self.readers[self.idx] = sockobj
|
||||
self._add(sockobj, self.readers, sockobj.file_props, self.idx)
|
||||
else:
|
||||
self.senders[self.idx] = sockobj
|
||||
self._add(sockobj, self.senders, sockobj.file_props, self.idx)
|
||||
sockobj.queue_idx = self.idx
|
||||
sockobj.queue = self
|
||||
sockobj.account = account
|
||||
|
@ -260,57 +259,66 @@ class SocksQueue:
|
|||
return 1
|
||||
return None
|
||||
|
||||
|
||||
def _add(self, sockobj, sockobjects, fp, hash):
|
||||
'''
|
||||
Adds the sockobj to the current list of sockobjects
|
||||
'''
|
||||
keys = (fp['sid'], fp['name'], hash)
|
||||
sockobjects[keys] = sockobj
|
||||
|
||||
|
||||
def result_sha(self, sha_str, idx):
|
||||
if sha_str in self.sha_handlers:
|
||||
props = self.sha_handlers[sha_str]
|
||||
props[0](props[1], idx)
|
||||
|
||||
def activate_proxy(self, idx):
|
||||
if idx not in self.readers:
|
||||
if not self.isHashInSockObjs(self.readers, idx):
|
||||
return
|
||||
reader = self.readers[idx]
|
||||
if reader.file_props['type'] != 's':
|
||||
return
|
||||
if reader.state != 5:
|
||||
return
|
||||
reader.state = 6
|
||||
if reader.connected:
|
||||
reader.file_props['error'] = 0
|
||||
reader.file_props['disconnect_cb'] = reader.disconnect
|
||||
reader.file_props['started'] = True
|
||||
reader.file_props['completed'] = False
|
||||
reader.file_props['paused'] = False
|
||||
reader.file_props['stalled'] = False
|
||||
reader.file_props['elapsed-time'] = 0
|
||||
reader.file_props['last-time'] = self.idlequeue.current_time()
|
||||
reader.file_props['received-len'] = 0
|
||||
reader.pauses = 0
|
||||
# start sending file to proxy
|
||||
self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT)
|
||||
self.idlequeue.plug_idle(reader, True, False)
|
||||
result = reader.write_next()
|
||||
self.process_result(result, reader)
|
||||
for key in self.readers.keys():
|
||||
if idx in key:
|
||||
reader = self.readers[key]
|
||||
if reader.file_props['type'] != 's':
|
||||
return
|
||||
if reader.state != 5:
|
||||
return
|
||||
reader.state = 6
|
||||
if reader.connected:
|
||||
reader.file_props['error'] = 0
|
||||
reader.file_props['disconnect_cb'] = reader.disconnect
|
||||
reader.file_props['started'] = True
|
||||
reader.file_props['completed'] = False
|
||||
reader.file_props['paused'] = False
|
||||
reader.file_props['stalled'] = False
|
||||
reader.file_props['elapsed-time'] = 0
|
||||
reader.file_props['last-time'] = self.idlequeue.current_time()
|
||||
reader.file_props['received-len'] = 0
|
||||
reader.pauses = 0
|
||||
# start sending file to proxy
|
||||
self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT)
|
||||
self.idlequeue.plug_idle(reader, True, False)
|
||||
result = reader.write_next()
|
||||
self.process_result(result, reader)
|
||||
|
||||
def send_file(self, file_props, account):
|
||||
if 'hash' in file_props and file_props['hash'] in self.senders:
|
||||
log.info("socks5: sending file")
|
||||
sender = self.senders[file_props['hash']]
|
||||
file_props['streamhost-used'] = True
|
||||
sender.account = account
|
||||
if file_props['type'] == 's':
|
||||
sender.file_props = file_props
|
||||
result = sender.send_file()
|
||||
self.process_result(result, sender)
|
||||
else:
|
||||
file_props['elapsed-time'] = 0
|
||||
file_props['last-time'] = self.idlequeue.current_time()
|
||||
file_props['received-len'] = 0
|
||||
sender.file_props = file_props
|
||||
def send_file(self, file_props, account, type):
|
||||
for key in self.senders.keys():
|
||||
if file_props['name'] in key and file_props['sid'] in key \
|
||||
and self.senders[key].mode == type:
|
||||
|
||||
log.info("socks5: sending file")
|
||||
sender = self.senders[key]
|
||||
file_props['streamhost-used'] = True
|
||||
sender.account = account
|
||||
if file_props['type'] == 's':
|
||||
sender.file_props = file_props
|
||||
result = sender.send_file()
|
||||
self.process_result(result, sender)
|
||||
else:
|
||||
file_props['elapsed-time'] = 0
|
||||
file_props['last-time'] = self.idlequeue.current_time()
|
||||
file_props['received-len'] = 0
|
||||
sender.file_props = file_props
|
||||
|
||||
else:
|
||||
|
||||
log.info("socks5: NOT sending file")
|
||||
|
||||
def add_file_props(self, account, file_props):
|
||||
"""
|
||||
|
@ -346,30 +354,49 @@ class SocksQueue:
|
|||
if sid in fl_props:
|
||||
return fl_props[sid]
|
||||
return None
|
||||
|
||||
def isHashInSockObjs(self, sockobjs, hash):
|
||||
'''
|
||||
It tells wether there is a particular hash in sockobjs or not
|
||||
'''
|
||||
for key in sockobjs:
|
||||
if hash in key:
|
||||
return True
|
||||
return False
|
||||
|
||||
def on_connection_accepted(self, sock, listener):
|
||||
sock_hash = sock.__hash__()
|
||||
if self.type == 'sender' and (sock_hash not in self.senders):
|
||||
self.senders[sock_hash] = Socks5Sender(self.idlequeue, sock_hash, self,
|
||||
'server', sock[0], sock[1][0], sock[1][1],
|
||||
fingerprint='server', file_props=listener.file_props)
|
||||
if self.type == 'sender' and \
|
||||
not self.isHashInSockObjs(self.senders, sock_hash):
|
||||
|
||||
sockobj = Socks5Sender(self.idlequeue, sock_hash, self,
|
||||
'server', sock[0], sock[1][0], sock[1][1],
|
||||
fingerprint='server', file_props=listener.file_props)
|
||||
self._add(sockobj, self.senders, listener.file_props, sock_hash)
|
||||
# Start waiting for data
|
||||
self.idlequeue.plug_idle(self.senders[sock_hash], False, True)
|
||||
self.idlequeue.plug_idle(sockobj, False, True)
|
||||
self.connected += 1
|
||||
|
||||
if self.type == 'receiver' and (sock_hash not in self.readers):
|
||||
if self.type == 'receiver' and \
|
||||
not self.isHashInSockObjs(self.readers, sock_hash):
|
||||
sh = {}
|
||||
sh['host'] = sock[1][0]
|
||||
sh['port'] = sock[1][1]
|
||||
sh['initiator'] = None
|
||||
sh['target'] = None
|
||||
self.readers[sock_hash] = Socks5Receiver(idlequeue=self.idlequeue,
|
||||
streamhost=sh,sid=None, file_props=None,
|
||||
|
||||
sockobj = Socks5Receiver(idlequeue=self.idlequeue,
|
||||
streamhost=sh,sid=None,
|
||||
file_props=listener.file_props,
|
||||
mode='server',fingerprint='server')
|
||||
self.readers[sock_hash].set_sock(sock[0])
|
||||
self.readers[sock_hash].queue = self
|
||||
|
||||
self._add(sockobj, self.readers, listener.file_props, sock_hash)
|
||||
|
||||
sockobj.set_sock(sock[0])
|
||||
sockobj.queue = self
|
||||
self.connected += 1
|
||||
|
||||
|
||||
def process_result(self, result, actor):
|
||||
"""
|
||||
Take appropriate actions upon the result:
|
||||
|
@ -393,16 +420,17 @@ class SocksQueue:
|
|||
connections with 1
|
||||
"""
|
||||
if idx != -1:
|
||||
if idx in self.readers:
|
||||
reader = self.readers[idx]
|
||||
self.idlequeue.unplug_idle(reader.fd)
|
||||
self.idlequeue.remove_timeout(reader.fd)
|
||||
if do_disconnect:
|
||||
reader.disconnect()
|
||||
else:
|
||||
if reader.streamhost is not None:
|
||||
reader.streamhost['state'] = -1
|
||||
del(self.readers[idx])
|
||||
for key in self.readers.keys():
|
||||
if idx in key:
|
||||
reader = self.readers[key]
|
||||
self.idlequeue.unplug_idle(reader.fd)
|
||||
self.idlequeue.remove_timeout(reader.fd)
|
||||
if do_disconnect:
|
||||
reader.disconnect()
|
||||
else:
|
||||
if reader.streamhost is not None:
|
||||
reader.streamhost['state'] = -1
|
||||
del(self.readers[key])
|
||||
|
||||
def remove_sender(self, idx, do_disconnect=True):
|
||||
"""
|
||||
|
@ -410,17 +438,18 @@ class SocksQueue:
|
|||
connections with 1
|
||||
"""
|
||||
if idx != -1:
|
||||
if idx in self.senders:
|
||||
sender = self.senders[idx]
|
||||
if do_disconnect:
|
||||
self.senders[idx].disconnect()
|
||||
return
|
||||
else:
|
||||
self.idlequeue.unplug_idle(sender.fd)
|
||||
self.idlequeue.remove_timeout(sender.fd)
|
||||
del(self.senders[idx])
|
||||
if self.connected > 0:
|
||||
self.connected -= 1
|
||||
for key in self.senders.keys():
|
||||
if idx in key:
|
||||
sender = self.senders[key]
|
||||
if do_disconnect:
|
||||
sender.disconnect()
|
||||
return
|
||||
else:
|
||||
self.idlequeue.unplug_idle(sender.fd)
|
||||
self.idlequeue.remove_timeout(sender.fd)
|
||||
del(self.senders[key])
|
||||
if self.connected > 0:
|
||||
self.connected -= 1
|
||||
if len(self.senders) == 0 and self.listener is not None:
|
||||
self.listener.disconnect()
|
||||
self.listener = None
|
||||
|
@ -702,14 +731,24 @@ class Socks5:
|
|||
self.disconnect()
|
||||
|
||||
elif self.state == 5:
|
||||
self.state = 7
|
||||
if self.type == 'sender':
|
||||
# We wait for the end of the negotiation to
|
||||
# send the file
|
||||
self.state = 7
|
||||
self.idlequeue.plug_idle(self, False, False)
|
||||
else:
|
||||
result = self.start_transfer() # receive
|
||||
self.queue.process_result(result, self)
|
||||
# We plug for reading
|
||||
self.idlequeue.plug_idle(self, False, True)
|
||||
return
|
||||
|
||||
elif self.state == 7:
|
||||
if self.file_props['paused']:
|
||||
self.file_props['continue_cb'] = self.continue_paused_transfer
|
||||
self.idlequeue.plug_idle(self, False, False)
|
||||
return
|
||||
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
|
||||
result = self.start_transfer() # send
|
||||
self.queue.process_result(result, self)
|
||||
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
|
||||
OpenSSL.SSL.WantX509LookupError), e:
|
||||
log.info('caught SSL exception, ignored')
|
||||
|
@ -770,8 +809,8 @@ class Socks5:
|
|||
if self.file_props['stalled'] == False:
|
||||
self.file_props['stalled'] = True
|
||||
self.queue.process_result(-1, self)
|
||||
#if 'received-len' not in self.file_props:
|
||||
# self.file_props['received-len'] = 0
|
||||
if 'received-len' not in self.file_props:
|
||||
self.file_props['received-len'] = 0
|
||||
if SEND_TIMEOUT > 0:
|
||||
self.idlequeue.set_read_timeout(self.fd, SEND_TIMEOUT)
|
||||
else:
|
||||
|
@ -1322,13 +1361,11 @@ class Socks5Receiver(Socks5, IdleObject):
|
|||
self.queue_idx = -1
|
||||
self.streamhost = streamhost
|
||||
self.queue = None
|
||||
self.file_props = file_props
|
||||
self.fingerprint = fingerprint
|
||||
self.connect_timeout = 0
|
||||
self.connected = False
|
||||
self.pauses = 0
|
||||
if not self.file_props:
|
||||
self.file_props = {}
|
||||
self.file_props = file_props
|
||||
self.file_props['disconnect_cb'] = self.disconnect
|
||||
self.file_props['error'] = 0
|
||||
self.file_props['started'] = True
|
||||
|
@ -1336,6 +1373,7 @@ class Socks5Receiver(Socks5, IdleObject):
|
|||
self.file_props['paused'] = False
|
||||
self.file_props['continue_cb'] = self.continue_paused_transfer
|
||||
self.file_props['stalled'] = False
|
||||
self.file_props['received-len'] = 0
|
||||
self.mode = mode # client or server
|
||||
Socks5.__init__(self, idlequeue, streamhost['host'],
|
||||
int(streamhost['port']), streamhost['initiator'], streamhost['target'],
|
||||
|
|
Loading…
Add table
Reference in a new issue