added stub for new transports module plus basic test for it, testing code reorganized

This commit is contained in:
tomk 2008-06-18 23:58:19 +00:00
parent 16e274b9ec
commit 65644ca13f
7 changed files with 540 additions and 123 deletions

View File

@ -154,7 +154,7 @@ class Debug:
# If you dont want to validate flags on each call to
# show(), set this to 0
#
validate_flags = 1,
validate_flags = 0,
#
# If you dont want the welcome message, set to 0
# default is to show welcome if any flags are active

View File

@ -53,6 +53,7 @@ class IdleQueue:
self.selector = select.poll()
def remove_timeout(self, fd):
print 'read timeout removed for fd %s' % fd
if self.read_timeouts.has_key(fd):
del(self.read_timeouts[fd])
@ -68,6 +69,7 @@ class IdleQueue:
def set_read_timeout(self, fd, seconds):
''' set a new timeout, if it is not removed after 'seconds',
then obj.read_timeout() will be called '''
print 'read timeout set for fd %s on %s seconds' % (fd, seconds)
timeout = self.current_time() + seconds
self.read_timeouts[fd] = timeout
@ -204,14 +206,14 @@ class SelectIdleQueue(IdleQueue):
except select.error, e:
waiting_descriptors = ((),(),())
if e[0] != 4: # interrupt
raise
for fd in waiting_descriptors[0]:
q = self.queue.get(fd)
if q:
raise
for fd in waiting_descriptors[0]:
q = self.queue.get(fd)
if q:
q.pollin()
for fd in waiting_descriptors[1]:
q = self.queue.get(fd)
if q:
q = self.queue.get(fd)
if q:
q.pollout()
for fd in waiting_descriptors[2]:
self.queue.get(fd).pollend()

View File

@ -337,7 +337,9 @@ class NonBlockingTcp(PlugIn, IdleObject):
self.on_connect_failure()
def _plug_idle(self):
# readable if socket is connected or disconnecting
readable = self.state != 0
# writeable if sth to send
if self.sendqueue or self.sendbuff:
writable = True
else:
@ -346,6 +348,7 @@ class NonBlockingTcp(PlugIn, IdleObject):
self.idlequeue.plug_idle(self, writable, readable)
def pollout(self):
print 'pollout called - send possible'
if self.state == 0:
self.connect_to_next_ip()
return
@ -359,6 +362,7 @@ class NonBlockingTcp(PlugIn, IdleObject):
self._owner = None
def pollin(self):
print 'pollin called - receive possible'
self._do_receive()
def pollend(self, retry=False):
@ -583,11 +587,13 @@ class NonBlockingTcp(PlugIn, IdleObject):
errnum = 0
try:
print "==============sock.connect called"
self._sock.connect(self._server)
self._sock.setblocking(False)
except Exception, ee:
(errnum, errstr) = ee
# in progress, or would block
print "errnum: %s" % errnum
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
self.state = 1
return

View File

