socks5 classes now use Gajim idlequeue

still there are several TODOs
This commit is contained in:
Dimitur Kirov 2006-02-06 00:11:45 +00:00
parent 149bff2b64
commit f52144c803
4 changed files with 253 additions and 260 deletions

View File

@ -824,7 +824,7 @@ class Connection:
file_props['streamhosts'] = [] file_props['streamhosts'] = []
file_props['streamhosts'].append(proxy) file_props['streamhosts'].append(proxy)
file_props['is_a_proxy'] = True file_props['is_a_proxy'] = True
receiver = socks5.Socks5Receiver(proxy, file_props['sid'], file_props) receiver = socks5.Socks5Receiver(gajim.idlequeue, proxy, file_props['sid'], file_props)
gajim.socks5queue.add_receiver(self.name, receiver) gajim.socks5queue.add_receiver(self.name, receiver)
proxy['idx'] = receiver.queue_idx proxy['idx'] = receiver.queue_idx
gajim.socks5queue.on_success = self.proxy_auth_ok gajim.socks5queue.on_success = self.proxy_auth_ok

View File

@ -36,12 +36,12 @@ import time
from errno import EWOULDBLOCK from errno import EWOULDBLOCK
from errno import ENOBUFS from errno import ENOBUFS
from errno import EINTR from errno import EINTR
from xmpp.idlequeue import IdleObject
MAX_BUFF_LEN = 65536 MAX_BUFF_LEN = 65536
class SocksQueue: class SocksQueue:
''' queue for all file requests objects ''' ''' queue for all file requests objects '''
def __init__(self, complete_transfer_cb = None, progress_transfer_cb = None): def __init__(self, idlequeue, complete_transfer_cb = None, progress_transfer_cb = None):
self.connected = 0 self.connected = 0
self.readers = {} self.readers = {}
self.files_props = {} self.files_props = {}
@ -49,6 +49,9 @@ class SocksQueue:
self.idx = 1 self.idx = 1
self.listener = None self.listener = None
self.sha_handlers = {} self.sha_handlers = {}
# handle all io events in the global idle queue, instead of processing
# each foo seconds
self.idlequeue = idlequeue
self.complete_transfer_cb = complete_transfer_cb self.complete_transfer_cb = complete_transfer_cb
self.progress_transfer_cb = progress_transfer_cb self.progress_transfer_cb = progress_transfer_cb
self.on_success = None self.on_success = None
@ -57,15 +60,17 @@ class SocksQueue:
def start_listener(self, host, port, sha_str, sha_handler, sid): def start_listener(self, host, port, sha_str, sha_handler, sid):
self.sha_handlers[sha_str] = (sha_handler, sid) self.sha_handlers[sha_str] = (sha_handler, sid)
if self.listener == None: if self.listener == None:
self.listener = Socks5Listener(host, port) self.listener = Socks5Listener(self.idlequeue, host, port)
self.listener.queue = self
self.listener.bind() self.listener.bind()
if self.listener.started is False: if self.listener.started is False:
self.listener = None self.listener = None
import sys import sys
sys.stderr.write('\n\n\n========================================\ print >> sys.stderr, '================================================='
========================\nUnable to bind to port %s. \nMaybe you have another \ print >> sys.stderr, 'Unable to bind to port %s.' % port
running instance of Gajim. \nFile Transfer will be canceled.\n==================\ print >> sys.stderr, 'Maybe you have another running instance of Gajim.'
==============================================\n\n\n' % port) print >> sys.stderr, 'File Transfer will be canceled.'
print >> sys.stderr, '================================================='
return None return None
self.connected += 1 self.connected += 1
return self.listener return self.listener
@ -102,7 +107,7 @@ running instance of Gajim. \nFile Transfer will be canceled.\n==================
# add streamhosts to the queue # add streamhosts to the queue
for streamhost in file_props['streamhosts']: for streamhost in file_props['streamhosts']:
receiver = Socks5Receiver(streamhost, sid, file_props) receiver = Socks5Receiver(self.idlequeue, streamhost, sid, file_props)
self.add_receiver(account, receiver) self.add_receiver(account, receiver)
streamhost['idx'] = receiver.queue_idx streamhost['idx'] = receiver.queue_idx
@ -180,6 +185,9 @@ running instance of Gajim. \nFile Transfer will be canceled.\n==================
reader.file_props['last-time'] = time.time() reader.file_props['last-time'] = time.time()
reader.file_props['received-len'] = 0 reader.file_props['received-len'] = 0
reader.pauses = 0 reader.pauses = 0
# start sending file to proxy
# TODO: add timeout for stalled state
self.idlequeue.plug_idle(reader, True, False)
result = reader.write_next() result = reader.write_next()
self.process_result(result, reader) self.process_result(result, reader)
@ -228,80 +236,13 @@ running instance of Gajim. \nFile Transfer will be canceled.\n==================
return fl_props[sid] return fl_props[sid]
return None return None
def process(self, timeout=0): def on_connection_accepted(self, sock):
''' Process all registered connection. sock_hash = sock.__hash__()
they can be receivers, senders and one listener
'''
if self.listener is not None:
if self.listener.pending_connection():
_sock = self.listener.accept_conn()
sock_hash = _sock.__hash__()
if not self.senders.has_key(sock_hash): if not self.senders.has_key(sock_hash):
self.senders[sock_hash] = Socks5Sender(sock_hash, self, self.senders[sock_hash] = Socks5Sender(self.idlequeue,
_sock[0], _sock[1][0], _sock[1][1]) sock_hash, self, sock[0], sock[1][0], sock[1][1])
self.connected += 1 self.connected += 1
for idx in self.senders.keys():
sender = self.senders[idx]
if sender.connected:
if sender.state < 5:
if sender.pending_data(timeout):
result = sender.main()
if sender.state == 4:
self.result_sha(sender.sha_msg, idx)
if result is None:
continue
if result == -1:
sender.disconnect()
elif sender.state == 5:
if sender.file_props is not None and \
sender.file_props['type'] == 'r':
result = sender.get_file_contents(0)
self.process_result(result, sender)
elif sender.state == 7:
while True:
if sender.file_props['paused']:
break
if not sender.connected:
self.process_result(-1, sender)
break
if sender.state == 8:
self.remove_sender(idx)
break
result = sender.write_next()
self.process_result(result, sender)
if result is None or result <= 0:
break
elif sender.state == 8:
self.remove_sender(idx)
else:
self.remove_sender(idx)
keys = self.readers.keys()
for idx in keys:
if not self.readers.has_key(idx):
continue
receiver = self.readers[idx]
if receiver.state == 0:
res = receiver.do_connect()
continue
if receiver.connected:
if receiver.file_props['paused']:
continue
if receiver.state < 5:
pd = receiver.pending_data(0)
if pd:
result = receiver.main(0)
self.process_result(result, receiver)
elif receiver.state == 5: # wait for proxy reply
pass
else:
if receiver.file_props['type'] == 'r':
result = receiver.get_file_contents(timeout)
else:
result = receiver.write_next()
self.process_result(result, receiver)
else:
self.remove_receiver(idx)
def process_result(self, result, actor): def process_result(self, result, actor):
''' Take appropriate actions upon the result: ''' Take appropriate actions upon the result:
@ -347,9 +288,11 @@ running instance of Gajim. \nFile Transfer will be canceled.\n==================
self.connected -= 1 self.connected -= 1
class Socks5: class Socks5:
def __init__(self, host, port, initiator, target, sid): def __init__(self, idlequeue, host, port, initiator, target, sid):
if host is not None: if host is not None:
self.host = socket.gethostbyname(host) self.host = socket.gethostbyname(host)
self.idlequeue = idlequeue
self.fd = -1
self.port = port self.port = port
self.initiator = initiator self.initiator = initiator
self.target = target self.target = target
@ -360,25 +303,28 @@ class Socks5:
self.pauses = 0 self.pauses = 0
self.size = 0 self.size = 0
self.remaining_buff = '' self.remaining_buff = ''
self.fd = None self.file = None
def open_file_for_reading(self): def open_file_for_reading(self):
if self.fd == None: if self.file == None:
try: try:
self.fd = open(self.file_props['file-name'],'rb') self.file = open(self.file_props['file-name'],'rb')
if self.file_props.has_key('offset') and self.file_props['offset']: if self.file_props.has_key('offset') and self.file_props['offset']:
self.size = self.file_props['offset'] self.size = self.file_props['offset']
self.fd.seek(self.size) self.file.seek(self.size)
self.file_props['received-len'] = self.size self.file_props['received-len'] = self.size
except IOError, e: except IOError, e:
self.close_file() self.close_file()
raise IOError, e raise IOError, e
def close_file(self): def close_file(self):
if self.file:
if not self.file.closed:
try: try:
self.fd.close() self.file.close()
except: except:
pass pass
self.file = None
def get_fd(self): def get_fd(self):
''' Test if file is already open and return its fd, ''' Test if file is already open and return its fd,
@ -411,7 +357,6 @@ class Socks5:
def receive(self): def receive(self):
''' Reads small chunks of data. ''' Reads small chunks of data.
Calls owner's disconnected() method if appropriate.''' Calls owner's disconnected() method if appropriate.'''
if self.pending_read():
received = '' received = ''
try: try:
add = self._recv(64) add = self._recv(64)
@ -420,8 +365,6 @@ class Socks5:
received +=add received +=add
if len(add) == 0: if len(add) == 0:
self.disconnect() self.disconnect()
else:
return None
return add return add
def send_raw(self,raw_data): def send_raw(self,raw_data):
@ -444,7 +387,7 @@ class Socks5:
self.disconnect() self.disconnect()
self.file_props['error'] = -7 # unable to read from file self.file_props['error'] = -7 # unable to read from file
return -1 return -1
buff = self.fd.read(MAX_BUFF_LEN) buff = self.file.read(MAX_BUFF_LEN)
if len(buff) > 0: if len(buff) > 0:
lenn = 0 lenn = 0
try: try:
@ -453,7 +396,6 @@ class Socks5:
if e.args[0] not in (EINTR, ENOBUFS, EWOULDBLOCK): if e.args[0] not in (EINTR, ENOBUFS, EWOULDBLOCK):
# peer stopped reading # peer stopped reading
self.state = 8 # end connection self.state = 8 # end connection
self.close_file()
self.disconnect() self.disconnect()
self.file_props['error'] = -1 self.file_props['error'] = -1
return -1 return -1
@ -466,7 +408,6 @@ class Socks5:
if self.size >= int(self.file_props['size']): if self.size >= int(self.file_props['size']):
self.state = 8 # end connection self.state = 8 # end connection
self.file_props['error'] = 0 self.file_props['error'] = 0
self.close_file()
self.disconnect() self.disconnect()
return -1 return -1
if lenn != len(buff): if lenn != len(buff):
@ -487,7 +428,6 @@ class Socks5:
return lenn return lenn
else: else:
self.state = 8 # end connection self.state = 8 # end connection
self.close_file()
self.disconnect() self.disconnect()
return -1 return -1
@ -516,7 +456,6 @@ class Socks5:
self.file_props['completed'] = True self.file_props['completed'] = True
return 0 return 0
else: else:
while self.pending_read(timeout):
fd = self.get_fd() fd = self.get_fd()
try: try:
buff = self._recv(MAX_BUFF_LEN) buff = self._recv(MAX_BUFF_LEN)
@ -571,28 +510,18 @@ class Socks5:
return self.file_props['received-len'] return self.file_props['received-len']
return None return None
def disconnect(self, cb = True): def disconnect(self):
''' Closes the socket. ''' ''' Closes open descriptors and remover socket descr. from idleque '''
# be sure that we don't leave open file
self.close_file()
try:
self._sock.close() self._sock.close()
except:
# socket is already closed
pass
self.connected = False self.connected = False
self.idlequeue.unplug_idle(self.fd)
def pending_read(self,timeout=0): self.fd = -1
''' Returns true if there is a data ready to be read. '''
if self._sock is None:
return False
try:
return select.select([self._sock],[],[],timeout)[0]
except Exception, e:
return False
def pending_connection(self,timeout=0):
''' Returns true if there is a data ready to be read. '''
if self._sock is None:
return False
try:
return select.select([],[self._sock],[],timeout)[0]
except Exception, e:
return False
def _get_auth_buff(self): def _get_auth_buff(self):
''' Message, that we support 1 one auth mechanism: ''' Message, that we support 1 one auth mechanism:
@ -673,19 +602,72 @@ class Socks5:
self.file_props['proxy_receiver'])).hexdigest() self.file_props['proxy_receiver'])).hexdigest()
return sha.new('%s%s%s' % (self.sid, self.initiator, self.target)).hexdigest() return sha.new('%s%s%s' % (self.sid, self.initiator, self.target)).hexdigest()
class Socks5Sender(Socks5): class Socks5Sender(Socks5, IdleObject):
''' class for sending file to socket over socks5 ''' ''' class for sending file to socket over socks5 '''
def __init__(self, sock_hash, parent, _sock, host = None, port = None): def __init__(self, idlequeue, sock_hash, parent, _sock, host = None, port = None):
self.queue_idx = sock_hash self.queue_idx = sock_hash
self.queue = parent self.queue = parent
Socks5.__init__(self, host, port, None, None, None) Socks5.__init__(self, idlequeue, host, port, None, None, None)
self._sock = _sock self._sock = _sock
self._sock.setblocking(False) self._sock.setblocking(False)
self.fd = _sock.fileno()
self._recv = _sock.recv self._recv = _sock.recv
self._send = _sock.send self._send = _sock.send
self.connected = True self.connected = True
self.state = 1 # waiting for first bytes self.state = 1 # waiting for first bytes
self.file_props = None self.file_props = None
# start waiting for data
self.idlequeue.plug_idle(self, False, True)
def pollout(self):
if not self.connected:
self.queue.remove_sender(self.queue_idx)
return
if self.state == 2: # send reply with desired auth type
self.send_raw(self._get_auth_response())
elif self.state == 4: # send positive response to the 'connect'
self.send_raw(self._get_request_buff(self.sha_msg, 0x00))
elif self.state == 7:
if self.file_props['paused']:
# TODO: better way is to remove it from idlequeue
return
result = self.write_next()
self.queue.process_result(result, self)
if result is None or result <= 0:
self.queue.remove_sender(self.queue_idx)
elif self.state == 8:
self.queue.remove_sender(self.queue_idx)
return
else:
self.disconnect()
if self.state < 5:
self.state += 1
# unplug and plug this time for reading
self.idlequeue.plug_idle(self, False, True)
def pollend(self):
self.state = 8 # end connection
self.close_file()
self.file_props['error'] = -1
self.queue.process_result(-1, self)
self.queue.remove_sender(self.queue_idx)
def pollin(self):
if self.connected:
if self.state < 5:
result = self.main()
if self.state == 4:
self.queue.result_sha(self.sha_msg, self.queue_idx)
if result == -1:
self.disconnect()
elif self.state == 5:
if self.file_props is not None and \
self.file_props['type'] == 'r':
result = self.get_file_contents(0)
self.queue.process_result(result, self)
else:
self.queue.remove_sender(self.queue_idx)
def send_file(self): def send_file(self):
''' start sending the file over verified connection ''' ''' start sending the file over verified connection '''
@ -701,6 +683,8 @@ class Socks5Sender(Socks5):
self.file_props['received-len'] = 0 self.file_props['received-len'] = 0
self.pauses = 0 self.pauses = 0
self.state = 7 self.state = 7
# plug for writing
self.idlequeue.plug_idle(self, True, False)
return self.write_next() # initial for nl byte return self.write_next() # initial for nl byte
def main(self): def main(self):
@ -712,68 +696,60 @@ class Socks5Sender(Socks5):
mechs = self._parse_auth_buff(buff) mechs = self._parse_auth_buff(buff)
if mechs is None: if mechs is None:
return -1 # invalid auth methods received return -1 # invalid auth methods received
elif self.state == 2: # send reply with desired auth type
self.send_raw(self._get_auth_response())
elif self.state == 3: # get next request elif self.state == 3: # get next request
buff = self.receive() buff = self.receive()
(req_type, self.sha_msg, port) = self._parse_request_buff(buff) (req_type, self.sha_msg, port) = self._parse_request_buff(buff)
if req_type != 0x01: if req_type != 0x01:
return -1 # request is not of type 'connect' return -1 # request is not of type 'connect'
elif self.state == 4: # send positive response to the 'connect'
self.send_raw(self._get_request_buff(self.sha_msg, 0x00))
self.state += 1 # go to the next step self.state += 1 # go to the next step
# unplug & plug for writing
self.idlequeue.plug_idle(self, True, False)
return None return None
def pending_data(self,timeout=0):
''' return true if there is a data ready to be read '''
if self._sock is None:
return False
try:
if self.state in (1, 3, 5): # (initial, request, send file)
return self.pending_read(timeout)
elif self.state in (2, 4): # send auth and positive response
return True
except Exception, e:
return False
return False
def disconnect(self, cb = True): def disconnect(self, cb = True):
''' Closes the socket. ''' ''' Closes the socket. '''
# close connection and remove us from the queue # close connection and remove us from the queue
try: Socks5.disconnect(self)
self._sock.close()
except:
pass
self.connected = False
if self.file_props is not None: if self.file_props is not None:
self.file_props['connected'] = False self.file_props['connected'] = False
self.file_props['disconnect_cb'] = None self.file_props['disconnect_cb'] = None
if self.queue is not None: if self.queue is not None:
self.queue.remove_sender(self.queue_idx, False) self.queue.remove_sender(self.queue_idx, False)
class Socks5Listener: class Socks5Listener(IdleObject):
def __init__(self, host, port): def __init__(self, idlequeue, host, port):
self.host, self.port = host, port self.host, self.port = host, port
self.queue_idx = -1 self.queue_idx = -1
self.idlequeue = idlequeue
self.queue = None self.queue = None
self.started = False self.started = False
self._sock = None self._sock = None
self.fd = -1
def bind(self): def bind(self):
try:
self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# will fail when port as busy, or we don't have rights to bind
try:
self._serv.bind(('0.0.0.0', self.port)) self._serv.bind(('0.0.0.0', self.port))
self._serv.listen(socket.SOMAXCONN)
self._serv.setblocking(False)
except Exception, e: except Exception, e:
# unable to bind, show error dialog # unable to bind, show error dialog
return None return None
self._serv.listen(socket.SOMAXCONN)
self._serv.setblocking(False)
self.fd = self._serv.fileno()
self.idlequeue.plug_idle(self, False, True)
self.started = True self.started = True
def pollin(self):
sock = self.accept_conn()
self.queue.on_connection_accepted(sock)
def disconnect(self): def disconnect(self):
self.idlequeue.unplug_idle(self.fd)
self.fd = -1
try: try:
self._serv.close() self._serv.close()
except: except:
@ -784,18 +760,8 @@ class Socks5Listener:
_sock[0].setblocking(False) _sock[0].setblocking(False)
return _sock return _sock
def pending_connection(self,timeout=0): class Socks5Receiver(Socks5, IdleObject):
''' Returns true if there is a data ready to be read. ''' def __init__(self, idlequeue, streamhost, sid, file_props = None):
if self._serv is None:
return False
try:
res = select.select([self._serv],[],[],timeout)
return res[0]
except Exception, e:
return False
class Socks5Receiver(Socks5):
def __init__(self, streamhost, sid, file_props = None):
self.queue_idx = -1 self.queue_idx = -1
self.streamhost = streamhost self.streamhost = streamhost
self.queue = None self.queue = None
@ -811,18 +777,62 @@ class Socks5Receiver(Socks5):
self.file_props['completed'] = False self.file_props['completed'] = False
self.file_props['paused'] = False self.file_props['paused'] = False
self.file_props['stalled'] = False self.file_props['stalled'] = False
Socks5.__init__(self, streamhost['host'], int(streamhost['port']), Socks5.__init__(self, idlequeue, streamhost['host'], int(streamhost['port']),
streamhost['initiator'], streamhost['target'], sid) streamhost['initiator'], streamhost['target'], sid)
def connect(self): def connect(self):
''' create the socket and start the connect loop ''' ''' create the socket and plug it to the idlequeue '''
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# this will not block the GUI # this will not block the GUI
self._sock.setblocking(False) self._sock.setblocking(False)
self.fd = self._sock.fileno()
self.state = 0 # about to be connected self.state = 0 # about to be connected
res = self.do_connect() self.idlequeue.plug_idle(self, True, False)
return res self.do_connect()
# TODO: add timeout for establishing connection
return None
def pollout(self):
if self.state == 0:
self.do_connect()
return
elif self.state == 1: # send initially: version and auth types
self.send_raw(self._get_auth_buff())
elif self.state == 3: # send 'connect' request
self.send_raw(self._get_request_buff(self._get_sha1_auth()))
elif self.file_props['type'] != 'r':
# TODO: better way to handle paused state
if self.file_props['paused'] == True:
return
result = self.write_next()
self.queue.process_result(result, self)
return
self.state += 1
# unplug and plug for reading
self.idlequeue.plug_idle(self, False, True)
def pollend(self):
self.file_props['error'] = -1
self.queue.process_result(-1, self)
self.queue.remove_receiver(self.queue_idx)
def pollin(self):
if self.connected:
if self.file_props['paused']:
return
if self.state < 5:
result = self.main(0)
self.queue.process_result(result, self)
elif self.state == 5: # wait for proxy reply
pass
elif self.file_props['type'] == 'r':
result = self.get_file_contents(0)
self.queue.process_result(result, self)
else:
self.queue.remove_receiver(self.queue_idx)
def read_timeout(self, fd):
self.disconnect()
def do_connect(self): def do_connect(self):
try: try:
@ -844,29 +854,25 @@ class Socks5Receiver(Socks5):
self._sock.setblocking(False) self._sock.setblocking(False)
self._send=self._sock.send self._send=self._sock.send
self._recv=self._sock.recv self._recv=self._sock.recv
pass
self.buff = '' self.buff = ''
self.connected = True self.connected = True
self.file_props['connected'] = True self.file_props['connected'] = True
self.file_props['disconnect_cb'] = self.disconnect self.file_props['disconnect_cb'] = self.disconnect
self.state = 1 # connected self.state = 1 # connected
self.queue._socket_connected(self.streamhost, self.file_props) self.queue._socket_connected(self.streamhost, self.file_props)
self.idlequeue.plug_idle(self, True, False)
return 1 # we are connected return 1 # we are connected
def main(self, timeout = 0): def main(self, timeout = 0):
''' begin negotiation. on success 'address' != 0 ''' ''' begin negotiation. on success 'address' != 0 '''
result = 1 result = 1
if self.state == 1: # send initially: version and auth types if self.state == 2: # read auth response
self.send_raw(self._get_auth_buff())
elif self.state == 2: # read auth response
buff = self.receive() buff = self.receive()
if buff is None or len(buff) != 2: if buff is None or len(buff) != 2:
return None return None
version, method = struct.unpack('!BB', buff[:2]) version, method = struct.unpack('!BB', buff[:2])
if version != 0x05 or method == 0xff: if version != 0x05 or method == 0xff:
self.disconnect() self.disconnect()
elif self.state == 3: # send 'connect' request
self.send_raw(self._get_request_buff(self._get_sha1_auth()))
elif self.state == 4: # get approve of our request elif self.state == 4: # get approve of our request
buff = self.receive() buff = self.receive()
if buff == None: if buff == None:
@ -895,7 +901,7 @@ class Socks5Receiver(Socks5):
self.state = 8 self.state = 8
self.disconnect() self.disconnect()
# for senders: init file_props and send '\n' # for senders: init file_props
if result == 1 and self.state == 5: if result == 1 and self.state == 5:
if self.file_props['type'] == 's': if self.file_props['type'] == 's':
self.file_props['error'] = 0 self.file_props['error'] = 0
@ -908,34 +914,24 @@ class Socks5Receiver(Socks5):
self.file_props['last-time'] = time.time() self.file_props['last-time'] = time.time()
self.file_props['received-len'] = 0 self.file_props['received-len'] = 0
self.pauses = 0 self.pauses = 0
self.state = 6 # send/get file contents # start sending file contents to socket
self.idlequeue.plug_idle(self, True, False)
else:
# receiving file contents from socket
self.idlequeue.plug_idle(self, False, True)
# we have set up the connection, next - retrieve file
# TODO: add timeout for stalled state
self.state = 6
if self.state < 5: if self.state < 5:
self.idlequeue.plug_idle(self, True, False)
self.state += 1 self.state += 1
return None return None
# we have set up the connection, next - retrieve file
def pending_data(self, timeout=0):
''' Returns true if there is a data ready to be read. '''
if self._sock is None:
return False
try:
if self.state in (2, 4, 6): # auth response, connect, file data
return self.pending_read(0)
elif self.state in (1, 3, 5): # auth types, connect request
return True
except Exception, e:
return False
return False
def disconnect(self, cb = True): def disconnect(self, cb = True):
''' Closes the socket. ''' ''' Closes the socket. Remove self from queue if cb is True'''
# close connection and remove us from the queue # close connection
if self._sock: Socks5.disconnect(self)
try:
self._sock.close()
except:
pass
self.connected = False
if cb is True: if cb is True:
self.file_props['disconnect_cb'] = None self.file_props['disconnect_cb'] = None
if self.queue is not None: if self.queue is not None:

