bidirectional jingle FT with socks5

This commit is contained in:
Jefry Lagrange 2011-08-08 19:43:07 -04:00
parent 92b932ca07
commit 9f3d472564
2 changed files with 486 additions and 359 deletions

View File

@ -51,6 +51,7 @@ class JingleFileTransfer(JingleContent):
self.callbacks['session-terminate'] += [self.__on_session_terminate]
self.callbacks['transport-accept'] += [self.__on_transport_accept]
self.callbacks['transport-replace'] += [self.__on_transport_replace]
self.callbacks['session-accept-sent'] += [self._listen_host]
# fallback transport method
self.callbacks['transport-reject'] += [self.__on_transport_reject]
self.callbacks['transport-info'] += [self.__on_transport_info]
@ -117,7 +118,8 @@ class JingleFileTransfer(JingleContent):
for host in self.file_props['streamhosts']:
host['initiator'] = self.session.initiator
host['target'] = self.session.responder
host['sid'] = self.file_props['sid']
response = stanza.buildReply('result')
con.connection.send(response)
@ -130,7 +132,8 @@ class JingleFileTransfer(JingleContent):
fingerprint = 'client'
gajim.socks5queue.connect_to_hosts(self.session.connection.name,
self.file_props['sid'], self.send_candidate_used,
self._on_connect_error, fingerprint=fingerprint)
self._on_connect_error, fingerprint=fingerprint,
receiving=False)
raise xmpp.NodeProcessed
@ -195,25 +198,8 @@ class JingleFileTransfer(JingleContent):
self.state = STATE_INITIALIZED
self.session.connection.files_props[self.file_props['sid']] = \
self.file_props
receiver = self.file_props['receiver']
sender = self.file_props['sender']
sha_str = helpers.get_auth_sha(self.file_props['sid'], sender,
receiver)
self.file_props['sha_str'] = sha_str
port = gajim.config.get('file_transfers_port')
fingerprint = None
if self.use_security:
fingerprint = 'server'
listener = gajim.socks5queue.start_listener(port, sha_str,
self._store_socks5_sid, self.file_props['sid'],
fingerprint=fingerprint)
if not listener:
return
# send error message, notify the user
# Listen on configured port for file transfer
self._listen_host()
elif not self.weinitiate and self.state == STATE_NOT_STARTED:
# session-accept iq-result
if not self.negotiated:
@ -295,7 +281,36 @@ class JingleFileTransfer(JingleContent):
def _store_socks5_sid(self, sid, hash_id):
# callback from socsk5queue.start_listener
self.file_props['hash'] = hash_id
def _listen_host(self, stanza=None, content=None, error=None
, action=None):
receiver = self.file_props['receiver']
sender = self.file_props['sender']
sha_str = helpers.get_auth_sha(self.file_props['sid'], sender,
receiver)
self.file_props['sha_str'] = sha_str
port = gajim.config.get('file_transfers_port')
fingerprint = None
if self.use_security:
fingerprint = 'server'
return
if self.weinitiate:
listener = gajim.socks5queue.start_listener(port, sha_str,
self._store_socks5_sid, self.file_props['sid'],
fingerprint=fingerprint, type='sender')
else:
listener = gajim.socks5queue.start_listener(port, sha_str,
self._store_socks5_sid, self.file_props['sid'],
fingerprint=fingerprint, type='receiver')
if not listener:
# send error message, notify the user
return
def get_content(desc):
return JingleFileTransfer

View File