@ -0,0 +1,270 @@
from idlequeue import IdleObject
from client import PlugIn
import threading, socket, errno
import logging
log = logging.getLogger('gajim.c.x.transports_nb')
consoleloghandler = logging.StreamHandler()
consoleloghandler.setLevel(logging.DEBUG)
consoleloghandler.setFormatter(
logging.Formatter('%(levelname)s: %(message)s')
)
log.setLevel(logging.DEBUG)
log.addHandler(consoleloghandler)
log.propagate = False
'''
this module will replace transports_nb.py
For now, it can be run from test/test_nonblockingtcp.py
* set credentials in the testing script
'''
class NBgetaddrinfo(threading.Thread):
'''
Class for nonblocking call of getaddrinfo. Maybe unnecessary.
'''
def __init__(self, server, on_success, on_failure, timeout_sec):
'''
Call is started from constructor. It is not needed to hold reference on
created instance.
:param server: tuple (hostname, port) for DNS request
:param on_success: callback for successful DNS request
:param on_failure: called when DNS request couldn't be performed
:param timeout_sec: max seconds to wait for return from getaddrinfo. After
this time, on_failure is called with error message.
'''
threading.Thread.__init__(self)
self.on_success = on_success
self.on_failure = on_failure
self.server = server
self.lock = threading.Lock()
self.already_called = False
self.timer = threading.Timer(timeout_sec, self.on_timeout)
self.timer.start()
self.start()
def on_timeout(self):
'''
Called by timer. Means that getaddrinfo takes too long and will be
interrupted.
'''
self.do_call(False, 'NBgetaddrinfo timeout while looking up %s:%s' % self.server)
def do_call(self, success, data):
'''
Method called either on success and failure. In case of timeout it will be
called twice but only the first (failure) call will be performed.
:param success: True if getaddrinfo returned properly, False if there was an
error or on timeout.
:param data: error message if failure, list of address structures if success
'''
log.debug('NBgetaddrinfo::do_call(): %s' % repr(data))
self.timer.cancel()
self.lock.acquire()
if not self.already_called:
self.already_called = True
self.lock.release()
if success:
self.on_success(data)
else:
self.on_failure(data)
return
else:
self.lock.release()
return
def run(self):
try:
ips = socket.getaddrinfo(self.server[0],self.server[1],socket.AF_UNSPEC,
socket.SOCK_STREAM)
except socket.gaierror, e:
self.do_call(False, 'Lookup failure for %s: %s %s' %
(repr(self.server), e[0], e[1]))
except Exception, e:
self.do_call(False, 'Exception while DNS lookup of %s: %s' %
(repr(e), repr(self.server)))
else:
self.do_call(True, ips)
DISCONNECTED ='DISCONNECTED'
CONNECTING ='CONNECTING'
CONNECTED ='CONNECTED'
DISCONNECTING ='DISCONNECTING'
CONNECT_TIMEOUT_SECONDS = 5
'''timeout to connect to the server socket, it doesn't include auth'''
DISCONNECT_TIMEOUT_SECONDS = 10
'''how long to wait for a disconnect to complete'''
class NonBlockingTcp(PlugIn, IdleObject):
def __init__(self, on_xmpp_connect=None, on_xmpp_failure=None):
'''
Class constructor. All parameters can be reset in tcp_connect or xmpp_connect
calls.
'''
PlugIn.__init__(self)
IdleObject.__init__(self)
self.on_tcp_connect = None
self.on_tcp_failure = None
self.sock = None
self.idlequeue = None
self.DBG_LINE='socket'
self.state = DISCONNECTED
'''
CONNECTING - after non-blocking socket.connect() until TCP connection is estabilished
CONNECTED - after TCP connection is estabilished
DISCONNECTING -
DISCONNECTED
'''
self._exported_methods=[self.send, self.disconnect, self.onreceive, self.set_send_timeout,
self.start_disconnect, self.set_timeout, self.remove_timeout]
def connect(self, conn_5tuple, on_tcp_connect, on_tcp_failure, idlequeue):
'''
Creates and connects socket to server and port defined in conn_5tupe which
should be list item returned from getaddrinfo.
:param conn_5tuple: 5-tuple returned from getaddrinfo
:param on_tcp_connect: callback called on successful tcp connection
:param on_tcp_failure: callback called on failure when estabilishing tcp
connection
:param idlequeue: idlequeue for socket
'''
self.on_tcp_connect = on_tcp_connect
self.on_tcp_failure = on_tcp_failure
self.conn_5tuple = conn_5tuple
try:
self.sock = socket.socket(*conn_5tuple[:3])
except socket.error, (errnum, errstr):
on_tcp_failure('NonBlockingTcp: Error while creating socket: %s %s' % (errnum, errstr))
return
self.idlequeue = idlequeue
self.fd = self.sock.fileno()
self.idlequeue.plug_idle(self, True, False)
errnum = 0
''' variable for errno symbol that will be found from exception raised from connect() '''
# set timeout for TCP connecting - if nonblocking connect() fails, pollend
# is called. If if succeeds pollout is called.
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS)
try:
self.sock.setblocking(False)
self.sock.connect(conn_5tuple[4])
except Exception, (errnum, errstr):
pass
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
# connecting in progress
self.state = CONNECTING
log.debug('After nonblocking connect. "%s" raised => CONNECTING' % errstr)
# on_tcp_connect/failure will be called from self.pollin/self.pollout
return
elif errnum in (0, 10056, errno.EISCONN):
# already connected - this branch is very unlikely, nonblocking connect() will
# return EINPROGRESS exception in most cases. Anyway, we don't need timeout
# on connected descriptor
log.debug('After nonblocking connect. "%s" raised => CONNECTED' % errstr)
self._on_tcp_connect(self)
return
# if there was some other error, call failure callback and unplug transport
# which will also remove read_timeouts for descriptor
self._on_tcp_failure('Exception while connecting to %s: %s - %s' %
(conn_5tuple[4], errnum, errstr))
def _on_tcp_connect(self, data):
''' This method preceeds actual call of on_tcp_connect callback
'''
self.state = CONNECTED
self.idlequeue.remove_timeout(self.fd)
self.on_tcp_connect(data)
def _on_tcp_failure(self,err_msg):
''' This method preceeds actual call of on_tcp_failure callback
'''
self.state = DISCONNECTED
self.idlequeue.unplug_idle(self.fd)
self.on_tcp_failure(err_msg)
def pollin(self):
'''called when receive on plugged socket is possible '''
log.debug('pollin called, state == %s' % self.state)
def pollout(self):
'''called when send to plugged socket is possible'''
log.debug('pollout called, state == %s' % self.state)
if self.state==CONNECTING:
self._on_tcp_connect(self)
return
def pollend(self):
'''called when remote site closed connection'''
log.debug('pollend called, state == %s' % self.state)
if self.state==CONNECTING:
self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4])
def read_timeout(self):
'''
Implemntation of IdleObject function called on timeouts from IdleQueue.
'''
log.debug('read_timeout called, state == %s' % self.state)
if self.state==CONNECTING:
# if read_timeout is called during connecting, connect() didn't end yet
# thus we have to close the socket
try:
self.sock.close()
except socket.error, (errnum, errmsg):
log.error('Error while closing socket on connection timeout: %s %s'
% (errnum, errmsg))
self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4])
def disconnect(self, on_disconnect=None):
if self.state == DISCONNECTED:
return
self.idlequeue.unplug_idle(self.fd)
try:
self.sock.shutdown(socket.SHUT_RDWR)
except socket.error, (errnum, errstr):
log.error('Error while disconnecting: %s %s' % (errnum,errstr))
try:
self.sock.close()
except socket.error, (errnum, errmsg):
log.error('Error closing socket: %s %s' % (errnum,errstr))
if on_disconnect:
on_disconnect()
def send(self, data, now=False):
pass
def onreceive(self):
pass
def set_send_timeout(self):
pass
def set_timeout(self):
pass
def remove_timeout(self):
pass
def start_disconnect(self):
pass

