diff --git a/src/common/jingle_ft.py b/src/common/jingle_ft.py index c3bb9a9a6..4e2452c5d 100644 --- a/src/common/jingle_ft.py +++ b/src/common/jingle_ft.py @@ -51,6 +51,7 @@ class JingleFileTransfer(JingleContent): self.callbacks['session-terminate'] += [self.__on_session_terminate] self.callbacks['transport-accept'] += [self.__on_transport_accept] self.callbacks['transport-replace'] += [self.__on_transport_replace] + self.callbacks['session-accept-sent'] += [self._listen_host] # fallback transport method self.callbacks['transport-reject'] += [self.__on_transport_reject] self.callbacks['transport-info'] += [self.__on_transport_info] @@ -117,7 +118,8 @@ class JingleFileTransfer(JingleContent): for host in self.file_props['streamhosts']: host['initiator'] = self.session.initiator host['target'] = self.session.responder - + host['sid'] = self.file_props['sid'] + response = stanza.buildReply('result') con.connection.send(response) @@ -130,7 +132,8 @@ class JingleFileTransfer(JingleContent): fingerprint = 'client' gajim.socks5queue.connect_to_hosts(self.session.connection.name, self.file_props['sid'], self.send_candidate_used, - self._on_connect_error, fingerprint=fingerprint) + self._on_connect_error, fingerprint=fingerprint, + receiving=False) raise xmpp.NodeProcessed @@ -195,25 +198,8 @@ class JingleFileTransfer(JingleContent): self.state = STATE_INITIALIZED self.session.connection.files_props[self.file_props['sid']] = \ self.file_props - receiver = self.file_props['receiver'] - sender = self.file_props['sender'] - - sha_str = helpers.get_auth_sha(self.file_props['sid'], sender, - receiver) - self.file_props['sha_str'] = sha_str - - port = gajim.config.get('file_transfers_port') - - fingerprint = None - if self.use_security: - fingerprint = 'server' - listener = gajim.socks5queue.start_listener(port, sha_str, - self._store_socks5_sid, self.file_props['sid'], - fingerprint=fingerprint) - - if not listener: - return - # send error message, notify the user + # Listen on configured port for file transfer + self._listen_host() elif not self.weinitiate and self.state == STATE_NOT_STARTED: # session-accept iq-result if not self.negotiated: @@ -295,7 +281,36 @@ class JingleFileTransfer(JingleContent): def _store_socks5_sid(self, sid, hash_id): # callback from socsk5queue.start_listener self.file_props['hash'] = hash_id + + def _listen_host(self, stanza=None, content=None, error=None + , action=None): + + receiver = self.file_props['receiver'] + sender = self.file_props['sender'] + sha_str = helpers.get_auth_sha(self.file_props['sid'], sender, + receiver) + self.file_props['sha_str'] = sha_str + + port = gajim.config.get('file_transfers_port') + + fingerprint = None + if self.use_security: + fingerprint = 'server' + return + if self.weinitiate: + listener = gajim.socks5queue.start_listener(port, sha_str, + self._store_socks5_sid, self.file_props['sid'], + fingerprint=fingerprint, type='sender') + else: + listener = gajim.socks5queue.start_listener(port, sha_str, + self._store_socks5_sid, self.file_props['sid'], + fingerprint=fingerprint, type='receiver') + + if not listener: + # send error message, notify the user + return + def get_content(desc): return JingleFileTransfer diff --git a/src/common/socks5.py b/src/common/socks5.py index 1aa53ac43..8a3b8b5f5 100644 --- a/src/common/socks5.py +++ b/src/common/socks5.py @@ -84,11 +84,13 @@ class SocksQueue: self.on_success = {} # {id: cb} self.on_failure = {} # {id: cb} - def start_listener(self, port, sha_str, sha_handler, sid, fingerprint=None): + def start_listener(self, port, sha_str, sha_handler, sid, fingerprint=None, + type='sender'): """ Start waiting for incomming connections on (host, port) and do a socks5 authentication using sid for generated SHA """ + self.type = type # It says whether we are sending or receiving self.sha_handlers[sha_str] = (sha_handler, sid) if self.listener is None: self.listener = Socks5Listener(self.idlequeue, port, fingerprint=fingerprint) @@ -123,7 +125,7 @@ class SocksQueue: return 0 def connect_to_hosts(self, account, sid, on_success=None, on_failure=None, - fingerprint=None): + fingerprint=None, receiving=True): self.on_success[sid] = on_success self.on_failure[sid] = on_failure file_props = self.files_props[account][sid] @@ -135,38 +137,59 @@ class SocksQueue: fp = None else: fp = fingerprint - receiver = Socks5Receiver(self.idlequeue, streamhost, sid, - file_props, fingerprint=fp) - self.add_receiver(account, receiver) - streamhost['idx'] = receiver.queue_idx + if receiving: + self.type = 'receiver' + socks5obj = Socks5Receiver(self.idlequeue, streamhost, sid, + 'client', file_props, + fingerprint=fp) + self.add_sockobj(account, socks5obj) + else: + self.type = 'sender' + socks5obj = Socks5Sender(self.idlequeue, + file_props['sha_str'], self, mode='client' , + _sock=None, host=str(streamhost['host']), + port=int(streamhost['port']), fingerprint=fp, + connected=False) + socks5obj.file_props = file_props + socks5obj.streamhost = streamhost + self.add_sockobj(account, socks5obj, type='sender') + + socks5obj.file_props = file_props + streamhost['idx'] = socks5obj.queue_idx def _socket_connected(self, streamhost, file_props): """ Called when there is a host connected to one of the senders's - streamhosts. Stop othere attempts for connections + streamhosts. Stop other attempts for connections """ for host in file_props['streamhosts']: if host != streamhost and 'idx' in host: if host['state'] == 1: # remove current - self.remove_receiver(streamhost['idx']) + if self.type == 'sender': + self.remove_sender(streamhost['idx'], False) + else: + self.remove_receiver(streamhost['idx']) return # set state -2, meaning that this streamhost is stopped, # but it may be connectected later if host['state'] >= 0: - self.remove_receiver(host['idx']) + if self.type == 'sender': + self.remove_sender(host['idx'], False) + else: + self.remove_receiver(host['idx']) host['idx'] = -1 host['state'] = -2 - def reconnect_receiver(self, receiver, streamhost): + def reconnect_client(self, client, streamhost): """ Check the state of all streamhosts and if all has failed, then emit connection failure cb. If there are some which are still not connected try to establish connection to one of them """ - self.idlequeue.remove_timeout(receiver.fd) - self.idlequeue.unplug_idle(receiver.fd) - file_props = receiver.file_props + self.idlequeue.remove_timeout(client.fd) + self.idlequeue.unplug_idle(client.fd) + file_props = client.file_props streamhost['state'] = -1 # boolean, indicates that there are hosts, which are not tested yet unused_hosts = False @@ -180,20 +203,22 @@ class SocksQueue: for host in file_props['streamhosts']: if host['state'] == -2: host['state'] = 0 - receiver = Socks5Receiver(self.idlequeue, host, host['sid'], - file_props) - self.add_receiver(receiver.account, receiver) - host['idx'] = receiver.queue_idx + # FIXME: make the sender reconnect also + print 'reconnecting using socks receiver' + client = Socks5Receiver(self.idlequeue, host, host['sid'], + 'client',file_props) + self.add_sockobj(client.account, client) + host['idx'] = client.queue_idx # we still have chances to connect return if 'received-len' not in file_props or file_props['received-len'] == 0: # there are no other streamhosts and transfer hasn't started - self._connection_refused(streamhost, file_props, receiver.queue_idx) + self._connection_refused(streamhost, file_props, client.queue_idx) else: # transfer stopped, it is most likely stopped from sender - receiver.disconnect() + client.disconnect() file_props['error'] = -1 - self.process_result(-1, receiver) + self.process_result(-1, client) def _connection_refused(self, streamhost, file_props, idx): """ @@ -213,20 +238,23 @@ class SocksQueue: file_props['sid'], code = 404) del(file_props['failure_cb']) - def add_receiver(self, account, sock5_receiver): + def add_sockobj(self, account, sockobj, type='receiver'): """ - Add new file request + Add new file a sockobj type receiver or sendder """ - self.readers[self.idx] = sock5_receiver - sock5_receiver.queue_idx = self.idx - sock5_receiver.queue = self - sock5_receiver.account = account + if type == 'receiver': + self.readers[self.idx] = sockobj + else: + self.senders[self.idx] = sockobj + sockobj.queue_idx = self.idx + sockobj.queue = self + sockobj.account = account self.idx += 1 - result = sock5_receiver.connect() + result = sockobj.connect() self.connected += 1 if result is not None: - result = sock5_receiver.main() - self.process_result(result, sock5_receiver) + result = sockobj.main() + self.process_result(result, sockobj) return 1 return None @@ -325,13 +353,26 @@ class SocksQueue: def on_connection_accepted(self, sock): sock_hash = sock.__hash__() - if sock_hash not in self.senders: + if self.type == 'sender' and (sock_hash not in self.senders): self.senders[sock_hash] = Socks5Sender(self.idlequeue, sock_hash, self, - sock[0], sock[1][0], sock[1][1], fingerprint='server') + sock[0], 'server', sock[1][0], sock[1][1], + fingerprint='server') # Start waiting for data self.idlequeue.plug_idle(self.senders[sock_hash], False, True) self.connected += 1 - + + if self.type == 'receiver' and (sock_hash not in self.readers): + sh = {} + sh['host'] = sock[1][0] + sh['port'] = sock[1][1] + sh['initiator'] = None + sh['target'] = None + self.readers[sock_hash] = Socks5Receiver(idlequeue=self.idlequeue, + streamhost=sh,sid=None, file_props=None, + mode='server',fingerprint='server') + self.readers[sock_hash].set_sock(sock[0]) + self.connected += 1 + def process_result(self, result, actor): """ Take appropriate actions upon the result: @@ -373,10 +414,13 @@ class SocksQueue: """ if idx != -1: if idx in self.senders: + sender = self.senders[idx] if do_disconnect: self.senders[idx].disconnect() return else: + self.idlequeue.unplug_idle(sender.fd) + self.idlequeue.remove_timeout(sender.fd) del(self.senders[idx]) if self.connected > 0: self.connected -= 1 @@ -407,7 +451,329 @@ class Socks5: self.size = 0 self.remaining_buff = '' self.file = None + self.connected = False + def start_transfer(self): + """ + Must be implemented by subclass. + """ + pass + + def _is_connected(self): + if self.state < 5: + return False + return True + + def connect(self): + """ + Create the socket and plug it to the idlequeue + """ + if self.ais is None: + return None + + for ai in self.ais: + try: + self._sock = socket.socket(*ai[:3]) + if not self.fingerprint is None: + self._sock = OpenSSL.SSL.Connection( + jingle_xtls.get_context('client'), self._sock) + # this will not block the GUI + self._sock.setblocking(False) + self._server = ai[4] + break + except socket.error, e: + if not isinstance(e, basestring) and e[0] == EINPROGRESS: + break + # for all other errors, we try other addresses + continue + self.fd = self._sock.fileno() + self.state = 0 # about to be connected + self.idlequeue.plug_idle(self, True, False) + self.do_connect() + self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT) + return None + + def do_connect(self): + try: + self._sock.connect(self._server) + self._sock.setblocking(False) + self._send=self._sock.send + self._recv=self._sock.recv + except Exception, ee: + errnum = ee[0] + self.connect_timeout += 1 + if errnum == 111 or self.connect_timeout > 1000: + self.queue._connection_refused(self.streamhost, + self.file_props, self.queue_idx) + self.connected = False + return None + # win32 needs this + elif errnum not in (10056, EISCONN) or self.state != 0: + return None + else: # socket is already connected + self._sock.setblocking(False) + self._send=self._sock.send + self._recv=self._sock.recv + self.buff = '' + self.connected = True + self.file_props['connected'] = True + self.file_props['disconnect_cb'] = self.disconnect + self.file_props['paused'] = False + self.state = 1 # connected + + # stop all others connections to sender's streamhosts + self.queue._socket_connected(self.streamhost, self.file_props) + self.idlequeue.plug_idle(self, True, False) + return 1 # we are connected + + def svr_main(self): + """ + Initial requests for verifying the connection + """ + if self.state == 1: # initial read + buff = self.receive() + if not self.connected: + return -1 + mechs = self._parse_auth_buff(buff) + if mechs is None: + return -1 # invalid auth methods received + elif self.state == 3: # get next request + buff = self.receive() + req_type, self.sha_msg = self._parse_request_buff(buff)[:2] + if req_type != 0x01: + return -1 # request is not of type 'connect' + self.state += 1 # go to the next step + # unplug & plug for writing + self.idlequeue.plug_idle(self, True, False) + return None + + def clnt_main(self, timeout=0): + """ + Begin negotiation. on success 'address' != 0 + """ + result = 1 + buff = self.receive() + if buff == '': + # end connection + self.pollend() + return + + if self.state == 2: # read auth response + if buff is None or len(buff) != 2: + return None + version, method = struct.unpack('!BB', buff[:2]) + if version != 0x05 or method == 0xff: + self.disconnect() + elif self.state == 4: # get approve of our request + if buff is None: + return None + sub_buff = buff[:4] + if len(sub_buff) < 4: + return None + version, address_type = struct.unpack('!BxxB', buff[:4]) + addrlen = 0 + if address_type == 0x03: + addrlen = ord(buff[4]) + address = struct.unpack('!%ds' % addrlen, buff[5:addrlen + 5]) + portlen = len(buff[addrlen + 5:]) + if portlen == 1: + port, = struct.unpack('!B', buff[addrlen + 5]) + elif portlen == 2: + port, = struct.unpack('!H', buff[addrlen + 5:]) + else: # Gaim bug :) + port, = struct.unpack('!H', buff[addrlen + 5:addrlen + 7]) + self.remaining_buff = buff[addrlen + 7:] + self.state = 5 # for senders: init file_props and send '\n' + if self.queue.on_success: + result = self.queue.send_success_reply(self.file_props, + self.streamhost) + if result == 0: + self.state = 8 + self.disconnect() + + # for senders: init file_props + if result == 1 and self.state == 5: + if self.file_props['type'] == 's': + self.file_props['error'] = 0 + self.file_props['disconnect_cb'] = self.disconnect + self.file_props['started'] = True + self.file_props['completed'] = False + self.file_props['paused'] = False + self.file_props['stalled'] = False + self.file_props['elapsed-time'] = 0 + self.file_props['last-time'] = self.idlequeue.current_time() + self.file_props['received-len'] = 0 + self.pauses = 0 + # start sending file contents to socket + self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) + self.idlequeue.plug_idle(self, True, False) + else: + # receiving file contents from socket + self.idlequeue.plug_idle(self, False, True) + self.file_props['continue_cb'] = self.continue_paused_transfer + # we have set up the connection, next - retrieve file + self.state = 6 + if self.state < 5: + self.idlequeue.plug_idle(self, True, False) + self.state += 1 + return None + + def pollout(self): + if self.mode == 'client': + self.clnt_pollout() + elif self.mode == 'server': + self.svr_pollout() + + def svr_pollout(self): + if not self.connected: + self.disconnect() + return + self.idlequeue.remove_timeout(self.fd) + if self.state == 2: # send reply with desired auth type + self.send_raw(self._get_auth_response()) + elif self.state == 4: # send positive response to the 'connect' + self.send_raw(self._get_request_buff(self.sha_msg, 0x00)) + elif self.state == 7: + if self.file_props['paused']: + self.file_props['continue_cb'] = self.continue_paused_transfer + self.idlequeue.plug_idle(self, False, False) + return + result = self.start_transfer() # send + self.queue.process_result(result, self) + if result is None or result <= 0: + self.disconnect() + return + self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) + elif self.state == 8: + self.disconnect() + return + else: + self.disconnect() + if self.state < 5: + self.state += 1 + # unplug and plug this time for reading + self.idlequeue.plug_idle(self, False, True) + + def clnt_pollout(self): + self.idlequeue.remove_timeout(self.fd) + try: + if self.state == 0: + self.do_connect() + return + elif self.state == 1: # send initially: version and auth types + self.send_raw(self._get_auth_buff()) + elif self.state == 3: # send 'connect' request + self.send_raw(self._get_request_buff(self._get_sha1_auth())) + elif self.file_props['type'] != 'r': + if self.file_props['paused']: + self.idlequeue.plug_idle(self, False, False) + return + result = self.start_transfer() # send + self.queue.process_result(result, self) + return + except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError, + OpenSSL.SSL.WantX509LookupError), e: + log.info('caught SSL exception, ignored') + return + self.state += 1 + # unplug and plug for reading + self.idlequeue.plug_idle(self, False, True) + self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT) + + + def pollin(self): + + if self.mode == 'client': + self.clnt_pollin() + elif self.mode == 'server': + self.svr_pollin() + + def svr_pollin(self): + if self.connected: + try: + if self.state < 5: + result = self.svr_main() + if self.state == 4: + self.queue.result_sha(self.sha_msg, self.queue_idx) + if result == -1: + self.disconnect() + + elif self.state == 5: + if self.file_props is not None and self.file_props['type'] == 'r': + result = self.start_transfer() # receive + self.queue.process_result(result, self) + except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError, + OpenSSL.SSL.WantX509LookupError), e: + log.info('caught SSL exception, ignored') + else: + self.disconnect() + + def clnt_pollin(self): + self.idlequeue.remove_timeout(self.fd) + if self.connected: + try: + if self.file_props['paused']: + self.idlequeue.plug_idle(self, False, False) + return + if self.state < 5: + self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT) + result = self.clnt_main(0) + self.queue.process_result(result, self) + elif self.state == 5: # wait for proxy reply + pass + elif self.file_props['type'] == 'r': + self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) + result = self.start_transfer() # receive + self.queue.process_result(result, self) + except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError, + OpenSSL.SSL.WantX509LookupError), e: + log.info('caught SSL exception, ignored') + return + else: + self.disconnect() + + + def pollend(self): + + if self.mode == 'client': + self.clnt_pollend() + elif self.mode == 'server': + self.svr_pollend() + + def svr_pollend(self): + self.state = 8 # end connection + self.disconnect() + self.file_props['error'] = -1 + self.queue.process_result(-1, self) + + def clnt_pollend(self): + if self.state >= 5: + # error during transfer + self.disconnect() + self.file_props['error'] = -1 + self.queue.process_result(-1, self) + else: + self.queue.reconnect_client(self, self.streamhost) + + def read_timeout(self): + self.idlequeue.remove_timeout(self.fd) + if self.state > 5: + # no activity for foo seconds + if self.file_props['stalled'] == False: + self.file_props['stalled'] = True + self.queue.process_result(-1, self) + #if 'received-len' not in self.file_props: + # self.file_props['received-len'] = 0 + if SEND_TIMEOUT > 0: + self.idlequeue.set_read_timeout(self.fd, SEND_TIMEOUT) + else: + # stop transfer, there is no error code for this + self.pollend() + + else: + if self.mode == 'client': + self.queue.reconnect_client(self, self.streamhost) + def open_file_for_reading(self): if self.file is None: try: @@ -748,95 +1114,41 @@ class Socks5Sender(Socks5, IdleObject): Class for sending file to socket over socks5 """ - def __init__(self, idlequeue, sock_hash, parent, _sock, host=None, - port=None, fingerprint = None): + def __init__(self, idlequeue, sock_hash, parent, mode,_sock, host=None, + port=None, fingerprint = None, connected=True): self.fingerprint = fingerprint self.queue_idx = sock_hash self.queue = parent + self.mode = mode # client or server + self.file_props = {} + self.file_props['paused'] = False Socks5.__init__(self, idlequeue, host, port, None, None, None) self._sock = _sock - if self.fingerprint is not None: - self._sock = OpenSSL.SSL.Connection( - jingle_xtls.get_context('server'), self._sock) - else: - self._sock.setblocking(False) - self.fd = _sock.fileno() - self._recv = _sock.recv - self._send = _sock.send - self.connected = True - self.state = 1 # waiting for first bytes - self.file_props = None - - - def read_timeout(self): - self.idlequeue.remove_timeout(self.fd) - if self.state > 5: - # no activity for foo seconds - if self.file_props['stalled'] == False: - self.file_props['stalled'] = True - self.queue.process_result(-1, self) - if SEND_TIMEOUT > 0: - self.idlequeue.set_read_timeout(self.fd, SEND_TIMEOUT) + + + if _sock is not None: + if self.fingerprint is not None: + self._sock = OpenSSL.SSL.Connection( + jingle_xtls.get_context('server'), self._sock) else: - # stop transfer, there is no error code for this - self.pollend() - - def pollout(self): - if not self.connected: - self.disconnect() - return - self.idlequeue.remove_timeout(self.fd) - if self.state == 2: # send reply with desired auth type - self.send_raw(self._get_auth_response()) - elif self.state == 4: # send positive response to the 'connect' - self.send_raw(self._get_request_buff(self.sha_msg, 0x00)) - elif self.state == 7: - if self.file_props['paused']: - self.file_props['continue_cb'] = self.continue_paused_transfer - self.idlequeue.plug_idle(self, False, False) - return - result = self.write_next() - self.queue.process_result(result, self) - if result is None or result <= 0: - self.disconnect() - return - self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) - elif self.state == 8: - self.disconnect() - return - else: - self.disconnect() - if self.state < 5: - self.state += 1 - # unplug and plug this time for reading - self.idlequeue.plug_idle(self, False, True) - - def pollend(self): - self.state = 8 # end connection - self.disconnect() - self.file_props['error'] = -1 - self.queue.process_result(-1, self) - - def pollin(self): - if self.connected: - try: - if self.state < 5: - result = self.main() - if self.state == 4: - self.queue.result_sha(self.sha_msg, self.queue_idx) - if result == -1: - self.disconnect() - - elif self.state == 5: - if self.file_props is not None and self.file_props['type'] == 'r': - result = self.get_file_contents(0) - self.queue.process_result(result, self) - except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError, - OpenSSL.SSL.WantX509LookupError), e: - log.info('caught SSL exception, ignored') - else: - self.disconnect() + self._sock.setblocking(False) + + self.fd = _sock.fileno() + self._recv = _sock.recv + self._send = _sock.send + self.connected = connected + self.state = 1 # waiting for first bytes + self.connect_timeout = 0 + + def start_transfer(self): + """ + Send the file + """ + return self.write_next() + + + def send_file(self): """ Start sending the file over verified connection @@ -860,27 +1172,6 @@ class Socks5Sender(Socks5, IdleObject): self.idlequeue.plug_idle(self, True, False) return self.write_next() # initial for nl byte - def main(self): - """ - Initial requests for verifying the connection - """ - if self.state == 1: # initial read - buff = self.receive() - if not self.connected: - return -1 - mechs = self._parse_auth_buff(buff) - if mechs is None: - return -1 # invalid auth methods received - elif self.state == 3: # get next request - buff = self.receive() - req_type, self.sha_msg = self._parse_request_buff(buff)[:2] - if req_type != 0x01: - return -1 # request is not of type 'connect' - self.state += 1 # go to the next step - # unplug & plug for writing - self.idlequeue.plug_idle(self, True, False) - return None - def disconnect(self, cb=True): """ Close the socket @@ -991,7 +1282,8 @@ class Socks5Listener(IdleObject): return _sock class Socks5Receiver(Socks5, IdleObject): - def __init__(self, idlequeue, streamhost, sid, file_props = None, fingerprint=None): + def __init__(self, idlequeue, streamhost, sid, mode, file_props = None, + fingerprint=None): """ fingerprint: fingerprint of certificates we shall use, set to None if TLS connection not desired """ @@ -1012,221 +1304,41 @@ class Socks5Receiver(Socks5, IdleObject): self.file_props['paused'] = False self.file_props['continue_cb'] = self.continue_paused_transfer self.file_props['stalled'] = False + self.mode = mode # client or server Socks5.__init__(self, idlequeue, streamhost['host'], int(streamhost['port']), streamhost['initiator'], streamhost['target'], sid) - def read_timeout(self): - self.idlequeue.remove_timeout(self.fd) - if self.state > 5: - # no activity for foo seconds - if self.file_props['stalled'] == False: - self.file_props['stalled'] = True - if 'received-len' not in self.file_props: - self.file_props['received-len'] = 0 - self.queue.process_result(-1, self) - if READ_TIMEOUT > 0: - self.idlequeue.set_read_timeout(self.fd, READ_TIMEOUT) - else: - # stop transfer, there is no error code for this - self.pollend() - else: - self.queue.reconnect_receiver(self, self.streamhost) - def connect(self): + def receive_file(self): """ - Create the socket and plug it to the idlequeue + Start receiving the file over verified connection """ - if self.ais is None: - return None - - for ai in self.ais: - try: - self._sock = socket.socket(*ai[:3]) - if not self.fingerprint is None: - self._sock = OpenSSL.SSL.Connection( - jingle_xtls.get_context('client'), self._sock) - # this will not block the GUI - self._sock.setblocking(False) - self._server = ai[4] - break - except socket.error, e: - if not isinstance(e, basestring) and e[0] == EINPROGRESS: - break - # for all other errors, we try other addresses - continue - self.fd = self._sock.fileno() - self.state = 0 # about to be connected - self.idlequeue.plug_idle(self, True, False) - self.do_connect() - self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT) - return None - - def _is_connected(self): - if self.state < 5: - return False - return True - - def pollout(self): - self.idlequeue.remove_timeout(self.fd) - try: - if self.state == 0: - self.do_connect() - return - elif self.state == 1: # send initially: version and auth types - self.send_raw(self._get_auth_buff()) - elif self.state == 3: # send 'connect' request - self.send_raw(self._get_request_buff(self._get_sha1_auth())) - elif self.file_props['type'] != 'r': - if self.file_props['paused']: - self.idlequeue.plug_idle(self, False, False) - return - result = self.write_next() - self.queue.process_result(result, self) - return - except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError, - OpenSSL.SSL.WantX509LookupError), e: - log.info('caught SSL exception, ignored') + if self.file_props['started']: return - self.state += 1 - # unplug and plug for reading - self.idlequeue.plug_idle(self, False, True) - self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT) - - def pollend(self): - if self.state >= 5: - # error during transfer - self.disconnect() - self.file_props['error'] = -1 - self.queue.process_result(-1, self) - else: - self.queue.reconnect_receiver(self, self.streamhost) - - def pollin(self): - self.idlequeue.remove_timeout(self.fd) - if self.connected: - try: - if self.file_props['paused']: - self.idlequeue.plug_idle(self, False, False) - return - if self.state < 5: - self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT) - result = self.main(0) - self.queue.process_result(result, self) - elif self.state == 5: # wait for proxy reply - pass - elif self.file_props['type'] == 'r': - self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) - result = self.get_file_contents(0) - self.queue.process_result(result, self) - except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError, - OpenSSL.SSL.WantX509LookupError), e: - log.info('caught SSL exception, ignored') - return - else: - self.disconnect() - - def do_connect(self): - try: - self._sock.connect(self._server) - self._sock.setblocking(False) - self._send=self._sock.send - self._recv=self._sock.recv - except Exception, ee: - errnum = ee[0] - self.connect_timeout += 1 - if errnum == 111 or self.connect_timeout > 1000: - self.queue._connection_refused(self.streamhost, - self.file_props, self.queue_idx) - return None - # win32 needs this - elif errnum not in (10056, EISCONN) or self.state != 0: - return None - else: # socket is already connected - self._sock.setblocking(False) - self._send=self._sock.send - self._recv=self._sock.recv - self.buff = '' - self.connected = True - self.file_props['connected'] = True + self.file_props['error'] = 0 self.file_props['disconnect_cb'] = self.disconnect - self.state = 1 # connected + self.file_props['started'] = True + self.file_props['completed'] = False + self.file_props['paused'] = False + self.file_props['continue_cb'] = self.continue_paused_transfer + self.file_props['stalled'] = False + self.file_props['connected'] = True + self.file_props['elapsed-time'] = 0 + self.file_props['last-time'] = self.idlequeue.current_time() + self.file_props['received-len'] = 0 + self.pauses = 0 + self.state = 7 + # plug for reading + self.idlequeue.plug_idle(self, False, True) + return self.get_file_contents(0) # initial for nl byte - # stop all others connections to sender's streamhosts - self.queue._socket_connected(self.streamhost, self.file_props) - self.idlequeue.plug_idle(self, True, False) - return 1 # we are connected - - def main(self, timeout=0): + def start_transfer(self): """ - Begin negotiation. on success 'address' != 0 + Receive the file """ - result = 1 - buff = self.receive() - if buff == '': - # end connection - self.pollend() - return + return self.get_file_contents(0) - if self.state == 2: # read auth response - if buff is None or len(buff) != 2: - return None - version, method = struct.unpack('!BB', buff[:2]) - if version != 0x05 or method == 0xff: - self.disconnect() - elif self.state == 4: # get approve of our request - if buff is None: - return None - sub_buff = buff[:4] - if len(sub_buff) < 4: - return None - version, address_type = struct.unpack('!BxxB', buff[:4]) - addrlen = 0 - if address_type == 0x03: - addrlen = ord(buff[4]) - address = struct.unpack('!%ds' % addrlen, buff[5:addrlen + 5]) - portlen = len(buff[addrlen + 5:]) - if portlen == 1: - port, = struct.unpack('!B', buff[addrlen + 5]) - elif portlen == 2: - port, = struct.unpack('!H', buff[addrlen + 5:]) - else: # Gaim bug :) - port, = struct.unpack('!H', buff[addrlen + 5:addrlen + 7]) - self.remaining_buff = buff[addrlen + 7:] - self.state = 5 # for senders: init file_props and send '\n' - if self.queue.on_success: - result = self.queue.send_success_reply(self.file_props, - self.streamhost) - if result == 0: - self.state = 8 - self.disconnect() - - # for senders: init file_props - if result == 1 and self.state == 5: - if self.file_props['type'] == 's': - self.file_props['error'] = 0 - self.file_props['disconnect_cb'] = self.disconnect - self.file_props['started'] = True - self.file_props['completed'] = False - self.file_props['paused'] = False - self.file_props['stalled'] = False - self.file_props['elapsed-time'] = 0 - self.file_props['last-time'] = self.idlequeue.current_time() - self.file_props['received-len'] = 0 - self.pauses = 0 - # start sending file contents to socket - self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT) - self.idlequeue.plug_idle(self, True, False) - else: - # receiving file contents from socket - self.idlequeue.plug_idle(self, False, True) - self.file_props['continue_cb'] = self.continue_paused_transfer - # we have set up the connection, next - retrieve file - self.state = 6 - if self.state < 5: - self.idlequeue.plug_idle(self, True, False) - self.state += 1 - return None def disconnect(self, cb=True): """