@ -84,11 +84,13 @@ class SocksQueue:
self.on_success = {} # {id: cb}
self.on_failure = {} # {id: cb}
def start_listener(self, port, sha_str, sha_handler, sid, fingerprint=None):
def start_listener(self, port, sha_str, sha_handler, sid, fingerprint=None,
type='sender'):
"""
Start waiting for incomming connections on (host, port) and do a socks5
authentication using sid for generated SHA
"""
self.type = type # It says whether we are sending or receiving
self.sha_handlers[sha_str] = (sha_handler, sid)
if self.listener is None:
self.listener = Socks5Listener(self.idlequeue, port, fingerprint=fingerprint)
@ -123,7 +125,7 @@ class SocksQueue:
return 0
def connect_to_hosts(self, account, sid, on_success=None, on_failure=None,
fingerprint=None):
fingerprint=None, receiving=True):
self.on_success[sid] = on_success
self.on_failure[sid] = on_failure
file_props = self.files_props[account][sid]
@ -135,38 +137,59 @@ class SocksQueue:
fp = None
else:
fp = fingerprint
receiver = Socks5Receiver(self.idlequeue, streamhost, sid,
file_props, fingerprint=fp)
self.add_receiver(account, receiver)
streamhost['idx'] = receiver.queue_idx
if receiving:
self.type = 'receiver'
socks5obj = Socks5Receiver(self.idlequeue, streamhost, sid,
'client', file_props,
fingerprint=fp)
self.add_sockobj(account, socks5obj)
else:
self.type = 'sender'
socks5obj = Socks5Sender(self.idlequeue,
file_props['sha_str'], self, mode='client' ,
_sock=None, host=str(streamhost['host']),
port=int(streamhost['port']), fingerprint=fp,
connected=False)
socks5obj.file_props = file_props
socks5obj.streamhost = streamhost
self.add_sockobj(account, socks5obj, type='sender')
socks5obj.file_props = file_props
streamhost['idx'] = socks5obj.queue_idx
def _socket_connected(self, streamhost, file_props):
"""
Called when there is a host connected to one of the senders's
streamhosts. Stop othere attempts for connections
streamhosts. Stop other attempts for connections
"""
for host in file_props['streamhosts']:
if host != streamhost and 'idx' in host:
if host['state'] == 1:
# remove current
self.remove_receiver(streamhost['idx'])
if self.type == 'sender':
self.remove_sender(streamhost['idx'], False)
else:
self.remove_receiver(streamhost['idx'])
return
# set state -2, meaning that this streamhost is stopped,
# but it may be connectected later
if host['state'] >= 0:
self.remove_receiver(host['idx'])
if self.type == 'sender':
self.remove_sender(host['idx'], False)
else:
self.remove_receiver(host['idx'])
host['idx'] = -1
host['state'] = -2
def reconnect_receiver(self, receiver, streamhost):
def reconnect_client(self, client, streamhost):
"""
Check the state of all streamhosts and if all has failed, then emit
connection failure cb. If there are some which are still not connected
try to establish connection to one of them
"""
self.idlequeue.remove_timeout(receiver.fd)
self.idlequeue.unplug_idle(receiver.fd)
file_props = receiver.file_props
self.idlequeue.remove_timeout(client.fd)
self.idlequeue.unplug_idle(client.fd)
file_props = client.file_props
streamhost['state'] = -1
# boolean, indicates that there are hosts, which are not tested yet
unused_hosts = False
@ -180,20 +203,22 @@ class SocksQueue:
for host in file_props['streamhosts']:
if host['state'] == -2:
host['state'] = 0
receiver = Socks5Receiver(self.idlequeue, host, host['sid'],
file_props)
self.add_receiver(receiver.account, receiver)
host['idx'] = receiver.queue_idx
# FIXME: make the sender reconnect also
print 'reconnecting using socks receiver'
client = Socks5Receiver(self.idlequeue, host, host['sid'],
'client',file_props)
self.add_sockobj(client.account, client)
host['idx'] = client.queue_idx
# we still have chances to connect
return
if 'received-len' not in file_props or file_props['received-len'] == 0:
# there are no other streamhosts and transfer hasn't started
self._connection_refused(streamhost, file_props, receiver.queue_idx)
self._connection_refused(streamhost, file_props, client.queue_idx)
else:
# transfer stopped, it is most likely stopped from sender
receiver.disconnect()
client.disconnect()
file_props['error'] = -1
self.process_result(-1, receiver)
self.process_result(-1, client)
def _connection_refused(self, streamhost, file_props, idx):
"""
@ -213,20 +238,23 @@ class SocksQueue:
file_props['sid'], code = 404)
del(file_props['failure_cb'])
def add_receiver(self, account, sock5_receiver):
def add_sockobj(self, account, sockobj, type='receiver'):
"""
Add new file request
Add new file a sockobj type receiver or sendder
"""
self.readers[self.idx] = sock5_receiver
sock5_receiver.queue_idx = self.idx
sock5_receiver.queue = self
sock5_receiver.account = account
if type == 'receiver':
self.readers[self.idx] = sockobj
else:
self.senders[self.idx] = sockobj
sockobj.queue_idx = self.idx
sockobj.queue = self
sockobj.account = account
self.idx += 1
result = sock5_receiver.connect()
result = sockobj.connect()
self.connected += 1
if result is not None:
result = sock5_receiver.main()
self.process_result(result, sock5_receiver)
result = sockobj.main()
self.process_result(result, sockobj)
return 1
return None
@ -325,13 +353,26 @@ class SocksQueue:
def on_connection_accepted(self, sock):
sock_hash = sock.__hash__()
if sock_hash not in self.senders:
if self.type == 'sender' and (sock_hash not in self.senders):
self.senders[sock_hash] = Socks5Sender(self.idlequeue, sock_hash, self,
sock[0], sock[1][0], sock[1][1], fingerprint='server')
sock[0], 'server', sock[1][0], sock[1][1],
fingerprint='server')
# Start waiting for data
self.idlequeue.plug_idle(self.senders[sock_hash], False, True)
self.connected += 1
if self.type == 'receiver' and (sock_hash not in self.readers):
sh = {}
sh['host'] = sock[1][0]
sh['port'] = sock[1][1]
sh['initiator'] = None
sh['target'] = None
self.readers[sock_hash] = Socks5Receiver(idlequeue=self.idlequeue,
streamhost=sh,sid=None, file_props=None,
mode='server',fingerprint='server')
self.readers[sock_hash].set_sock(sock[0])
self.connected += 1
def process_result(self, result, actor):
"""
Take appropriate actions upon the result:
@ -373,10 +414,13 @@ class SocksQueue:
"""
if idx != -1:
if idx in self.senders:
sender = self.senders[idx]
if do_disconnect:
self.senders[idx].disconnect()
return
else:
self.idlequeue.unplug_idle(sender.fd)
self.idlequeue.remove_timeout(sender.fd)
del(self.senders[idx])
if self.connected > 0:
self.connected -= 1
@ -407,7 +451,329 @@ class Socks5:
self.size = 0
self.remaining_buff = ''
self.file = None
self.connected = False
def start_transfer(self):
"""
Must be implemented by subclass.
"""
pass
def _is_connected(self):
if self.state < 5:
return False
return True
def connect(self):
"""
Create the socket and plug it to the idlequeue
"""
if self.ais is None:
return None
for ai in self.ais:
try:
self._sock = socket.socket(*ai[:3])
if not self.fingerprint is None:
self._sock = OpenSSL.SSL.Connection(
jingle_xtls.get_context('client'), self._sock)
# this will not block the GUI
self._sock.setblocking(False)
self._server = ai[4]
break
except socket.error, e:
if not isinstance(e, basestring) and e[0] == EINPROGRESS:
break
# for all other errors, we try other addresses
continue
self.fd = self._sock.fileno()
self.state = 0 # about to be connected
self.idlequeue.plug_idle(self, True, False)
self.do_connect()
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
return None
def do_connect(self):
try:
self._sock.connect(self._server)
self._sock.setblocking(False)
self._send=self._sock.send
self._recv=self._sock.recv
except Exception, ee:
errnum = ee[0]
self.connect_timeout += 1
if errnum == 111 or self.connect_timeout > 1000:
self.queue._connection_refused(self.streamhost,
self.file_props, self.queue_idx)
self.connected = False
return None
# win32 needs this
elif errnum not in (10056, EISCONN) or self.state != 0:
return None
else: # socket is already connected
self._sock.setblocking(False)
self._send=self._sock.send
self._recv=self._sock.recv
self.buff = ''
self.connected = True
self.file_props['connected'] = True
self.file_props['disconnect_cb'] = self.disconnect
self.file_props['paused'] = False
self.state = 1 # connected
# stop all others connections to sender's streamhosts
self.queue._socket_connected(self.streamhost, self.file_props)
self.idlequeue.plug_idle(self, True, False)
return 1 # we are connected
def svr_main(self):
"""
Initial requests for verifying the connection
"""
if self.state == 1: # initial read
buff = self.receive()
if not self.connected:
return -1
mechs = self._parse_auth_buff(buff)
if mechs is None:
return -1 # invalid auth methods received
elif self.state == 3: # get next request
buff = self.receive()
req_type, self.sha_msg = self._parse_request_buff(buff)[:2]
if req_type != 0x01:
return -1 # request is not of type 'connect'
self.state += 1 # go to the next step
# unplug & plug for writing
self.idlequeue.plug_idle(self, True, False)
return None
def clnt_main(self, timeout=0):
"""
Begin negotiation. on success 'address' != 0
"""
result = 1
buff = self.receive()
if buff == '':
# end connection
self.pollend()
return
if self.state == 2: # read auth response
if buff is None or len(buff) != 2:
return None
version, method = struct.unpack('!BB', buff[:2])
if version != 0x05 or method == 0xff:
self.disconnect()
elif self.state == 4: # get approve of our request
if buff is None:
return None
sub_buff = buff[:4]
if len(sub_buff) < 4:
return None
version, address_type = struct.unpack('!BxxB', buff[:4])
addrlen = 0
if address_type == 0x03:
addrlen = ord(buff[4])
address = struct.unpack('!%ds' % addrlen, buff[5:addrlen + 5])
portlen = len(buff[addrlen + 5:])
if portlen == 1:
port, = struct.unpack('!B', buff[addrlen + 5])
elif portlen == 2:
port, = struct.unpack('!H', buff[addrlen + 5:])
else: # Gaim bug :)
port, = struct.unpack('!H', buff[addrlen + 5:addrlen + 7])
self.remaining_buff = buff[addrlen + 7:]
self.state = 5 # for senders: init file_props and send '\n'
if self.queue.on_success:
result = self.queue.send_success_reply(self.file_props,
self.streamhost)
if result == 0:
self.state = 8
self.disconnect()
# for senders: init file_props
if result == 1 and self.state == 5:
if self.file_props['type'] == 's':
self.file_props['error'] = 0
self.file_props['disconnect_cb'] = self.disconnect
self.file_props['started'] = True
self.file_props['completed'] = False
self.file_props['paused'] = False
self.file_props['stalled'] = False
self.file_props['elapsed-time'] = 0
self.file_props['last-time'] = self.idlequeue.current_time()
self.file_props['received-len'] = 0
self.pauses = 0
# start sending file contents to socket
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
self.idlequeue.plug_idle(self, True, False)
else:
# receiving file contents from socket
self.idlequeue.plug_idle(self, False, True)
self.file_props['continue_cb'] = self.continue_paused_transfer
# we have set up the connection, next - retrieve file
self.state = 6
if self.state < 5:
self.idlequeue.plug_idle(self, True, False)
self.state += 1
return None
def pollout(self):
if self.mode == 'client':
self.clnt_pollout()
elif self.mode == 'server':
self.svr_pollout()
def svr_pollout(self):
if not self.connected:
self.disconnect()
return
self.idlequeue.remove_timeout(self.fd)
if self.state == 2: # send reply with desired auth type
self.send_raw(self._get_auth_response())
elif self.state == 4: # send positive response to the 'connect'
self.send_raw(self._get_request_buff(self.sha_msg, 0x00))
elif self.state == 7:
if self.file_props['paused']:
self.file_props['continue_cb'] = self.continue_paused_transfer
self.idlequeue.plug_idle(self, False, False)
return
result = self.start_transfer() # send
self.queue.process_result(result, self)
if result is None or result <= 0:
self.disconnect()
return
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
elif self.state == 8:
self.disconnect()
return
else:
self.disconnect()
if self.state < 5:
self.state += 1
# unplug and plug this time for reading
self.idlequeue.plug_idle(self, False, True)
def clnt_pollout(self):
self.idlequeue.remove_timeout(self.fd)
try:
if self.state == 0:
self.do_connect()
return
elif self.state == 1: # send initially: version and auth types
self.send_raw(self._get_auth_buff())
elif self.state == 3: # send 'connect' request
self.send_raw(self._get_request_buff(self._get_sha1_auth()))
elif self.file_props['type'] != 'r':
if self.file_props['paused']:
self.idlequeue.plug_idle(self, False, False)
return
result = self.start_transfer() # send
self.queue.process_result(result, self)
return
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
OpenSSL.SSL.WantX509LookupError), e:
log.info('caught SSL exception, ignored')
return
self.state += 1
# unplug and plug for reading
self.idlequeue.plug_idle(self, False, True)
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
def pollin(self):
if self.mode == 'client':
self.clnt_pollin()
elif self.mode == 'server':
self.svr_pollin()
def svr_pollin(self):
if self.connected:
try:
if self.state < 5:
result = self.svr_main()
if self.state == 4:
self.queue.result_sha(self.sha_msg, self.queue_idx)
if result == -1:
self.disconnect()
elif self.state == 5:
if self.file_props is not None and self.file_props['type'] == 'r':
result = self.start_transfer() # receive
self.queue.process_result(result, self)
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
OpenSSL.SSL.WantX509LookupError), e:
log.info('caught SSL exception, ignored')
else:
self.disconnect()
def clnt_pollin(self):
self.idlequeue.remove_timeout(self.fd)
if self.connected:
try:
if self.file_props['paused']:
self.idlequeue.plug_idle(self, False, False)
return
if self.state < 5:
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
result = self.clnt_main(0)
self.queue.process_result(result, self)
elif self.state == 5: # wait for proxy reply
pass
elif self.file_props['type'] == 'r':
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
result = self.start_transfer() # receive
self.queue.process_result(result, self)
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
OpenSSL.SSL.WantX509LookupError), e:
log.info('caught SSL exception, ignored')
return
else:
self.disconnect()
def pollend(self):
if self.mode == 'client':
self.clnt_pollend()
elif self.mode == 'server':
self.svr_pollend()
def svr_pollend(self):
self.state = 8 # end connection
self.disconnect()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
def clnt_pollend(self):
if self.state >= 5:
# error during transfer
self.disconnect()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
else:
self.queue.reconnect_client(self, self.streamhost)
def read_timeout(self):
self.idlequeue.remove_timeout(self.fd)
if self.state > 5:
# no activity for foo seconds
if self.file_props['stalled'] == False:
self.file_props['stalled'] = True
self.queue.process_result(-1, self)
#if 'received-len' not in self.file_props:
# self.file_props['received-len'] = 0
if SEND_TIMEOUT > 0:
self.idlequeue.set_read_timeout(self.fd, SEND_TIMEOUT)
else:
# stop transfer, there is no error code for this
self.pollend()
else:
if self.mode == 'client':
self.queue.reconnect_client(self, self.streamhost)
def open_file_for_reading(self):
if self.file is None:
try:
@ -748,95 +1114,41 @@ class Socks5Sender(Socks5, IdleObject):
Class for sending file to socket over socks5
"""
def __init__(self, idlequeue, sock_hash, parent, _sock, host=None,
port=None, fingerprint = None):
def __init__(self, idlequeue, sock_hash, parent, mode,_sock, host=None,
port=None, fingerprint = None, connected=True):
self.fingerprint = fingerprint
self.queue_idx = sock_hash
self.queue = parent
self.mode = mode # client or server
self.file_props = {}
self.file_props['paused'] = False
Socks5.__init__(self, idlequeue, host, port, None, None, None)
self._sock = _sock
if self.fingerprint is not None:
self._sock = OpenSSL.SSL.Connection(
jingle_xtls.get_context('server'), self._sock)
else:
self._sock.setblocking(False)
self.fd = _sock.fileno()
self._recv = _sock.recv
self._send = _sock.send
self.connected = True
self.state = 1 # waiting for first bytes
self.file_props = None
def read_timeout(self):
self.idlequeue.remove_timeout(self.fd)
if self.state > 5:
# no activity for foo seconds
if self.file_props['stalled'] == False:
self.file_props['stalled'] = True
self.queue.process_result(-1, self)
if SEND_TIMEOUT > 0:
self.idlequeue.set_read_timeout(self.fd, SEND_TIMEOUT)
if _sock is not None:
if self.fingerprint is not None:
self._sock = OpenSSL.SSL.Connection(
jingle_xtls.get_context('server'), self._sock)
else:
# stop transfer, there is no error code for this
self.pollend()
def pollout(self):
if not self.connected:
self.disconnect()
return
self.idlequeue.remove_timeout(self.fd)
if self.state == 2: # send reply with desired auth type
self.send_raw(self._get_auth_response())
elif self.state == 4: # send positive response to the 'connect'
self.send_raw(self._get_request_buff(self.sha_msg, 0x00))
elif self.state == 7:
if self.file_props['paused']:
self.file_props['continue_cb'] = self.continue_paused_transfer
self.idlequeue.plug_idle(self, False, False)
return
result = self.write_next()
self.queue.process_result(result, self)
if result is None or result <= 0:
self.disconnect()
return
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
elif self.state == 8:
self.disconnect()
return
else:
self.disconnect()
if self.state < 5:
self.state += 1
# unplug and plug this time for reading
self.idlequeue.plug_idle(self, False, True)
def pollend(self):
self.state = 8 # end connection
self.disconnect()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
def pollin(self):
if self.connected:
try:
if self.state < 5:
result = self.main()
if self.state == 4:
self.queue.result_sha(self.sha_msg, self.queue_idx)
if result == -1:
self.disconnect()
elif self.state == 5:
if self.file_props is not None and self.file_props['type'] == 'r':
result = self.get_file_contents(0)
self.queue.process_result(result, self)
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
OpenSSL.SSL.WantX509LookupError), e:
log.info('caught SSL exception, ignored')
else:
self.disconnect()
self._sock.setblocking(False)
self.fd = _sock.fileno()
self._recv = _sock.recv
self._send = _sock.send
self.connected = connected
self.state = 1 # waiting for first bytes
self.connect_timeout = 0
def start_transfer(self):
"""
Send the file
"""
return self.write_next()
def send_file(self):
"""
Start sending the file over verified connection
@ -860,27 +1172,6 @@ class Socks5Sender(Socks5, IdleObject):
self.idlequeue.plug_idle(self, True, False)
return self.write_next() # initial for nl byte
def main(self):
"""
Initial requests for verifying the connection
"""
if self.state == 1: # initial read
buff = self.receive()
if not self.connected:
return -1
mechs = self._parse_auth_buff(buff)
if mechs is None:
return -1 # invalid auth methods received
elif self.state == 3: # get next request
buff = self.receive()
req_type, self.sha_msg = self._parse_request_buff(buff)[:2]
if req_type != 0x01:
return -1 # request is not of type 'connect'
self.state += 1 # go to the next step
# unplug & plug for writing
self.idlequeue.plug_idle(self, True, False)
return None
def disconnect(self, cb=True):
"""
Close the socket
@ -991,7 +1282,8 @@ class Socks5Listener(IdleObject):
return _sock
class Socks5Receiver(Socks5, IdleObject):
def __init__(self, idlequeue, streamhost, sid, file_props = None, fingerprint=None):
def __init__(self, idlequeue, streamhost, sid, mode, file_props = None,
fingerprint=None):
"""
fingerprint: fingerprint of certificates we shall use, set to None if TLS connection not desired
"""
@ -1012,221 +1304,41 @@ class Socks5Receiver(Socks5, IdleObject):
self.file_props['paused'] = False
self.file_props['continue_cb'] = self.continue_paused_transfer
self.file_props['stalled'] = False
self.mode = mode # client or server
Socks5.__init__(self, idlequeue, streamhost['host'],
int(streamhost['port']), streamhost['initiator'], streamhost['target'],
sid)
def read_timeout(self):
self.idlequeue.remove_timeout(self.fd)
if self.state > 5:
# no activity for foo seconds
if self.file_props['stalled'] == False:
self.file_props['stalled'] = True
if 'received-len' not in self.file_props:
self.file_props['received-len'] = 0
self.queue.process_result(-1, self)
if READ_TIMEOUT > 0:
self.idlequeue.set_read_timeout(self.fd, READ_TIMEOUT)
else:
# stop transfer, there is no error code for this
self.pollend()
else:
self.queue.reconnect_receiver(self, self.streamhost)
def connect(self):
def receive_file(self):
"""
Create the socket and plug it to the idlequeue
Start receiving the file over verified connection
"""
if self.ais is None:
return None
for ai in self.ais:
try:
self._sock = socket.socket(*ai[:3])
if not self.fingerprint is None:
self._sock = OpenSSL.SSL.Connection(
jingle_xtls.get_context('client'), self._sock)
# this will not block the GUI
self._sock.setblocking(False)
self._server = ai[4]
break
except socket.error, e:
if not isinstance(e, basestring) and e[0] == EINPROGRESS:
break
# for all other errors, we try other addresses
continue
self.fd = self._sock.fileno()
self.state = 0 # about to be connected
self.idlequeue.plug_idle(self, True, False)
self.do_connect()
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
return None
def _is_connected(self):
if self.state < 5:
return False
return True
def pollout(self):
self.idlequeue.remove_timeout(self.fd)
try:
if self.state == 0:
self.do_connect()
return
elif self.state == 1: # send initially: version and auth types
self.send_raw(self._get_auth_buff())
elif self.state == 3: # send 'connect' request
self.send_raw(self._get_request_buff(self._get_sha1_auth()))
elif self.file_props['type'] != 'r':
if self.file_props['paused']:
self.idlequeue.plug_idle(self, False, False)
return
result = self.write_next()
self.queue.process_result(result, self)
return
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
OpenSSL.SSL.WantX509LookupError), e:
log.info('caught SSL exception, ignored')
if self.file_props['started']:
return
self.state += 1
# unplug and plug for reading
self.idlequeue.plug_idle(self, False, True)
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
def pollend(self):
if self.state >= 5:
# error during transfer
self.disconnect()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
else:
self.queue.reconnect_receiver(self, self.streamhost)
def pollin(self):
self.idlequeue.remove_timeout(self.fd)
if self.connected:
try:
if self.file_props['paused']:
self.idlequeue.plug_idle(self, False, False)
return
if self.state < 5:
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT)
result = self.main(0)
self.queue.process_result(result, self)
elif self.state == 5: # wait for proxy reply
pass
elif self.file_props['type'] == 'r':
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
result = self.get_file_contents(0)
self.queue.process_result(result, self)
except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantWriteError,
OpenSSL.SSL.WantX509LookupError), e:
log.info('caught SSL exception, ignored')
return
else:
self.disconnect()
def do_connect(self):
try:
self._sock.connect(self._server)
self._sock.setblocking(False)
self._send=self._sock.send
self._recv=self._sock.recv
except Exception, ee:
errnum = ee[0]
self.connect_timeout += 1
if errnum == 111 or self.connect_timeout > 1000:
self.queue._connection_refused(self.streamhost,
self.file_props, self.queue_idx)
return None
# win32 needs this
elif errnum not in (10056, EISCONN) or self.state != 0:
return None
else: # socket is already connected
self._sock.setblocking(False)
self._send=self._sock.send
self._recv=self._sock.recv
self.buff = ''
self.connected = True
self.file_props['connected'] = True
self.file_props['error'] = 0
self.file_props['disconnect_cb'] = self.disconnect
self.state = 1 # connected
self.file_props['started'] = True
self.file_props['completed'] = False
self.file_props['paused'] = False
self.file_props['continue_cb'] = self.continue_paused_transfer
self.file_props['stalled'] = False
self.file_props['connected'] = True
self.file_props['elapsed-time'] = 0
self.file_props['last-time'] = self.idlequeue.current_time()
self.file_props['received-len'] = 0
self.pauses = 0
self.state = 7
# plug for reading
self.idlequeue.plug_idle(self, False, True)
return self.get_file_contents(0) # initial for nl byte
# stop all others connections to sender's streamhosts
self.queue._socket_connected(self.streamhost, self.file_props)
self.idlequeue.plug_idle(self, True, False)
return 1 # we are connected
def main(self, timeout=0):
def start_transfer(self):
"""
Begin negotiation. on success 'address' != 0
Receive the file
"""
result = 1
buff = self.receive()
if buff == '':
# end connection
self.pollend()
return
return self.get_file_contents(0)
if self.state == 2: # read auth response
if buff is None or len(buff) != 2:
return None
version, method = struct.unpack('!BB', buff[:2])
if version != 0x05 or method == 0xff:
self.disconnect()
elif self.state == 4: # get approve of our request
if buff is None:
return None
sub_buff = buff[:4]
if len(sub_buff) < 4:
return None
version, address_type = struct.unpack('!BxxB', buff[:4])
addrlen = 0
if address_type == 0x03:
addrlen = ord(buff[4])
address = struct.unpack('!%ds' % addrlen, buff[5:addrlen + 5])
portlen = len(buff[addrlen + 5:])
if portlen == 1:
port, = struct.unpack('!B', buff[addrlen + 5])
elif portlen == 2:
port, = struct.unpack('!H', buff[addrlen + 5:])
else: # Gaim bug :)
port, = struct.unpack('!H', buff[addrlen + 5:addrlen + 7])
self.remaining_buff = buff[addrlen + 7:]
self.state = 5 # for senders: init file_props and send '\n'
if self.queue.on_success:
result = self.queue.send_success_reply(self.file_props,
self.streamhost)
if result == 0:
self.state = 8
self.disconnect()
# for senders: init file_props
if result == 1 and self.state == 5:
if self.file_props['type'] == 's':
self.file_props['error'] = 0
self.file_props['disconnect_cb'] = self.disconnect
self.file_props['started'] = True
self.file_props['completed'] = False
self.file_props['paused'] = False
self.file_props['stalled'] = False
self.file_props['elapsed-time'] = 0
self.file_props['last-time'] = self.idlequeue.current_time()
self.file_props['received-len'] = 0
self.pauses = 0
# start sending file contents to socket
self.idlequeue.set_read_timeout(self.fd, STALLED_TIMEOUT)
self.idlequeue.plug_idle(self, True, False)
else:
# receiving file contents from socket
self.idlequeue.plug_idle(self, False, True)
self.file_props['continue_cb'] = self.continue_paused_transfer
# we have set up the connection, next - retrieve file
self.state = 6
if self.state < 5:
self.idlequeue.plug_idle(self, True, False)
self.state += 1
return None
def disconnect(self, cb=True):
"""