View File

@ -1,13 +1,13 @@
import unittest, threading
from mock import Mock
import unittest
from xmpp_mocks import *
import sys, time, os.path
import sys, os.path
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(gajim_root + '/src/common/xmpp')
import client_nb, idlequeue
import client_nb
'''
Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)
@ -15,10 +15,6 @@ It actually connects to a xmpp server so the connection values have to be
changed before running.
'''
idlequeue_interval = 0.2
'''
IdleQueue polling interval. 200ms is used in Gajim as default
'''
xmpp_server_port = ('xmpp.example.org',5222)
'''
@ -26,111 +22,12 @@ xmpp_server_port = ('xmpp.example.org',5222)
Script will connect to the machine.
'''
credentials = ['primus', 'l0v3', 'testclient']
credentials = ['login', 'pass', 'testclient']
'''
[username, password, passphrase]
Script will autheticate itself with this credentials on above mentioned server.
'''
class MockConnectionClass(Mock):
'''
Class simulating Connection class from src/common/connection.py
It is derived from Mock in order to avoid defining all methods
from real Connection that are called from NBClient or Dispatcher
( _event_dispatcher for example)
'''
def __init__(self, *args):
self.event = threading.Event()
'''
is used for waiting on connect, auth and disconnect callbacks
'''
self.event.clear()
Mock.__init__(self, *args)
def on_connect(self, *args):
'''
Method called on succesful connecting - after receiving <stream:features>
from server (NOT after TLS stream restart).
'''
#print 'on_connect - args:'
#for i in args:
# print ' %s' % i
self.connect_failed = False
self.event.set()
def on_connect_failure(self, *args):
'''
Method called on failure while connecting - on everything from TCP error
to error during TLS handshake
'''
#print 'on_connect failure - args:'
#for i in args:
# print ' %s' % i
self.connect_failed = True
self.event.set()
def on_auth(self, con, auth):
'''
Method called after authentication is done regardless on the result.
:Parameters:
con : NonBlockingClient
reference to authenticated object
auth : string
type of authetication in case of success ('old_auth', 'sasl') or
None in case of auth failure
'''
#print 'on_auth - args:'
#print ' con: %s' % con
#print ' auth: %s' % auth
self.auth_connection = con
self.auth = auth
self.event.set()
def wait(self):
'''
Waiting until some callback sets the event and clearing the event subsequently.
'''
self.event.wait()
self.event.clear()
class IdleQueueThread(threading.Thread):
'''
Thread for regular processing of idlequeue.
'''
def __init__(self):
self.iq = idlequeue.IdleQueue()
self.stop = threading.Event()
'''
Event used to stopping the thread main loop.
'''
self.stop.clear()
threading.Thread.__init__(self)
def run(self):
while not self.stop.isSet():
self.iq.process()
time.sleep(idlequeue_interval)
self.iq.process()
def stop_thread(self):
self.stop.set()
class TestNonBlockingClient(unittest.TestCase):
'''
Test Cases class for NonBlockingClient.
@ -147,8 +44,8 @@ class TestNonBlockingClient(unittest.TestCase):
self.client = client_nb.NonBlockingClient(
server=xmpp_server_port[0],
port=xmpp_server_port[1],
on_connect=self.connection.on_connect,
on_connect_failure=self.connection.on_connect_failure,
on_connect=lambda *args: self.connection.on_connect(True, *args),
on_connect_failure=lambda *args: self.connection.on_connect(False, *args),
caller=self.connection
)
'''
@ -180,10 +77,10 @@ class TestNonBlockingClient(unittest.TestCase):
self.connection.wait()
# if on_connect was called, client has to be connected and vice versa
if self.connection.connect_failed:
self.assert_(not self.client.isConnected())
else:
if self.connection.connect_succeeded:
self.assert_(self.client.isConnected())
else:
self.assert_(not self.client.isConnected())
def client_auth(self, username, password, resource, sasl):
'''
@ -203,7 +100,7 @@ class TestNonBlockingClient(unittest.TestCase):
'''
Does disconnecting of connected client. Returns when TCP connection is closed.
'''
self.client.start_disconnect(None, on_disconnect=self.connection.event.set)
self.client.start_disconnect(None, on_disconnect=self.connection.set_event)
print 'waiting for disconnecting...'
self.connection.wait()
@ -260,7 +157,7 @@ class TestNonBlockingClient(unittest.TestCase):
'''
self.open_stream(xmpp_server_port)
self.assert_(self.client.isConnected())
self.client_auth(credentials[0], "wrong pass", credentials[2], sasl=0)
self.client_auth(credentials[0], "wrong pass", credentials[2], sasl=1)
self.assert_(self.connection.auth is None)
self.do_disconnect()
@ -271,7 +168,10 @@ class TestNonBlockingClient(unittest.TestCase):
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient)
#suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient)
suite = unittest.TestSuite()
suite.addTest(TestNonBlockingClient('test_proper_connect_sasl'))
unittest.TextTestRunner(verbosity=2).run(suite)

