diff --git a/src/common/xmpp/debug.py b/src/common/xmpp/debug.py index 29321c1ff..60480b66c 100644 --- a/src/common/xmpp/debug.py +++ b/src/common/xmpp/debug.py @@ -154,7 +154,7 @@ class Debug: # If you dont want to validate flags on each call to # show(), set this to 0 # - validate_flags = 1, + validate_flags = 0, # # If you dont want the welcome message, set to 0 # default is to show welcome if any flags are active diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py index 5ee73c987..955af1be1 100644 --- a/src/common/xmpp/idlequeue.py +++ b/src/common/xmpp/idlequeue.py @@ -53,6 +53,7 @@ class IdleQueue: self.selector = select.poll() def remove_timeout(self, fd): + print 'read timeout removed for fd %s' % fd if self.read_timeouts.has_key(fd): del(self.read_timeouts[fd]) @@ -68,6 +69,7 @@ class IdleQueue: def set_read_timeout(self, fd, seconds): ''' set a new timeout, if it is not removed after 'seconds', then obj.read_timeout() will be called ''' + print 'read timeout set for fd %s on %s seconds' % (fd, seconds) timeout = self.current_time() + seconds self.read_timeouts[fd] = timeout @@ -204,14 +206,14 @@ class SelectIdleQueue(IdleQueue): except select.error, e: waiting_descriptors = ((),(),()) if e[0] != 4: # interrupt - raise - for fd in waiting_descriptors[0]: - q = self.queue.get(fd) - if q: + raise + for fd in waiting_descriptors[0]: + q = self.queue.get(fd) + if q: q.pollin() for fd in waiting_descriptors[1]: - q = self.queue.get(fd) - if q: + q = self.queue.get(fd) + if q: q.pollout() for fd in waiting_descriptors[2]: self.queue.get(fd).pollend() diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index 4c056c09d..a4f847764 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -337,7 +337,9 @@ class NonBlockingTcp(PlugIn, IdleObject): self.on_connect_failure() def _plug_idle(self): + # readable if socket is connected or disconnecting readable = self.state != 0 + # writeable if sth to send if self.sendqueue or self.sendbuff: writable = True else: @@ -346,6 +348,7 @@ class NonBlockingTcp(PlugIn, IdleObject): self.idlequeue.plug_idle(self, writable, readable) def pollout(self): + print 'pollout called - send possible' if self.state == 0: self.connect_to_next_ip() return @@ -359,6 +362,7 @@ class NonBlockingTcp(PlugIn, IdleObject): self._owner = None def pollin(self): + print 'pollin called - receive possible' self._do_receive() def pollend(self, retry=False): @@ -583,11 +587,13 @@ class NonBlockingTcp(PlugIn, IdleObject): errnum = 0 try: + print "==============sock.connect called" self._sock.connect(self._server) self._sock.setblocking(False) except Exception, ee: (errnum, errstr) = ee # in progress, or would block + print "errnum: %s" % errnum if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): self.state = 1 return diff --git a/src/common/xmpp/transports_new.py b/src/common/xmpp/transports_new.py new file mode 100644 index 000000000..36992ba95 --- /dev/null +++ b/src/common/xmpp/transports_new.py @@ -0,0 +1,270 @@ +from idlequeue import IdleObject +from client import PlugIn +import threading, socket, errno + +import logging +log = logging.getLogger('gajim.c.x.transports_nb') +consoleloghandler = logging.StreamHandler() +consoleloghandler.setLevel(logging.DEBUG) +consoleloghandler.setFormatter( + logging.Formatter('%(levelname)s: %(message)s') +) +log.setLevel(logging.DEBUG) +log.addHandler(consoleloghandler) +log.propagate = False + +''' +this module will replace transports_nb.py +For now, it can be run from test/test_nonblockingtcp.py +* set credentials in the testing script +''' + + +class NBgetaddrinfo(threading.Thread): + ''' + Class for nonblocking call of getaddrinfo. Maybe unnecessary. + ''' + def __init__(self, server, on_success, on_failure, timeout_sec): + ''' + Call is started from constructor. It is not needed to hold reference on + created instance. + :param server: tuple (hostname, port) for DNS request + :param on_success: callback for successful DNS request + :param on_failure: called when DNS request couldn't be performed + :param timeout_sec: max seconds to wait for return from getaddrinfo. After + this time, on_failure is called with error message. + ''' + threading.Thread.__init__(self) + self.on_success = on_success + self.on_failure = on_failure + self.server = server + self.lock = threading.Lock() + self.already_called = False + self.timer = threading.Timer(timeout_sec, self.on_timeout) + self.timer.start() + self.start() + + def on_timeout(self): + ''' + Called by timer. Means that getaddrinfo takes too long and will be + interrupted. + ''' + self.do_call(False, 'NBgetaddrinfo timeout while looking up %s:%s' % self.server) + + def do_call(self, success, data): + ''' + Method called either on success and failure. In case of timeout it will be + called twice but only the first (failure) call will be performed. + :param success: True if getaddrinfo returned properly, False if there was an + error or on timeout. + :param data: error message if failure, list of address structures if success + ''' + log.debug('NBgetaddrinfo::do_call(): %s' % repr(data)) + self.timer.cancel() + self.lock.acquire() + if not self.already_called: + self.already_called = True + self.lock.release() + if success: + self.on_success(data) + else: + self.on_failure(data) + return + else: + self.lock.release() + return + + def run(self): + try: + ips = socket.getaddrinfo(self.server[0],self.server[1],socket.AF_UNSPEC, + socket.SOCK_STREAM) + except socket.gaierror, e: + self.do_call(False, 'Lookup failure for %s: %s %s' % + (repr(self.server), e[0], e[1])) + except Exception, e: + self.do_call(False, 'Exception while DNS lookup of %s: %s' % + (repr(e), repr(self.server))) + else: + self.do_call(True, ips) + + + +DISCONNECTED ='DISCONNECTED' +CONNECTING ='CONNECTING' +CONNECTED ='CONNECTED' +DISCONNECTING ='DISCONNECTING' + +CONNECT_TIMEOUT_SECONDS = 5 +'''timeout to connect to the server socket, it doesn't include auth''' + +DISCONNECT_TIMEOUT_SECONDS = 10 +'''how long to wait for a disconnect to complete''' + +class NonBlockingTcp(PlugIn, IdleObject): + def __init__(self, on_xmpp_connect=None, on_xmpp_failure=None): + ''' + Class constructor. All parameters can be reset in tcp_connect or xmpp_connect + calls. + + ''' + PlugIn.__init__(self) + IdleObject.__init__(self) + self.on_tcp_connect = None + self.on_tcp_failure = None + self.sock = None + self.idlequeue = None + self.DBG_LINE='socket' + self.state = DISCONNECTED + ''' + CONNECTING - after non-blocking socket.connect() until TCP connection is estabilished + CONNECTED - after TCP connection is estabilished + DISCONNECTING - + DISCONNECTED + ''' + self._exported_methods=[self.send, self.disconnect, self.onreceive, self.set_send_timeout, + self.start_disconnect, self.set_timeout, self.remove_timeout] + + + def connect(self, conn_5tuple, on_tcp_connect, on_tcp_failure, idlequeue): + ''' + Creates and connects socket to server and port defined in conn_5tupe which + should be list item returned from getaddrinfo. + :param conn_5tuple: 5-tuple returned from getaddrinfo + :param on_tcp_connect: callback called on successful tcp connection + :param on_tcp_failure: callback called on failure when estabilishing tcp + connection + :param idlequeue: idlequeue for socket + ''' + self.on_tcp_connect = on_tcp_connect + self.on_tcp_failure = on_tcp_failure + self.conn_5tuple = conn_5tuple + try: + self.sock = socket.socket(*conn_5tuple[:3]) + except socket.error, (errnum, errstr): + on_tcp_failure('NonBlockingTcp: Error while creating socket: %s %s' % (errnum, errstr)) + return + + self.idlequeue = idlequeue + self.fd = self.sock.fileno() + self.idlequeue.plug_idle(self, True, False) + + errnum = 0 + ''' variable for errno symbol that will be found from exception raised from connect() ''' + + # set timeout for TCP connecting - if nonblocking connect() fails, pollend + # is called. If if succeeds pollout is called. + self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS) + + try: + self.sock.setblocking(False) + self.sock.connect(conn_5tuple[4]) + except Exception, (errnum, errstr): + pass + + if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): + # connecting in progress + self.state = CONNECTING + log.debug('After nonblocking connect. "%s" raised => CONNECTING' % errstr) + # on_tcp_connect/failure will be called from self.pollin/self.pollout + return + elif errnum in (0, 10056, errno.EISCONN): + # already connected - this branch is very unlikely, nonblocking connect() will + # return EINPROGRESS exception in most cases. Anyway, we don't need timeout + # on connected descriptor + log.debug('After nonblocking connect. "%s" raised => CONNECTED' % errstr) + self._on_tcp_connect(self) + return + + # if there was some other error, call failure callback and unplug transport + # which will also remove read_timeouts for descriptor + self._on_tcp_failure('Exception while connecting to %s: %s - %s' % + (conn_5tuple[4], errnum, errstr)) + + def _on_tcp_connect(self, data): + ''' This method preceeds actual call of on_tcp_connect callback + ''' + self.state = CONNECTED + self.idlequeue.remove_timeout(self.fd) + self.on_tcp_connect(data) + + + def _on_tcp_failure(self,err_msg): + ''' This method preceeds actual call of on_tcp_failure callback + ''' + self.state = DISCONNECTED + self.idlequeue.unplug_idle(self.fd) + self.on_tcp_failure(err_msg) + + def pollin(self): + '''called when receive on plugged socket is possible ''' + log.debug('pollin called, state == %s' % self.state) + + def pollout(self): + '''called when send to plugged socket is possible''' + log.debug('pollout called, state == %s' % self.state) + + if self.state==CONNECTING: + self._on_tcp_connect(self) + return + + def pollend(self): + '''called when remote site closed connection''' + log.debug('pollend called, state == %s' % self.state) + if self.state==CONNECTING: + self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4]) + + def read_timeout(self): + ''' + Implemntation of IdleObject function called on timeouts from IdleQueue. + ''' + log.debug('read_timeout called, state == %s' % self.state) + if self.state==CONNECTING: + # if read_timeout is called during connecting, connect() didn't end yet + # thus we have to close the socket + try: + self.sock.close() + except socket.error, (errnum, errmsg): + log.error('Error while closing socket on connection timeout: %s %s' + % (errnum, errmsg)) + self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4]) + + + + def disconnect(self, on_disconnect=None): + if self.state == DISCONNECTED: + return + self.idlequeue.unplug_idle(self.fd) + try: + self.sock.shutdown(socket.SHUT_RDWR) + except socket.error, (errnum, errstr): + log.error('Error while disconnecting: %s %s' % (errnum,errstr)) + + try: + self.sock.close() + except socket.error, (errnum, errmsg): + log.error('Error closing socket: %s %s' % (errnum,errstr)) + if on_disconnect: + on_disconnect() + + + + + + + def send(self, data, now=False): + pass + + def onreceive(self): + pass + + def set_send_timeout(self): + pass + + def set_timeout(self): + pass + + def remove_timeout(self): + pass + + def start_disconnect(self): + pass diff --git a/test/test_client_nb.py b/test/test_client_nb.py index 768c19096..3ddc75eef 100644 --- a/test/test_client_nb.py +++ b/test/test_client_nb.py @@ -1,13 +1,13 @@ -import unittest, threading -from mock import Mock +import unittest +from xmpp_mocks import * -import sys, time, os.path +import sys, os.path gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..') sys.path.append(gajim_root + '/src/common/xmpp') -import client_nb, idlequeue +import client_nb ''' Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py) @@ -15,10 +15,6 @@ It actually connects to a xmpp server so the connection values have to be changed before running. ''' -idlequeue_interval = 0.2 -''' -IdleQueue polling interval. 200ms is used in Gajim as default -''' xmpp_server_port = ('xmpp.example.org',5222) ''' @@ -26,111 +22,12 @@ xmpp_server_port = ('xmpp.example.org',5222) Script will connect to the machine. ''' -credentials = ['primus', 'l0v3', 'testclient'] +credentials = ['login', 'pass', 'testclient'] ''' [username, password, passphrase] Script will autheticate itself with this credentials on above mentioned server. ''' - -class MockConnectionClass(Mock): - ''' - Class simulating Connection class from src/common/connection.py - - It is derived from Mock in order to avoid defining all methods - from real Connection that are called from NBClient or Dispatcher - ( _event_dispatcher for example) - ''' - - def __init__(self, *args): - self.event = threading.Event() - ''' - is used for waiting on connect, auth and disconnect callbacks - ''' - - self.event.clear() - Mock.__init__(self, *args) - - def on_connect(self, *args): - ''' - Method called on succesful connecting - after receiving - from server (NOT after TLS stream restart). - ''' - - #print 'on_connect - args:' - #for i in args: - # print ' %s' % i - self.connect_failed = False - self.event.set() - - def on_connect_failure(self, *args): - ''' - Method called on failure while connecting - on everything from TCP error - to error during TLS handshake - ''' - - #print 'on_connect failure - args:' - #for i in args: - # print ' %s' % i - self.connect_failed = True - self.event.set() - - def on_auth(self, con, auth): - ''' - Method called after authentication is done regardless on the result. - - :Parameters: - con : NonBlockingClient - reference to authenticated object - auth : string - type of authetication in case of success ('old_auth', 'sasl') or - None in case of auth failure - ''' - - #print 'on_auth - args:' - #print ' con: %s' % con - #print ' auth: %s' % auth - self.auth_connection = con - self.auth = auth - self.event.set() - - def wait(self): - ''' - Waiting until some callback sets the event and clearing the event subsequently. - ''' - - self.event.wait() - self.event.clear() - - - - -class IdleQueueThread(threading.Thread): - ''' - Thread for regular processing of idlequeue. - ''' - def __init__(self): - self.iq = idlequeue.IdleQueue() - self.stop = threading.Event() - ''' - Event used to stopping the thread main loop. - ''' - - self.stop.clear() - threading.Thread.__init__(self) - - def run(self): - while not self.stop.isSet(): - self.iq.process() - time.sleep(idlequeue_interval) - self.iq.process() - - def stop_thread(self): - self.stop.set() - - - - class TestNonBlockingClient(unittest.TestCase): ''' Test Cases class for NonBlockingClient. @@ -147,8 +44,8 @@ class TestNonBlockingClient(unittest.TestCase): self.client = client_nb.NonBlockingClient( server=xmpp_server_port[0], port=xmpp_server_port[1], - on_connect=self.connection.on_connect, - on_connect_failure=self.connection.on_connect_failure, + on_connect=lambda *args: self.connection.on_connect(True, *args), + on_connect_failure=lambda *args: self.connection.on_connect(False, *args), caller=self.connection ) ''' @@ -180,10 +77,10 @@ class TestNonBlockingClient(unittest.TestCase): self.connection.wait() # if on_connect was called, client has to be connected and vice versa - if self.connection.connect_failed: - self.assert_(not self.client.isConnected()) - else: + if self.connection.connect_succeeded: self.assert_(self.client.isConnected()) + else: + self.assert_(not self.client.isConnected()) def client_auth(self, username, password, resource, sasl): ''' @@ -203,7 +100,7 @@ class TestNonBlockingClient(unittest.TestCase): ''' Does disconnecting of connected client. Returns when TCP connection is closed. ''' - self.client.start_disconnect(None, on_disconnect=self.connection.event.set) + self.client.start_disconnect(None, on_disconnect=self.connection.set_event) print 'waiting for disconnecting...' self.connection.wait() @@ -260,7 +157,7 @@ class TestNonBlockingClient(unittest.TestCase): ''' self.open_stream(xmpp_server_port) self.assert_(self.client.isConnected()) - self.client_auth(credentials[0], "wrong pass", credentials[2], sasl=0) + self.client_auth(credentials[0], "wrong pass", credentials[2], sasl=1) self.assert_(self.connection.auth is None) self.do_disconnect() @@ -271,7 +168,10 @@ class TestNonBlockingClient(unittest.TestCase): if __name__ == '__main__': - suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient) + #suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient) + suite = unittest.TestSuite() + suite.addTest(TestNonBlockingClient('test_proper_connect_sasl')) + unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/test/test_nonblockingtcp.py b/test/test_nonblockingtcp.py new file mode 100644 index 000000000..cf6d31a2d --- /dev/null +++ b/test/test_nonblockingtcp.py @@ -0,0 +1,119 @@ +''' +Unit test for NonBlockingTcp tranport. +''' + +import unittest +from xmpp_mocks import * + +import threading, sys, os.path, time + +gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..') + +sys.path.append(gajim_root + '/src/common/xmpp') +sys.path.append(gajim_root + '/src/common') + +import transports_new, debug +from client import * + +xmpp_server = ('xmpp.example.org',5222) +''' +2-tuple - (XMPP server hostname, c2s port) +Script will connect to the machine. +''' + +dns_timeout = 10 +''' +timeout for DNS A-request (for getaddrinfo() call) +''' + +class MockClient(IdleMock): + def __init__(self, server, port): + self.debug_flags=['all', 'nodebuilder'] + self._DEBUG = debug.Debug(['socket']) + self.DEBUG = self._DEBUG.Show + self.server = server + self.port = port + IdleMock.__init__(self) + self.tcp_connected = False + self.ip_addresses = [] + self.socket = None + + def do_dns_request(self): + transports_new.NBgetaddrinfo( + server=(self.server, self.port), + on_success=lambda *args:self.on_success('DNSrequest', *args), + on_failure=self.on_failure, + timeout_sec=dns_timeout + ) + self.wait() + + + def try_next_ip(self, err_message=None): + if err_message: + print err_message + if self.ip_addresses == []: + self.on_failure('Run out of hosts') + return + current_ip = self.ip_addresses.pop(0) + self.NonBlockingTcp.connect( + conn_5tuple=current_ip, + on_tcp_connect=lambda *args: self.on_success('TCPconnect',*args), + on_tcp_failure=self.try_next_ip, + idlequeue=self.idlequeue + ) + self.wait() + + + def set_idlequeue(self, idlequeue): + self.idlequeue=idlequeue + + def on_failure(self, data): + print 'Error: %s' % data + self.set_event() + + def on_success(self, mode, data): + if mode == "DNSrequest": + self.ip_addresses = data + elif mode == "TCPconnect": + pass + self.set_event() + + + + + + + + +class TestNonBlockingTcp(unittest.TestCase): + def setUp(self): + self.nbtcp = transports_new.NonBlockingTcp() + self.client = MockClient(*xmpp_server) + self.idlequeue_thread = IdleQueueThread() + self.idlequeue_thread.start() + self.client.set_idlequeue(self.idlequeue_thread.iq) + self.nbtcp.PlugIn(self.client) + + def tearDown(self): + self.idlequeue_thread.stop_thread() + self.idlequeue_thread.join() + + + def testSth(self): + self.client.do_dns_request() + if self.client.ip_addresses == []: + print 'No IP found for given hostname: %s' % self.client.server + return + else: + self.client.try_next_ip() + + + + + + +if __name__ == '__main__': + + suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTcp) + unittest.TextTestRunner(verbosity=2).run(suite) + diff --git a/test/xmpp_mocks.py b/test/xmpp_mocks.py new file mode 100644 index 000000000..47d5b0c9f --- /dev/null +++ b/test/xmpp_mocks.py @@ -0,0 +1,120 @@ +''' +Module with dummy classes for unit testing of xmpp code (src/common/xmpp/*). +''' + +import threading, time, os.path, sys + +from mock import Mock + +gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..') + +sys.path.append(gajim_root + '/src/common/xmpp') + +''' +Module with classes usable for XMPP related testing. +''' + +import idlequeue +from client import PlugIn + +idlequeue_interval = 0.2 +''' +IdleQueue polling interval. 200ms is used in Gajim as default +''' + + +class IdleQueueThread(threading.Thread): + ''' + Thread for regular processing of idlequeue. + ''' + def __init__(self): + self.iq = idlequeue.IdleQueue() + self.stop = threading.Event() + ''' + Event used to stopping the thread main loop. + ''' + + self.stop.clear() + threading.Thread.__init__(self) + + def run(self): + while not self.stop.isSet(): + self.iq.process() + time.sleep(idlequeue_interval) + + def stop_thread(self): + self.stop.set() + + + + +class IdleMock: + ''' + Serves as template for testing objects that are normally controlled by GUI. + Allows to wait for asynchronous callbacks with wait() method. + ''' + def __init__(self): + self.event = threading.Event() + ''' + Event is used for waiting on callbacks. + ''' + self.event.clear() + + + def wait(self): + ''' + Waiting until some callback sets the event and clearing the event subsequently. + ''' + self.event.wait() + self.event.clear() + + def set_event(self): + self.event.set() + + +class MockConnectionClass(IdleMock,Mock): + ''' + Class simulating Connection class from src/common/connection.py + + It is derived from Mock in order to avoid defining all methods + from real Connection that are called from NBClient or Dispatcher + ( _event_dispatcher for example) + ''' + + def __init__(self, *args): + self.connect_succeeded = True + IdleMock.__init__(self) + Mock.__init__(self, *args) + + def on_connect(self, success, *args): + ''' + Method called after connecting - after receiving + from server (NOT after TLS stream restart) or connect failure + ''' + + print 'on_connect - args:' + print ' success - %s' % success + for i in args: + print ' %s' % i + self.connect_succeeded = success + self.set_event() + + def on_auth(self, con, auth): + ''' + Method called after authentication is done regardless on the result. + + :Parameters: + con : NonBlockingClient + reference to authenticated object + auth : string + type of authetication in case of success ('old_auth', 'sasl') or + None in case of auth failure + ''' + + #print 'on_auth - args:' + #print ' con: %s' % con + #print ' auth: %s' % auth + self.auth_connection = con + self.auth = auth + self.set_event() +