Wait until candidate negociation ends; Socks5 connection problem fixed

This commit is contained in:
Jefry Lagrange 2011-08-14 23:59:39 -04:00
parent 4284927434
commit 60df476506
2 changed files with 217 additions and 82 deletions

View File

@ -35,7 +35,17 @@ STATE_INITIALIZED = 1
STATE_ACCEPTED = 2
STATE_TRANSPORT_INFO = 3
STATE_PROXY_ACTIVATED = 4
STATE_TRANSPORT_REPLACE = 5
# We send the candidates and we are waiting for a reply
STATE_CAND_SENT_PENDING_REPLY = 5
# We received the candidates and we are waiting to reply
STATE_CAND_RECEIVED_PENDING_REPLY = 6
# We have sent and received the candidates
# This also includes any candidate-error received or sent
STATE_CAND_SENT_AND_RECEIVED = 7
# We are transfering the file
STATE_TRANSFERING = 8
STATE_TRANSPORT_REPLACE = 9
class JingleFileTransfer(JingleContent):
def __init__(self, session, transport=None, file_props=None,
@ -88,7 +98,7 @@ class JingleFileTransfer(JingleContent):
self.session = session
self.media = 'file'
self.nominated_cand = {}
def __on_session_initiate(self, stanza, content, error, action):
gajim.nec.push_incoming_event(FileRequestReceivedEvent(None,
@ -152,10 +162,20 @@ class JingleFileTransfer(JingleContent):
def __on_transport_info(self, stanza, content, error, action):
log.info("__on_transport_info")
if not self.weinitiate: # proxy activated from initiator
return
#if not self.weinitiate: # proxy activated from initiator
# return
if content.getTag('transport').getTag('candidate-error'):
self.session.transport_replace()
self.nominated_cand['peer-cand'] = False
if self.state == STATE_CAND_SENT_PENDING_REPLY:
#self.state = STATE_CAND_SENT_AND_RECEIVED
if not self.nominated_cand['our-cand'] and \
not self.nominated_cand['peer-cand']:
if not self.weinitiate:
return
self.session.transport_replace()
else:
self.state = STATE_CAND_RECEIVED_PENDING_REPLY
return
streamhost_cid = content.getTag('transport').getTag('candidate-used').\
getAttr('cid')
@ -167,29 +187,17 @@ class JingleFileTransfer(JingleContent):
if streamhost_used == None:
log.info("unknow streamhost")
return
if streamhost_used['type'] == 'proxy':
self.file_props['streamhost-used'] = True
for proxy in self.file_props['proxyhosts']:
if proxy['host'] == streamhost_used['host'] and \
proxy['port'] == streamhost_used['port'] and \
proxy['jid'] == streamhost_used['jid']:
host_used = proxy
break
if 'streamhosts' not in self.file_props:
self.file_props['streamhosts'] = []
self.file_props['streamhosts'].append(streamhost_used)
self.file_props['is_a_proxy'] = True
receiver = Socks5Receiver(gajim.idlequeue, streamhost_used,
self.file_props['sid'], self.file_props)
gajim.socks5queue.add_receiver(self.session.connection.name,
receiver)
streamhost_used['idx'] = receiver.queue_idx
gajim.socks5queue.on_success[self.file_props['sid']] = \
self.transport._on_proxy_auth_ok
# We save the candidate nominated by peer
self.nominated_cand['peer-cand'] = streamhost_used
if self.state == STATE_CAND_SENT_PENDING_REPLY:
response = stanza.buildReply('result')
self.session.connection.connection.send(response)
self.start_transfer(streamhost_used)
raise xmpp.NodeProcessed
else:
jid = gajim.get_jid_without_resource(self.session.ourjid)
gajim.socks5queue.send_file(self.file_props,
self.session.connection.name)
self.state = STATE_CAND_RECEIVED_PENDING_REPLY
def __on_iq_result(self, stanza, content, error, action):
log.info("__on_iq_result")
@ -221,6 +229,16 @@ class JingleFileTransfer(JingleContent):
elif self.weinitiate and self.state == STATE_INITIALIZED:
# proxy activated
self.state = STATE_PROXY_ACTIVATED
elif self.state == STATE_CAND_SENT_AND_RECEIVED:
if not self.nominated_cand['our-cand'] and \
not self.nominated_cand['peer-cand']:
if not self.weinitiate:
return
self.session.transport_replace()
return
# initiate transfer
self.start_transfer(None)
def send_candidate_used(self, streamhost):
"""
@ -229,7 +247,13 @@ class JingleFileTransfer(JingleContent):
log.info('send_candidate_used')
if streamhost is None:
return
self.nominated_cand['our-cand'] = streamhost
if self.state == STATE_CAND_RECEIVED_PENDING_REPLY:
self.state = STATE_CAND_SENT_AND_RECEIVED
else:
self.state = STATE_CAND_SENT_PENDING_REPLY
content = xmpp.Node('content')
content.setAttr('creator', 'initiator')
content.setAttr('name', self.name)
@ -246,9 +270,16 @@ class JingleFileTransfer(JingleContent):
self.session.send_transport_info(content)
def _on_connect_error(self, to, _id, sid, code=404):
if code == 404 and self.file_props['sid'] == sid:
self.send_error_candidate()
self.nominated_cand['our-cand'] = False
self.send_error_candidate()
if self.state == STATE_CAND_RECEIVED_PENDING_REPLY:
self.state = STATE_CAND_SENT_AND_RECEIVED
else:
self.state = STATE_CAND_SENT_PENDING_REPLY
log.info('connect error, sid=' + sid)
@ -297,20 +328,81 @@ class JingleFileTransfer(JingleContent):
fingerprint = None
if self.use_security:
fingerprint = 'server'
return
if self.weinitiate:
listener = gajim.socks5queue.start_listener(port, sha_str,
self._store_socks5_sid, self.file_props['sid'],
self._store_socks5_sid, self.file_props,
fingerprint=fingerprint, type='sender')
else:
listener = gajim.socks5queue.start_listener(port, sha_str,
self._store_socks5_sid, self.file_props['sid'],
self._store_socks5_sid, self.file_props,
fingerprint=fingerprint, type='receiver')
if not listener:
# send error message, notify the user
return
def isOurCandUsed(self):
if self.nominated_cand['peer-cand'] == False:
return True
if self.nominated_cand['our-cand'] == False:
return False
peer_pr = int(self.nominated_cand['peer-cand']['priority'])
our_pr = int(self.nominated_cand['our-cand']['priority'])
if peer_pr != our_pr:
if peer_pr > our_pr:
# Choose peer host
return False
else:
# Choose our host
return True
else:
if self.weinitiate:
# Choose our host
return True
else:
# Choose peer host
return False
def start_transfer(self, streamhost_used):
self.state = STATE_TRANSFERING
if self.isOurCandUsed():
print 'our'
else:
print 'peer'
# FIXME if streamhost_used is none where do we get the proxy host
if streamhost_used and streamhost_used['type'] == 'proxy':
self.file_props['streamhost-used'] = True
for proxy in self.file_props['proxyhosts']:
if proxy['host'] == streamhost_used['host'] and \
proxy['port'] == streamhost_used['port'] and \
proxy['jid'] == streamhost_used['jid']:
host_used = proxy
break
if 'streamhosts' not in self.file_props:
self.file_props['streamhosts'] = []
self.file_props['streamhosts'].append(streamhost_used)
self.file_props['is_a_proxy'] = True
receiver = Socks5Receiver(gajim.idlequeue, streamhost_used,
self.file_props['sid'], self.file_props)
gajim.socks5queue.add_receiver(self.session.connection.name,
receiver)
streamhost_used['idx'] = receiver.queue_idx
gajim.socks5queue.on_success[self.file_props['sid']] = \
self.transport._on_proxy_auth_ok
else:
jid = gajim.get_jid_without_resource(self.session.ourjid)
gajim.socks5queue.send_file(self.file_props,
self.session.connection.name)
def get_content(desc):
return JingleFileTransfer

View File

@ -84,16 +84,18 @@ class SocksQueue:
self.on_success = {} # {id: cb}
self.on_failure = {} # {id: cb}
def start_listener(self, port, sha_str, sha_handler, sid, fingerprint=None,
def start_listener(self, port, sha_str, sha_handler, fp, fingerprint=None,
type='sender'):
"""
Start waiting for incomming connections on (host, port) and do a socks5
authentication using sid for generated SHA
"""
sid = fp['sid']
self.type = type # It says whether we are sending or receiving
self.sha_handlers[sha_str] = (sha_handler, sid)
if self.listener is None:
self.listener = Socks5Listener(self.idlequeue, port, fingerprint=fingerprint)
self.listener = Socks5Listener(self.idlequeue, port, fp,
fingerprint=fingerprint)
self.listener.queue = self
self.listener.bind()
if self.listener.started is False:
@ -130,7 +132,7 @@ class SocksQueue:
self.on_failure[sid] = on_failure
file_props = self.files_props[account][sid]
file_props['failure_cb'] = on_failure
# add streamhosts to the queue
for streamhost in file_props['streamhosts']:
if 'type' in streamhost and streamhost['type'] == 'proxy':
@ -149,8 +151,8 @@ class SocksQueue:
file_props['sha_str'], self, mode='client' ,
_sock=None, host=str(streamhost['host']),
port=int(streamhost['port']), fingerprint=fp,
connected=False)
socks5obj.file_props = file_props
connected=False, file_props=file_props)
#socks5obj.file_props = file_props
socks5obj.streamhost = streamhost
self.add_sockobj(account, socks5obj, type='sender')
@ -234,7 +236,7 @@ class SocksQueue:
return
# failure_cb exists - this means that it has never been called
if 'failure_cb' in file_props and file_props['failure_cb']:
file_props['failure_cb'](streamhost['initiator'], streamhost['idx'],
file_props['failure_cb'](streamhost['initiator'], None,
file_props['sid'], code = 404)
del(file_props['failure_cb'])
@ -258,14 +260,6 @@ class SocksQueue:
return 1
return None
def get_file_from_sender(self, file_props, account):
if file_props is None:
return
if 'hash' in file_props and file_props['hash'] in self.senders:
sender = self.senders[file_props['hash']]
sender.account = account
result = self.get_file_contents(0)
self.process_result(result, sender)
def result_sha(self, sha_str, idx):
if sha_str in self.sha_handlers:
@ -313,7 +307,9 @@ class SocksQueue:
file_props['last-time'] = self.idlequeue.current_time()
file_props['received-len'] = 0
sender.file_props = file_props
else:
log.info("socks5: NOT sending file")
def add_file_props(self, account, file_props):
@ -351,12 +347,12 @@ class SocksQueue:
return fl_props[sid]
return None
def on_connection_accepted(self, sock):
def on_connection_accepted(self, sock, listener):
sock_hash = sock.__hash__()
if self.type == 'sender' and (sock_hash not in self.senders):
self.senders[sock_hash] = Socks5Sender(self.idlequeue, sock_hash, self,
sock[0], 'server', sock[1][0], sock[1][1],
fingerprint='server')
'server', sock[0], sock[1][0], sock[1][1],
fingerprint='server', file_props=listener.file_props)
# Start waiting for data
self.idlequeue.plug_idle(self.senders[sock_hash], False, True)
self.connected += 1
@ -371,6 +367,7 @@ class SocksQueue:
streamhost=sh,sid=None, file_props=None,
mode='server',fingerprint='server')
self.readers[sock_hash].set_sock(sock[0])
self.readers[sock_hash].queue = self
self.connected += 1
def process_result(self, result, actor):
@ -452,6 +449,7 @@ class Socks5:
self.remaining_buff = ''
self.file = None
self.connected = False
self.type = ''
def start_transfer(self):
"""
@ -474,9 +472,10 @@ class Socks5:
for ai in self.ais:
try:
self._sock = socket.socket(*ai[:3])
'''
if not self.fingerprint is None:
self._sock = OpenSSL.SSL.Connection(
jingle_xtls.get_context('client'), self._sock)
jingle_xtls.get_context('client'), self._sock)'''
# this will not block the GUI
self._sock.setblocking(False)
self._server = ai[4]
@ -495,6 +494,7 @@ class Socks5:
def do_connect(self):
try:
#self._sock.setblocking(True)
self._sock.connect(self._server)
self._sock.setblocking(False)
self._send=self._sock.send
@ -587,6 +587,7 @@ class Socks5:
if self.queue.on_success:
result = self.queue.send_success_reply(self.file_props,
self.streamhost)
if result == 0:
self.state = 8
self.disconnect()
@ -605,11 +606,13 @@ class Socks5:
self.file_props['received-len'] = 0
self.pauses = 0
# start sending file contents to socket
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
self.idlequeue.plug_idle(self, True, False)
#self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
#self.idlequeue.plug_idle(self, True, False)
self.idlequeue.plug_idle(self, False, False)
else:
# receiving file contents from socket
self.idlequeue.plug_idle(self, False, True)
self.file_props['continue_cb'] = self.continue_paused_transfer
# we have set up the connection, next - retrieve file
self.state = 6
@ -699,9 +702,14 @@ class Socks5:
self.disconnect()
elif self.state == 5:
if self.file_props is not None and self.file_props['type'] == 'r':
result = self.start_transfer() # receive
self.queue.process_result(result, self)
if self.type == 'sender':
# We wait for the end of the negotiation to
# send the file
self.state = 7
self.idlequeue.plug_idle(self, False, False)
else:
result = self.start_transfer() # receive
self.queue.process_result(result, self)
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
OpenSSL.SSL.WantX509LookupError), e:
log.info('caught SSL exception, ignored')
@ -1115,24 +1123,28 @@ class Socks5Sender(Socks5, IdleObject):
"""
def __init__(self, idlequeue, sock_hash, parent, mode,_sock, host=None,
port=None, fingerprint = None, connected=True):
port=None, fingerprint = None, connected=True,
file_props={}):
self.fingerprint = fingerprint
self.queue_idx = sock_hash
self.queue = parent
self.mode = mode # client or server
self.file_props = {}
self.file_props['paused'] = False
self.file_props = file_props
Socks5.__init__(self, idlequeue, host, port, None, None, None)
self._sock = _sock
if _sock is not None:
if self.fingerprint is not None:
'''if self.fingerprint is not None:
self._sock = OpenSSL.SSL.Connection(
jingle_xtls.get_context('server'), self._sock)
jingle_xtls.get_context('server'), _sock)
else:
self._sock.setblocking(False)
'''
self.fd = _sock.fileno()
self._recv = _sock.recv
self._send = _sock.send
@ -1140,21 +1152,6 @@ class Socks5Sender(Socks5, IdleObject):
self.state = 1 # waiting for first bytes
self.connect_timeout = 0
def start_transfer(self):
"""
Send the file
"""
return self.write_next()
def send_file(self):
"""
Start sending the file over verified connection
"""
if self.file_props['started']:
return
self.file_props['error'] = 0
self.file_props['disconnect_cb'] = self.disconnect
self.file_props['started'] = True
@ -1166,6 +1163,39 @@ class Socks5Sender(Socks5, IdleObject):
self.file_props['elapsed-time'] = 0
self.file_props['last-time'] = self.idlequeue.current_time()
self.file_props['received-len'] = 0
self.type = 'sender'
def start_transfer(self):
"""
Send the file
"""
return self.write_next()
def set_connection_sock(self, _sock):
self._sock = _sock
if self.fingerprint is not None:
self._sock = OpenSSL.SSL.Connection(
jingle_xtls.get_context('client'), self._sock)
else:
self._sock.setblocking(False)
self.fd = _sock.fileno()
self._recv = _sock.recv
self._send = _sock.send
self.connected = True
self.state = 1 # waiting for first bytes
self.file_props = None
# start waiting for data
self.idlequeue.plug_idle(self, False, True)
def send_file(self):
"""
Start sending the file over verified connection
"""
self.pauses = 0
self.state = 7
# plug for writing
@ -1185,7 +1215,7 @@ class Socks5Sender(Socks5, IdleObject):
self.queue.remove_sender(self.queue_idx, False)
class Socks5Listener(IdleObject):
def __init__(self, idlequeue, port, fingerprint=None):
def __init__(self, idlequeue, port, fp, fingerprint=None):
"""
Handle all incomming connections on (0.0.0.0, port)
@ -1205,15 +1235,16 @@ class Socks5Listener(IdleObject):
self._sock = None
self.fd = -1
self.fingerprint = fingerprint
self.file_props = fp
def bind(self):
for ai in self.ais:
# try the different possibilities (ipv6, ipv4, etc.)
try:
self._serv = socket.socket(*ai[:3])
if self.fingerprint is not None:
'''if self.fingerprint is not None:
self._serv = OpenSSL.SSL.Connection(
jingle_xtls.get_context('server'), self._serv)
jingle_xtls.get_context('server'), self._serv)'''
except socket.error, e:
if e.args[0] == EAFNOSUPPORT:
self.ai = None
@ -1232,6 +1263,7 @@ class Socks5Listener(IdleObject):
# will fail when port as busy, or we don't have rights to bind
try:
self._serv.bind(ai[4])
f = ai[4]
self.ai = ai
break
except Exception:
@ -1257,7 +1289,7 @@ class Socks5Listener(IdleObject):
Accept a new incomming connection and notify queue
"""
sock = self.accept_conn()
self.queue.on_connection_accepted(sock)
self.queue.on_connection_accepted(sock, self)
def disconnect(self):
"""
@ -1314,6 +1346,7 @@ class Socks5Receiver(Socks5, IdleObject):
"""
Start receiving the file over verified connection
"""
print "receiving file"
if self.file_props['started']:
return
self.file_props['error'] = 0
@ -1339,6 +1372,16 @@ class Socks5Receiver(Socks5, IdleObject):
"""
return self.get_file_contents(0)
def set_sock(self, _sock):
self._sock = _sock
self._sock.setblocking(False)
self.fd = _sock.fileno()
self._recv = _sock.recv
self._send = _sock.send
self.connected = True
self.state = 1 # waiting for first bytes
# start waiting for data
self.idlequeue.plug_idle(self, False, True)
def disconnect(self, cb=True):
"""