119
test/test_nonblockingtcp.py Normal file
View File

@ -0,0 +1,119 @@
'''
Unit test for NonBlockingTcp tranport.
'''
import unittest
from xmpp_mocks import *
import threading, sys, os.path, time
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(gajim_root + '/src/common/xmpp')
sys.path.append(gajim_root + '/src/common')
import transports_new, debug
from client import *
xmpp_server = ('xmpp.example.org',5222)
'''
2-tuple - (XMPP server hostname, c2s port)
Script will connect to the machine.
'''
dns_timeout = 10
'''
timeout for DNS A-request (for getaddrinfo() call)
'''
class MockClient(IdleMock):
def __init__(self, server, port):
self.debug_flags=['all', 'nodebuilder']
self._DEBUG = debug.Debug(['socket'])
self.DEBUG = self._DEBUG.Show
self.server = server
self.port = port
IdleMock.__init__(self)
self.tcp_connected = False
self.ip_addresses = []
self.socket = None
def do_dns_request(self):
transports_new.NBgetaddrinfo(
server=(self.server, self.port),
on_success=lambda *args:self.on_success('DNSrequest', *args),
on_failure=self.on_failure,
timeout_sec=dns_timeout
)
self.wait()
def try_next_ip(self, err_message=None):
if err_message:
print err_message
if self.ip_addresses == []:
self.on_failure('Run out of hosts')
return
current_ip = self.ip_addresses.pop(0)
self.NonBlockingTcp.connect(
conn_5tuple=current_ip,
on_tcp_connect=lambda *args: self.on_success('TCPconnect',*args),
on_tcp_failure=self.try_next_ip,
idlequeue=self.idlequeue
)
self.wait()
def set_idlequeue(self, idlequeue):
self.idlequeue=idlequeue
def on_failure(self, data):
print 'Error: %s' % data
self.set_event()
def on_success(self, mode, data):
if mode == "DNSrequest":
self.ip_addresses = data
elif mode == "TCPconnect":
pass
self.set_event()
class TestNonBlockingTcp(unittest.TestCase):
def setUp(self):
self.nbtcp = transports_new.NonBlockingTcp()
self.client = MockClient(*xmpp_server)
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
self.client.set_idlequeue(self.idlequeue_thread.iq)
self.nbtcp.PlugIn(self.client)
def tearDown(self):
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
def testSth(self):
self.client.do_dns_request()
if self.client.ip_addresses == []:
print 'No IP found for given hostname: %s' % self.client.server
return
else:
self.client.try_next_ip()
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTcp)
unittest.TextTestRunner(verbosity=2).run(suite)