View File

@ -155,6 +155,8 @@ class NonBlockingTcp(PlugIn, IdleObject):
self.remove_timeout() self.remove_timeout()
self._owner.disconnected() self._owner.disconnected()
self.idlequeue.unplug_idle(self.fd) self.idlequeue.unplug_idle(self.fd)
# socket descriptor cannot be (un)plugged anymore
self.fd = -1
if self.on_disconnect: if self.on_disconnect:
self.on_disconnect() self.on_disconnect()

View File

@ -1379,14 +1379,9 @@ class Interface:
gajim.handlers = self.handlers gajim.handlers = self.handlers
def process_connections(self): def process_connections(self):
''' called each XXX (200) miliseconds. For now it checks for idlequeue timeouts ''' called each foo (200) miliseconds. Check for idlequeue timeouts.
and FT events.
''' '''
gajim.idlequeue.process() gajim.idlequeue.process()
# TODO: rewrite socks5 classes to work with idlequeue and remove these lines
if gajim.socks5queue.connected:
gajim.socks5queue.process(0)
return True # renew timeout (loop for ever) return True # renew timeout (loop for ever)
def save_config(self): def save_config(self):
@ -1501,9 +1496,6 @@ class Interface:
gajim.log.setLevel(gajim.logging.DEBUG) gajim.log.setLevel(gajim.logging.DEBUG)
else: else:
gajim.log.setLevel(None) gajim.log.setLevel(None)
gajim.socks5queue = socks5.SocksQueue(
self.handle_event_file_rcv_completed,
self.handle_event_file_progress)
# pygtk2.8 on win, breaks io_add_watch. We use good old select.select() # pygtk2.8 on win, breaks io_add_watch. We use good old select.select()
if os.name == 'nt' and gtk.pygtk_version > (2, 8, 0): if os.name == 'nt' and gtk.pygtk_version > (2, 8, 0):
@ -1515,6 +1507,9 @@ class Interface:
gajim.idlequeue = GlibIdleQueue() gajim.idlequeue = GlibIdleQueue()
# resolve and keep current record of resolved hosts # resolve and keep current record of resolved hosts
gajim.resolver = nslookup.Resolver(gajim.idlequeue) gajim.resolver = nslookup.Resolver(gajim.idlequeue)
gajim.socks5queue = socks5.SocksQueue(gajim.idlequeue,
self.handle_event_file_rcv_completed,
self.handle_event_file_progress)
self.register_handlers() self.register_handlers()
for account in gajim.config.get_per('accounts'): for account in gajim.config.get_per('accounts'):
gajim.connections[account] = common.connection.Connection(account) gajim.connections[account] = common.connection.Connection(account)