diff --git a/src/common/jingle_ft.py b/src/common/jingle_ft.py index 4e2452c5d..2ee4cf983 100644 --- a/src/common/jingle_ft.py +++ b/src/common/jingle_ft.py @@ -35,7 +35,17 @@ STATE_INITIALIZED = 1 STATE_ACCEPTED = 2 STATE_TRANSPORT_INFO = 3 STATE_PROXY_ACTIVATED = 4 -STATE_TRANSPORT_REPLACE = 5 +# We send the candidates and we are waiting for a reply +STATE_CAND_SENT_PENDING_REPLY = 5 +# We received the candidates and we are waiting to reply +STATE_CAND_RECEIVED_PENDING_REPLY = 6 +# We have sent and received the candidates +# This also includes any candidate-error received or sent +STATE_CAND_SENT_AND_RECEIVED = 7 +# We are transfering the file +STATE_TRANSFERING = 8 +STATE_TRANSPORT_REPLACE = 9 + class JingleFileTransfer(JingleContent): def __init__(self, session, transport=None, file_props=None, @@ -88,7 +98,7 @@ class JingleFileTransfer(JingleContent): self.session = session self.media = 'file' - + self.nominated_cand = {} def __on_session_initiate(self, stanza, content, error, action): gajim.nec.push_incoming_event(FileRequestReceivedEvent(None, @@ -152,10 +162,20 @@ class JingleFileTransfer(JingleContent): def __on_transport_info(self, stanza, content, error, action): log.info("__on_transport_info") - if not self.weinitiate: # proxy activated from initiator - return + #if not self.weinitiate: # proxy activated from initiator + # return if content.getTag('transport').getTag('candidate-error'): - self.session.transport_replace() + self.nominated_cand['peer-cand'] = False + if self.state == STATE_CAND_SENT_PENDING_REPLY: + #self.state = STATE_CAND_SENT_AND_RECEIVED + if not self.nominated_cand['our-cand'] and \ + not self.nominated_cand['peer-cand']: + if not self.weinitiate: + return + self.session.transport_replace() + else: + self.state = STATE_CAND_RECEIVED_PENDING_REPLY + return streamhost_cid = content.getTag('transport').getTag('candidate-used').\ getAttr('cid') @@ -167,29 +187,17 @@ class JingleFileTransfer(JingleContent): if streamhost_used == None: log.info("unknow streamhost") return - if streamhost_used['type'] == 'proxy': - self.file_props['streamhost-used'] = True - for proxy in self.file_props['proxyhosts']: - if proxy['host'] == streamhost_used['host'] and \ - proxy['port'] == streamhost_used['port'] and \ - proxy['jid'] == streamhost_used['jid']: - host_used = proxy - break - if 'streamhosts' not in self.file_props: - self.file_props['streamhosts'] = [] - self.file_props['streamhosts'].append(streamhost_used) - self.file_props['is_a_proxy'] = True - receiver = Socks5Receiver(gajim.idlequeue, streamhost_used, - self.file_props['sid'], self.file_props) - gajim.socks5queue.add_receiver(self.session.connection.name, - receiver) - streamhost_used['idx'] = receiver.queue_idx - gajim.socks5queue.on_success[self.file_props['sid']] = \ - self.transport._on_proxy_auth_ok + # We save the candidate nominated by peer + self.nominated_cand['peer-cand'] = streamhost_used + if self.state == STATE_CAND_SENT_PENDING_REPLY: + response = stanza.buildReply('result') + self.session.connection.connection.send(response) + self.start_transfer(streamhost_used) + raise xmpp.NodeProcessed else: - jid = gajim.get_jid_without_resource(self.session.ourjid) - gajim.socks5queue.send_file(self.file_props, - self.session.connection.name) + self.state = STATE_CAND_RECEIVED_PENDING_REPLY + + def __on_iq_result(self, stanza, content, error, action): log.info("__on_iq_result") @@ -221,6 +229,16 @@ class JingleFileTransfer(JingleContent): elif self.weinitiate and self.state == STATE_INITIALIZED: # proxy activated self.state = STATE_PROXY_ACTIVATED + elif self.state == STATE_CAND_SENT_AND_RECEIVED: + + if not self.nominated_cand['our-cand'] and \ + not self.nominated_cand['peer-cand']: + if not self.weinitiate: + return + self.session.transport_replace() + return + # initiate transfer + self.start_transfer(None) def send_candidate_used(self, streamhost): """ @@ -229,7 +247,13 @@ class JingleFileTransfer(JingleContent): log.info('send_candidate_used') if streamhost is None: return - + + self.nominated_cand['our-cand'] = streamhost + if self.state == STATE_CAND_RECEIVED_PENDING_REPLY: + self.state = STATE_CAND_SENT_AND_RECEIVED + else: + self.state = STATE_CAND_SENT_PENDING_REPLY + content = xmpp.Node('content') content.setAttr('creator', 'initiator') content.setAttr('name', self.name) @@ -246,9 +270,16 @@ class JingleFileTransfer(JingleContent): self.session.send_transport_info(content) + def _on_connect_error(self, to, _id, sid, code=404): - if code == 404 and self.file_props['sid'] == sid: - self.send_error_candidate() + self.nominated_cand['our-cand'] = False + self.send_error_candidate() + + if self.state == STATE_CAND_RECEIVED_PENDING_REPLY: + self.state = STATE_CAND_SENT_AND_RECEIVED + else: + self.state = STATE_CAND_SENT_PENDING_REPLY + log.info('connect error, sid=' + sid) @@ -297,20 +328,81 @@ class JingleFileTransfer(JingleContent): fingerprint = None if self.use_security: fingerprint = 'server' - return + if self.weinitiate: listener = gajim.socks5queue.start_listener(port, sha_str, - self._store_socks5_sid, self.file_props['sid'], + self._store_socks5_sid, self.file_props, fingerprint=fingerprint, type='sender') else: listener = gajim.socks5queue.start_listener(port, sha_str, - self._store_socks5_sid, self.file_props['sid'], + self._store_socks5_sid, self.file_props, fingerprint=fingerprint, type='receiver') if not listener: # send error message, notify the user return + def isOurCandUsed(self): + if self.nominated_cand['peer-cand'] == False: + return True + if self.nominated_cand['our-cand'] == False: + return False + + peer_pr = int(self.nominated_cand['peer-cand']['priority']) + our_pr = int(self.nominated_cand['our-cand']['priority']) + + if peer_pr != our_pr: + if peer_pr > our_pr: + # Choose peer host + return False + else: + # Choose our host + return True + else: + if self.weinitiate: + # Choose our host + return True + else: + # Choose peer host + return False + + + + def start_transfer(self, streamhost_used): + + self.state = STATE_TRANSFERING + + if self.isOurCandUsed(): + print 'our' + else: + print 'peer' + + + # FIXME if streamhost_used is none where do we get the proxy host + if streamhost_used and streamhost_used['type'] == 'proxy': + self.file_props['streamhost-used'] = True + for proxy in self.file_props['proxyhosts']: + if proxy['host'] == streamhost_used['host'] and \ + proxy['port'] == streamhost_used['port'] and \ + proxy['jid'] == streamhost_used['jid']: + host_used = proxy + break + if 'streamhosts' not in self.file_props: + self.file_props['streamhosts'] = [] + self.file_props['streamhosts'].append(streamhost_used) + self.file_props['is_a_proxy'] = True + receiver = Socks5Receiver(gajim.idlequeue, streamhost_used, + self.file_props['sid'], self.file_props) + gajim.socks5queue.add_receiver(self.session.connection.name, + receiver) + streamhost_used['idx'] = receiver.queue_idx + gajim.socks5queue.on_success[self.file_props['sid']] = \ + self.transport._on_proxy_auth_ok + else: + jid = gajim.get_jid_without_resource(self.session.ourjid) + gajim.socks5queue.send_file(self.file_props, + self.session.connection.name) + def get_content(desc): return JingleFileTransfer diff --git a/src/common/socks5.py b/src/common/socks5.py index 8a3b8b5f5..37e3034fb 100644 --- a/src/common/socks5.py +++ b/src/common/socks5.py @@ -84,16 +84,18 @@ class SocksQueue: self.on_success = {} # {id: cb} self.on_failure = {} # {id: cb} - def start_listener(self, port, sha_str, sha_handler, sid, fingerprint=None, + def start_listener(self, port, sha_str, sha_handler, fp, fingerprint=None, type='sender'): """ Start waiting for incomming connections on (host, port) and do a socks5 authentication using sid for generated SHA """ + sid = fp['sid'] self.type = type # It says whether we are sending or receiving self.sha_handlers[sha_str] = (sha_handler, sid) if self.listener is None: - self.listener = Socks5Listener(self.idlequeue, port, fingerprint=fingerprint) + self.listener = Socks5Listener(self.idlequeue, port, fp, + fingerprint=fingerprint) self.listener.queue = self self.listener.bind() if self.listener.started is False: @@ -130,7 +132,7 @@ class SocksQueue: self.on_failure[sid] = on_failure file_props = self.files_props[account][sid] file_props['failure_cb'] = on_failure - + # add streamhosts to the queue for streamhost in file_props['streamhosts']: if 'type' in streamhost and streamhost['type'] == 'proxy': @@ -149,8 +151,8 @@ class SocksQueue: file_props['sha_str'], self, mode='client' , _sock=None, host=str(streamhost['host']), port=int(streamhost['port']), fingerprint=fp, - connected=False) - socks5obj.file_props = file_props + connected=False, file_props=file_props) + #socks5obj.file_props = file_props socks5obj.streamhost = streamhost self.add_sockobj(account, socks5obj, type='sender') @@ -234,7 +236,7 @@ class SocksQueue: return # failure_cb exists - this means that it has never been called if 'failure_cb' in file_props and file_props['failure_cb']: - file_props['failure_cb'](streamhost['initiator'], streamhost['idx'], + file_props['failure_cb'](streamhost['initiator'], None, file_props['sid'], code = 404) del(file_props['failure_cb']) @@ -258,14 +260,6 @@ class SocksQueue: return 1 return None - def get_file_from_sender(self, file_props, account): - if file_props is None: - return - if 'hash' in file_props and file_props['hash'] in self.senders: - sender = self.senders[file_props['hash']] - sender.account = account - result = self.get_file_contents(0) - self.process_result(result, sender) def result_sha(self, sha_str, idx): if sha_str in self.sha_handlers: @@ -313,7 +307,9 @@ class SocksQueue: file_props['last-time'] = self.idlequeue.current_time() file_props['received-len'] = 0 sender.file_props = file_props + else: + log.info("socks5: NOT sending file") def add_file_props(self, account, file_props): @@ -351,12 +347,12 @@ class SocksQueue: return fl_props[sid] return None - def on_connection_accepted(self, sock): + def on_connection_accepted(self, sock, listener): sock_hash = sock.__hash__() if self.type == 'sender' and (sock_hash not in self.senders): self.senders[sock_hash] = Socks5Sender(self.idlequeue, sock_hash, self, - sock[0], 'server', sock[1][0], sock[1][1], - fingerprint='server') + 'server', sock[0], sock[1][0], sock[1][1], + fingerprint='server', file_props=listener.file_props) # Start waiting for data self.idlequeue.plug_idle(self.senders[sock_hash], False, True) self.connected += 1 @@ -371,6 +367,7 @@ class SocksQueue: streamhost=sh,sid=None, file_props=None, mode='server',fingerprint='server') self.readers[sock_hash].set_sock(sock[0]) + self.readers[sock_hash].queue = self self.connected += 1 def process_result(self, result, actor): @@ -452,6 +449,7 @@ class Socks5: self.remaining_buff = '' self.file = None self.connected = False + self.type = '' def start_transfer(self): """ @@ -474,9 +472,10 @@ class Socks5: for ai in self.ais: try: self._sock = socket.socket(*ai[:3]) + ''' if not self.fingerprint is None: self._sock = OpenSSL.SSL.Connection( - jingle_xtls.get_context('client'), self._sock) + jingle_xtls.get_context('client'), self._sock)''' # this will not block the GUI self._sock.setblocking(False) self._server = ai[4] @@ -495,6 +494,7 @@ class Socks5: def do_connect(self): try: + #self._sock.setblocking(True) self._sock.connect(self._server) self._sock.setblocking(False) self._send=self._sock.send @@ -587,6 +587,7 @@ class Socks5: if self.queue.on_success: result = self.queue.send_success_reply(self.file_props, self.streamhost) + if result == 0: self.state = 8 self.disconnect() @@ -605,11 +606,13 @@ class Socks5: self.file_props['received-len'] = 0 self.pauses = 0 # start sending file contents to socket - self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) - self.idlequeue.plug_idle(self, True, False) + #self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) + #self.idlequeue.plug_idle(self, True, False) + self.idlequeue.plug_idle(self, False, False) else: # receiving file contents from socket self.idlequeue.plug_idle(self, False, True) + self.file_props['continue_cb'] = self.continue_paused_transfer # we have set up the connection, next - retrieve file self.state = 6 @@ -699,9 +702,14 @@ class Socks5: self.disconnect() elif self.state == 5: - if self.file_props is not None and self.file_props['type'] == 'r': - result = self.start_transfer() # receive - self.queue.process_result(result, self) + if self.type == 'sender': + # We wait for the end of the negotiation to + # send the file + self.state = 7 + self.idlequeue.plug_idle(self, False, False) + else: + result = self.start_transfer() # receive + self.queue.process_result(result, self) except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError, OpenSSL.SSL.WantX509LookupError), e: log.info('caught SSL exception, ignored') @@ -1115,24 +1123,28 @@ class Socks5Sender(Socks5, IdleObject): """ def __init__(self, idlequeue, sock_hash, parent, mode,_sock, host=None, - port=None, fingerprint = None, connected=True): + port=None, fingerprint = None, connected=True, + file_props={}): + self.fingerprint = fingerprint self.queue_idx = sock_hash self.queue = parent self.mode = mode # client or server - self.file_props = {} - self.file_props['paused'] = False + self.file_props = file_props + + + Socks5.__init__(self, idlequeue, host, port, None, None, None) self._sock = _sock if _sock is not None: - if self.fingerprint is not None: + '''if self.fingerprint is not None: self._sock = OpenSSL.SSL.Connection( - jingle_xtls.get_context('server'), self._sock) + jingle_xtls.get_context('server'), _sock) else: self._sock.setblocking(False) - + ''' self.fd = _sock.fileno() self._recv = _sock.recv self._send = _sock.send @@ -1140,21 +1152,6 @@ class Socks5Sender(Socks5, IdleObject): self.state = 1 # waiting for first bytes self.connect_timeout = 0 - - def start_transfer(self): - """ - Send the file - """ - return self.write_next() - - - - def send_file(self): - """ - Start sending the file over verified connection - """ - if self.file_props['started']: - return self.file_props['error'] = 0 self.file_props['disconnect_cb'] = self.disconnect self.file_props['started'] = True @@ -1166,6 +1163,39 @@ class Socks5Sender(Socks5, IdleObject): self.file_props['elapsed-time'] = 0 self.file_props['last-time'] = self.idlequeue.current_time() self.file_props['received-len'] = 0 + self.type = 'sender' + + def start_transfer(self): + """ + Send the file + """ + return self.write_next() + + + def set_connection_sock(self, _sock): + + self._sock = _sock + + if self.fingerprint is not None: + self._sock = OpenSSL.SSL.Connection( + jingle_xtls.get_context('client'), self._sock) + else: + self._sock.setblocking(False) + + self.fd = _sock.fileno() + self._recv = _sock.recv + self._send = _sock.send + self.connected = True + self.state = 1 # waiting for first bytes + self.file_props = None + # start waiting for data + self.idlequeue.plug_idle(self, False, True) + + def send_file(self): + """ + Start sending the file over verified connection + """ + self.pauses = 0 self.state = 7 # plug for writing @@ -1185,7 +1215,7 @@ class Socks5Sender(Socks5, IdleObject): self.queue.remove_sender(self.queue_idx, False) class Socks5Listener(IdleObject): - def __init__(self, idlequeue, port, fingerprint=None): + def __init__(self, idlequeue, port, fp, fingerprint=None): """ Handle all incomming connections on (0.0.0.0, port) @@ -1205,15 +1235,16 @@ class Socks5Listener(IdleObject): self._sock = None self.fd = -1 self.fingerprint = fingerprint - + self.file_props = fp + def bind(self): for ai in self.ais: # try the different possibilities (ipv6, ipv4, etc.) try: self._serv = socket.socket(*ai[:3]) - if self.fingerprint is not None: + '''if self.fingerprint is not None: self._serv = OpenSSL.SSL.Connection( - jingle_xtls.get_context('server'), self._serv) + jingle_xtls.get_context('server'), self._serv)''' except socket.error, e: if e.args[0] == EAFNOSUPPORT: self.ai = None @@ -1232,6 +1263,7 @@ class Socks5Listener(IdleObject): # will fail when port as busy, or we don't have rights to bind try: self._serv.bind(ai[4]) + f = ai[4] self.ai = ai break except Exception: @@ -1257,7 +1289,7 @@ class Socks5Listener(IdleObject): Accept a new incomming connection and notify queue """ sock = self.accept_conn() - self.queue.on_connection_accepted(sock) + self.queue.on_connection_accepted(sock, self) def disconnect(self): """ @@ -1314,6 +1346,7 @@ class Socks5Receiver(Socks5, IdleObject): """ Start receiving the file over verified connection """ + print "receiving file" if self.file_props['started']: return self.file_props['error'] = 0 @@ -1339,6 +1372,16 @@ class Socks5Receiver(Socks5, IdleObject): """ return self.get_file_contents(0) + def set_sock(self, _sock): + self._sock = _sock + self._sock.setblocking(False) + self.fd = _sock.fileno() + self._recv = _sock.recv + self._send = _sock.send + self.connected = True + self.state = 1 # waiting for first bytes + # start waiting for data + self.idlequeue.plug_idle(self, False, True) def disconnect(self, cb=True): """