120
test/xmpp_mocks.py Normal file
View File

@ -0,0 +1,120 @@
'''
Module with dummy classes for unit testing of xmpp code (src/common/xmpp/*).
'''
import threading, time, os.path, sys
from mock import Mock
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(gajim_root + '/src/common/xmpp')
'''
Module with classes usable for XMPP related testing.
'''
import idlequeue
from client import PlugIn
idlequeue_interval = 0.2
'''
IdleQueue polling interval. 200ms is used in Gajim as default
'''
class IdleQueueThread(threading.Thread):
'''
Thread for regular processing of idlequeue.
'''
def __init__(self):
self.iq = idlequeue.IdleQueue()
self.stop = threading.Event()
'''
Event used to stopping the thread main loop.
'''
self.stop.clear()
threading.Thread.__init__(self)
def run(self):
while not self.stop.isSet():
self.iq.process()
time.sleep(idlequeue_interval)
def stop_thread(self):
self.stop.set()
class IdleMock:
'''
Serves as template for testing objects that are normally controlled by GUI.
Allows to wait for asynchronous callbacks with wait() method.
'''
def __init__(self):
self.event = threading.Event()
'''
Event is used for waiting on callbacks.
'''
self.event.clear()
def wait(self):
'''
Waiting until some callback sets the event and clearing the event subsequently.
'''
self.event.wait()
self.event.clear()
def set_event(self):
self.event.set()
class MockConnectionClass(IdleMock,Mock):
'''
Class simulating Connection class from src/common/connection.py
It is derived from Mock in order to avoid defining all methods
from real Connection that are called from NBClient or Dispatcher
( _event_dispatcher for example)
'''
def __init__(self, *args):
self.connect_succeeded = True
IdleMock.__init__(self)
Mock.__init__(self, *args)
def on_connect(self, success, *args):
'''
Method called after connecting - after receiving <stream:features>
from server (NOT after TLS stream restart) or connect failure
'''
print 'on_connect - args:'
print ' success - %s' % success
for i in args:
print ' %s' % i
self.connect_succeeded = success
self.set_event()
def on_auth(self, con, auth):
'''
Method called after authentication is done regardless on the result.
:Parameters:
con : NonBlockingClient
reference to authenticated object
auth : string
type of authetication in case of success ('old_auth', 'sasl') or
None in case of auth failure
'''
#print 'on_auth - args:'
#print ' con: %s' % con
#print ' auth: %s' % auth
self.auth_connection = con
self.auth = auth
self.set_event()