Improve code coverage of our testsuites and do some refactoring.

* resolver does not depend on GTK anymore
 * renamed a few modules for consistency
 * moved all mocks to lib/
 * let client_nb test work again. Was broken here

There are many failing tests, help appreciated :-)
This commit is contained in:
Stephan Erb 2009-01-11 13:49:03 +00:00
parent 8a19e11bee
commit a757177e45
15 changed files with 536 additions and 254 deletions

View File

@ -1,5 +1,5 @@
import sys
import os.path
import os
import getopt
use_x = True
@ -12,7 +12,8 @@ for o, a in opts:
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')
# look for modules in the CWD, then gajim/test/lib, then gajim/src, then everywhere else
# look for modules in the CWD, then gajim/test/lib, then gajim/src,
# then everywhere else
sys.path.insert(1, gajim_root + '/src')
sys.path.insert(1, gajim_root + '/test/lib')
@ -23,8 +24,6 @@ configdir = gajim_root + '/test/tmp'
import __builtin__
__builtin__._ = lambda x: x
import os
def setup_env():
# wipe config directory
if os.path.isdir(configdir):
@ -40,6 +39,9 @@ def setup_env():
# for some reason common.gajim needs to be imported before xmpppy?
from common import gajim
import logging
logging.basicConfig()
gajim.DATA_DIR = gajim_root + '/data'
gajim.use_x = use_x

View File

@ -52,6 +52,7 @@ contacts[account3] = {
# 'ask': None, 'groups': [], 'name': None,
# 'resources': {}, 'subscription': u'both'}
}
# We have contacts that are not in roster but only specified in the metadata
metacontact_data = [
[{'account': account3,
@ -75,3 +76,4 @@ metacontact_data = [
'order': 0}]
]
# vim: se ts=3:

View File

@ -1,6 +1,8 @@
# gajim-specific mock objects
from mock import Mock
'''
Module with dummy classes for Gajim specific unit testing
'''
from mock import Mock
from common import gajim
from common.connection_handlers import ConnectionHandlersBase

View File

@ -2,25 +2,15 @@
Module with dummy classes for unit testing of XMPP and related code.
'''
import threading, time, os.path, sys
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(gajim_root + '/src/common/xmpp')
import idlequeue
from client import PlugIn
import lib
lib.setup_env()
import threading, time
from mock import Mock
from common.xmpp import idlequeue
from common.xmpp.plugin import PlugIn
idlequeue_interval = 0.2
'''
IdleQueue polling interval. 200ms is used in Gajim as default
'''
IDLEQUEUE_INTERVAL = 0.2 # polling interval. 200ms is used in Gajim as default
IDLEMOCK_TIMEOUT = 30 # how long we wait for an event
class IdleQueueThread(threading.Thread):
'''
@ -28,18 +18,14 @@ class IdleQueueThread(threading.Thread):
'''
def __init__(self):
self.iq = idlequeue.IdleQueue()
self.stop = threading.Event()
'''
Event used to stopping the thread main loop.
'''
self.stop = threading.Event() # Event to stop the thread main loop.
self.stop.clear()
threading.Thread.__init__(self)
def run(self):
while not self.stop.isSet():
self.iq.process()
time.sleep(idlequeue_interval)
time.sleep(IDLEQUEUE_INTERVAL)
def stop_thread(self):
self.stop.set()
@ -51,25 +37,27 @@ class IdleMock:
Allows to wait for asynchronous callbacks with wait() method.
'''
def __init__(self):
self.event = threading.Event()
'''
Event is used for waiting on callbacks.
'''
self.event.clear()
self._event = threading.Event()
self._event.clear()
def wait(self):
'''
Waiting until some callback sets the event and clearing the event
Block until some callback sets the event and clearing the event
subsequently.
Returns True if event was set, False on timeout
'''
self.event.wait()
self.event.clear()
self._event.wait(IDLEMOCK_TIMEOUT)
if self._event.is_set():
self._event.clear()
return True
else:
return False
def set_event(self):
self.event.set()
self._event.set()
class MockConnectionClass(IdleMock, Mock):
class MockConnection(IdleMock, Mock):
'''
Class simulating Connection class from src/common/connection.py
@ -88,12 +76,9 @@ class MockConnectionClass(IdleMock, Mock):
Method called after connecting - after receiving <stream:features>
from server (NOT after TLS stream restart) or connect failure
'''
#print 'on_connect - args:'
#print ' success - %s' % success
#for i in args:
# print ' %s' % i
self.connect_succeeded = success
self.set_event()
def on_auth(self, con, auth):
'''
@ -106,9 +91,6 @@ class MockConnectionClass(IdleMock, Mock):
type of authetication in case of success ('old_auth', 'sasl') or
None in case of auth failure
'''
#print 'on_auth - args:'
#print ' con: %s' % con
#print ' auth: %s' % auth
self.auth_connection = con
self.auth = auth
self.set_event()

View File

@ -35,16 +35,18 @@ for o, a in opts:
sys.exit(2)
# new test modules need to be added manually
#modules = ( 'test_xmpp_dispatcher_nb',
# 'test_xmpp_client_nb',
# 'test_xmpp_transports_nb',
# 'test_resolver',
# 'test_sessions',
# 'test_caps',
# )
modules = ()
if use_x:
modules += ('test_misc_interface',
'test_roster',
'test_sessions',
'test_resolver',
'test_caps',
'test_dispatcher_nb',
'test_nonblockingtcp',
)
nb_errors = 0

View File

@ -1,4 +1,6 @@
# tests for capabilities and the capabilities cache
'''
Tests for capabilities and the capabilities cache
'''
import unittest
import lib

View File

@ -1,55 +0,0 @@
# tests for xmpppy's dispatcher_nb.py
import unittest
import lib
lib.setup_env()
from mock import Mock
from common.xmpp import dispatcher_nb
from common.xmpp import auth_nb
class TestDispatcherNB(unittest.TestCase):
def test_unbound_namespace_prefix(self):
'''tests our handling of a message with an unbound namespace prefix'''
d = dispatcher_nb.XMPPDispatcher()
conn = Mock()
owner = Mock()
owner._caller = Mock()
owner.defaultNamespace = auth_nb.NS_CLIENT
owner.debug_flags = []
owner.Connection = conn
owner._component = False
d._owner = owner
d.plugin(owner)
msgs = []
def _got_message(conn, msg):
msgs.append(msg)
d.RegisterHandler('message', _got_message)
d.StreamInit()
d.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
# should be able to parse a normal message
d.ProcessNonBlocking('<message><body>hello</body></message>')
self.assertEqual(1, len(msgs))
d.ProcessNonBlocking('<message><x:y/></message>')
# we should not have been disconnected after that message
self.assertEqual(0, len(conn.mockGetNamedCalls('pollend')))
# we should be able to keep parsing
d.ProcessNonBlocking('<message><body>still here?</body></message>')
self.assertEqual(3, len(msgs))
if __name__ == '__main__':
unittest.main()
# vim: se ts=3:

View File

@ -1,4 +1,6 @@
# tests for the miscellaneous functions scattered throughout src/gajim.py
'''
Tests for the miscellaneous functions scattered throughout src/gajim.py
'''
import unittest
import lib
@ -7,7 +9,7 @@ lib.setup_env()
from common import gajim
from gajim import Interface
from mocks import *
from gajim_mocks import *
gajim.logger = MockLogger()
Interface()

View File

@ -1,84 +0,0 @@
'''
Unit test for NonBlockingTCP tranport.
'''
import unittest
from xmpp_mocks import IdleQueueThread, IdleMock
import sys, os.path
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(gajim_root + '/src/common/xmpp')
sys.path.append(gajim_root + '/src/common')
import transports_nb
from client import *
xmpp_server = ('gajim.org', 5222)
'''
2-tuple - (XMPP server hostname, c2s port)
Script will connect to the machine.
'''
import socket
ips = socket.getaddrinfo(xmpp_server[0], xmpp_server[1],
socket.AF_UNSPEC,socket.SOCK_STREAM)
# change xmpp_server on real values
ip = ips[0]
class MockClient(IdleMock):
def __init__(self, idlequeue):
self.idlequeue = idlequeue
IdleMock.__init__(self)
def do_connect(self):
self.socket = transports_nb.NonBlockingTCP(
lambda(event_type, data): sys.stdout.write('raising event %s: %s' % (
event_type, data)), lambda: self.on_success(mode='SocketDisconnect'),
self.idlequeue, False, None)
self.socket.PlugIn(self)
self.socket.connect(conn_5tuple=ip,
on_connect=lambda: self.on_success(mode='TCPconnect'),
on_connect_failure=self.on_failure)
self.wait()
def do_disconnect(self):
self.socket.disconnect()
self.wait()
def on_failure(self, data):
print 'Error: %s' % data
self.set_event()
def on_success(self, mode, data=None):
if mode == "TCPconnect":
pass
if mode == "SocketDisconnect":
pass
self.set_event()
class TestNonBlockingTCP(unittest.TestCase):
def setUp(self):
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
self.client = MockClient(idlequeue=self.idlequeue_thread.iq)
def tearDown(self):
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
def test_connect_disconnect(self):
self.client.do_connect()
self.assert_(self.client.socket.state == 'CONNECTED')
self.client.do_disconnect()
self.assert_(self.client.socket.state == 'DISCONNECTED')
if __name__ == '__main__':
unittest.main()
# vim: se ts=3:

View File

@ -6,12 +6,10 @@ import lib
lib.setup_env()
from common import resolver
from common.xmpp.idlequeue import GlibIdleQueue
from mock import Mock, expectParams
from mocks import *
import gtk
from gajim_mocks import *
from xmpp_mocks import IdleQueueThread
GMAIL_SRV_NAME = '_xmpp-client._tcp.gmail.com'
NONSENSE_NAME = 'sfsdfsdfsdf.sdfs.fsd'
@ -23,21 +21,30 @@ TEST_LIST = [(GMAIL_SRV_NAME, 'srv', True),
(JABBERCZ_SRV_NAME, 'srv', True)]
class TestResolver(unittest.TestCase):
''' Test for LibAsyncNSResolver and NSLookupResolver '''
'''
Test for LibAsyncNSResolver and NSLookupResolver. Requires working
network connection.
'''
def setUp(self):
self.iq = GlibIdleQueue()
self.reset()
self.resolver = None
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
def reset(self):
self.iq = self.idlequeue_thread.iq
self._reset()
self.resolver = None
def tear_down(self):
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
def _reset(self):
self.flag = False
self.expect_results = False
self.nslookup = False
self.resolver = None
def testLibAsyncNSResolver(self):
self.reset()
self._reset()
if not resolver.USE_LIBASYNCNS:
print 'testLibAsyncResolver: libasyncns-python not installed'
return
@ -45,51 +52,49 @@ class TestResolver(unittest.TestCase):
for name, type, expect_results in TEST_LIST:
self.expect_results = expect_results
self.runLANSR(name, type)
self._runLANSR(name, type)
self.flag = False
def runLANSR(self, name, type):
def _runLANSR(self, name, type):
self.resolver.resolve(
host = name,
type = type,
on_ready = self.myonready)
on_ready = self._myonready)
while not self.flag:
time.sleep(1)
self.resolver.process()
def myonready(self, name, result_set):
def _myonready(self, name, result_set):
if __name__ == '__main__':
print 'on_ready called ...'
print 'hostname: %s' % name
print 'result set: %s' % result_set
print 'res.resolved_hosts: %s' % self.resolver.resolved_hosts
from pprint import pprint
pprint('on_ready called ...')
pprint('hostname: %s' % name)
pprint('result set: %s' % result_set)
pprint('res.resolved_hosts: %s' % self.resolver.resolved_hosts)
pprint('')
if self.expect_results:
self.assert_(len(result_set) > 0)
else:
self.assert_(result_set == [])
self.flag = True
if self.nslookup: self._testNSLR()
if self.nslookup:
self._testNSLR()
def testNSLookupResolver(self):
self.reset()
self._reset()
self.nslookup = True
self.resolver = resolver.NSLookupResolver(self.iq)
self.test_list = TEST_LIST
self._testNSLR()
try:
gtk.main()
except KeyboardInterrupt:
print 'KeyboardInterrupt caught'
def _testNSLR(self):
if self.test_list == []:
gtk.main_quit()
return
name, type, self.expect_results = self.test_list.pop()
self.resolver.resolve(
host = name,
type = type,
on_ready = self.myonready)
on_ready = self._myonready)
if __name__ == '__main__':
unittest.main()

View File

@ -1,14 +1,12 @@
import unittest
import time
import lib
lib.setup_env()
from data import *
from mock import Mock, expectParams
from mocks import *
from gajim_mocks import *
from common import gajim
from common import zeroconf

View File

@ -9,7 +9,7 @@ from common import gajim
from common import xmpp
from mock import Mock, expectParams
from mocks import *
from gajim_mocks import *
from common.stanza_session import StanzaSession
@ -17,6 +17,7 @@ from common.stanza_session import StanzaSession
account_name = 'test'
class TestStanzaSession(unittest.TestCase):
''' Testclass for common/stanzasession.py '''
def setUp(self):
self.jid = 'test@example.org/Gajim'
self.conn = MockConnection(account_name, {'send_stanza': None})
@ -74,6 +75,7 @@ gajim.interface = MockInterface()
import notify
class TestChatControlSession(unittest.TestCase):
''' Testclass for session.py '''
def setUp(self):
self.jid = 'test@example.org/Gajim'
self.conn = MockConnection(account_name, {'send_stanza': None})

View File

@ -1,24 +1,24 @@
'''
Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)
It actually connects to a xmpp server so the connection values have to be
changed before running.
It actually connects to a xmpp server.
'''
import unittest
from xmpp_mocks import MockConnectionClass, IdleQueueThread
import sys, os.path
import lib
lib.setup_env()
gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(gajim_root + '/src/common/xmpp')
from xmpp_mocks import MockConnection, IdleQueueThread
from mock import Mock
from common.xmpp import client_nb
import client_nb
# (XMPP server hostname, c2s port). Script will connect to the machine.
xmpp_server_port = ('gajim.org', 5222)
# [username, password, passphrase]. Script will authenticate to server above
credentials = ['loginn', 'passwo', 'testresour']
credentials = ['unittest', 'testtest', 'res']
class TestNonBlockingClient(unittest.TestCase):
'''
@ -27,7 +27,7 @@ class TestNonBlockingClient(unittest.TestCase):
def setUp(self):
''' IdleQueue thread is run and dummy connection is created. '''
self.idlequeue_thread = IdleQueueThread()
self.connection = MockConnectionClass()
self.connection = MockConnection() # for dummy callbacks
self.idlequeue_thread.start()
def tearDown(self):
@ -35,6 +35,8 @@ class TestNonBlockingClient(unittest.TestCase):
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
self.client = None
def open_stream(self, server_port):
'''
Method opening the XMPP connection. It returns when <stream:features>
@ -43,13 +45,11 @@ class TestNonBlockingClient(unittest.TestCase):
:param server_port: tuple of (hostname, port) for where the client should
connect.
'''
self.client = client_nb.NonBlockingClient(
domain=server_port[0],
idlequeue=self.idlequeue_thread.iq)
'''
NonBlockingClient instance with parameters from global variables and with
callbacks from dummy connection.
'''
idlequeue=self.idlequeue_thread.iq,
caller=Mock())
self.client.connect(
hostname=server_port[0],
@ -58,8 +58,8 @@ class TestNonBlockingClient(unittest.TestCase):
on_connect_failure=lambda *args: self.connection.on_connect(
False, *args))
print 'waiting for callback from client constructor'
self.connection.wait()
self.assert_(self.connection.wait(),
msg='waiting for callback from client constructor')
# if on_connect was called, client has to be connected and vice versa
if self.connection.connect_succeeded:
@ -71,26 +71,26 @@ class TestNonBlockingClient(unittest.TestCase):
'''
Method authenticating connected client with supplied credentials. Returns
when authentication is over.
:param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
:todo: to check and be more specific about when it returns (bind, session..)
:param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
:todo: to check and be more specific about when it returns
(bind, session..)
'''
self.client.auth(username, password, resource, sasl,
on_auth=self.connection.on_auth)
print 'waiting for authentication...'
self.connection.wait()
self.assert_(self.connection.wait(), msg='waiting for authentication')
def do_disconnect(self):
'''
Does disconnecting of connected client. Returns when TCP connection is closed.
Does disconnecting of connected client. Returns when TCP connection is
closed.
'''
#self.client.start_disconnect(None, on_disconnect=self.connection.set_event)
self.client.RegisterDisconnectHandler(self.connection.set_event)
self.client.disconnect()
print 'waiting for disconnecting...'
self.connection.wait()
self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
def test_proper_connect_sasl(self):
'''
@ -101,16 +101,15 @@ class TestNonBlockingClient(unittest.TestCase):
# if client is not connected, lets raise the AssertionError
self.assert_(self.client.get_connect_type())
# (client.disconnect() is already called from NBClient._on_connected_failure
# so there's need to call it in this case
# client.disconnect() is already called from NBClient via
# _on_connected_failure, no need to call it here
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
self.assert_(self.connection.con)
self.assert_(self.connection.auth=='sasl')
self.assert_(self.connection.auth=='sasl', msg="Unable to auth via SASL")
self.do_disconnect()
def test_proper_connect_oldauth(self):
'''
The ideal testcase - client is connected, authenticated with old auth and
@ -120,21 +119,22 @@ class TestNonBlockingClient(unittest.TestCase):
self.assert_(self.client.get_connect_type())
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
self.assert_(self.connection.con)
self.assert_(self.connection.auth=='old_auth')
self.assert_(self.connection.auth=='old_auth',
msg="Unable to auth via old_auth")
self.do_disconnect()
def test_connect_to_nonexisting_host(self):
'''
Connect to nonexisting host. DNS request for A records should return nothing.
Connect to nonexisting host. DNS request for A records should return
nothing.
'''
self.open_stream(('fdsfsdf.fdsf.fss', 5222))
print 'nonexthost: %s' % self.client.get_connect_type()
self.assert_(not self.client.get_connect_type())
def test_connect_to_wrong_port(self):
'''
Connect to nonexisting host. DNS request for A records should return some IP
but there shouldn't be XMPP server running on specified port.
Connect to nonexisting server. DNS request for A records should return an
IP but there shouldn't be XMPP server running on specified port.
'''
self.open_stream((xmpp_server_port[0], 31337))
self.assert_(not self.client.get_connect_type())

View File

@ -0,0 +1,98 @@
'''
Tests for dispatcher_nb.py
'''
import unittest
import lib
lib.setup_env()
from mock import Mock
from common.xmpp import dispatcher_nb
from common.xmpp import protocol
class TestDispatcherNB(unittest.TestCase):
'''
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
into a mock client
'''
def setUp(self):
self.dispatcher = dispatcher_nb.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
self.client._caller = Mock()
self.client.defaultNamespace = protocol.NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
def tearDown(self):
# Unplug if needed
if hasattr(self.dispatcher, '_owner'):
self.dispatcher.PlugOut()
def _simulate_connect(self):
self.dispatcher.PlugIn(self.client) # client is owner
# Simulate that we have established a connection
self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
def test_unbound_namespace_prefix(self):
'''tests our handling of a message with an unbound namespace prefix'''
self._simulate_connect()
msgs = []
def _got_message(conn, msg):
msgs.append(msg)
self.dispatcher.RegisterHandler('message', _got_message)
# should be able to parse a normal message
self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
self.assertEqual(1, len(msgs))
self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
self.assertEqual(2, len(msgs))
# we should not have been disconnected after that message
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
# we should be able to keep parsing
self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
self.assertEqual(3, len(msgs))
def test_process_non_blocking(self):
''' Check for ProcessNonBlocking return types '''
self._simulate_connect()
process = self.dispatcher.ProcessNonBlocking
# length of data expected
data = "Please don't fail"
result = process(data)
self.assertEqual(result, len(data))
# no data processed, link shall still be active
result = process('')
self.assertEqual(result, '0')
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
len(self.con.mockGetNamedCalls('disconnect')))
# simulate disconnect
result = process('</stream:stream>')
self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
def test_return_stanza_handler(self):
''' Test sasl_error_conditions transformation in protocol.py '''
# quick'n dirty...I wasn't aware of it existance and thought it would
# always fail :-)
self._simulate_connect()
stanza = "<iq type='get' />"
def send(data):
self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
self.client.send = send
self.dispatcher.ProcessNonBlocking(stanza)
if __name__ == '__main__':
unittest.main()
# vim: se ts=3:

View File

@ -0,0 +1,324 @@
'''
Unit test for tranports classes.
'''
import unittest
import socket
import lib
lib.setup_env()
from xmpp_mocks import IdleQueueThread, IdleMock
from common.xmpp import transports_nb
class TestModuleLevelFunctions(unittest.TestCase):
'''
Test class for functions defined at module level
'''
def test_urisplit(self):
def check_uri(uri, proto, host, path):
_proto, _host, _path = transports_nb.urisplit(uri)
self.assertEqual(proto, _proto)
self.assertEqual(host, _host)
self.assertEqual(path, _path)
check_uri('http://httpcm.jabber.org/webclient',
proto='http', host='httpcm.jabber.org', path='/webclient')
def test_get_proxy_data_from_dict(self):
def check_dict(proxy_dict, host, port, user, passwd):
_host, _port, _user, _passwd = transports_nb.get_proxy_data_from_dict(
proxy_dict)
self.assertEqual(_host, host)
self.assertEqual(_port, port)
self.assertEqual(_user, user)
self.assertEqual(_passwd, passwd)
bosh_dict = {'bosh_content': u'text/xml; charset=utf-8',
'bosh_hold': 2,
'bosh_http_pipelining': False,
'bosh_port': 5280,
'bosh_uri': u'http://gajim.org:5280/http-bind',
'bosh_useproxy': False,
'bosh_wait': 30,
'bosh_wait_for_restart_response': False,
'host': u'172.16.99.11',
'pass': u'pass',
'port': 3128,
'type': u'bosh',
'useauth': True,
'user': u'user'}
check_dict(bosh_dict, host=u'gajim.org', port=80, user=u'user',
passwd=u'pass')
proxy_dict = {'bosh_content': u'text/xml; charset=utf-8',
'bosh_hold': 2,
'bosh_http_pipelining': False,
'bosh_port': 5280,
'bosh_uri': u'',
'bosh_useproxy': True,
'bosh_wait': 30,
'bosh_wait_for_restart_response': False,
'host': u'172.16.99.11',
'pass': u'pass',
'port': 3128,
'type': 'socks5',
'useauth': True,
'user': u'user'}
check_dict(proxy_dict, host=u'172.16.99.11', port=3128, user=u'user',
passwd=u'pass')
class AbstractTransportTest(unittest.TestCase):
''' Encapsulates Idlequeue instantiation for transports and more...'''
def setUp(self):
''' IdleQueue thread is run and dummy connection is created. '''
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
self._setup_hook()
def tearDown(self):
''' IdleQueue thread is stopped. '''
self._teardown_hook()
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
def _setup_hook(self):
pass
def _teardown_hook(self):
pass
def expect_receive(self, expected, count=1, msg=None):
'''
Returns a callback function that will assert whether the data passed to
it equals the one specified when calling this function.
Can be used to make sure transport dispatch correct data.
'''
def receive(data, *args, **kwargs):
self.assertEqual(data, expected, msg=msg)
self._expected_count -= 1
self._expected_count = count
return receive
def have_received_expected(self):
'''
Plays together with expect_receive(). Will return true if expected_rcv
callback was called as often as specified
'''
return self._expected_count == 0
class TestNonBlockingTCP(AbstractTransportTest):
'''
Test class for NonBlockingTCP. Will actually try to connect to an existing
XMPP server.
'''
class MockClient(IdleMock):
''' Simple client to test transport functionality '''
def __init__(self, idlequeue, testcase):
self.idlequeue = idlequeue
self.testcase = testcase
IdleMock.__init__(self)
def do_connect(self, establish_tls=False, proxy_dict=None):
try:
ips = socket.getaddrinfo('gajim.org', 5222,
socket.AF_UNSPEC,socket.SOCK_STREAM)
ip = ips[0]
except socket.error, e:
self.testcase.fail(msg=str(e))
self.socket = transports_nb.NonBlockingTCP(
raise_event=lambda event_type, data: self.testcase.assertTrue(
event_type and data),
on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
idlequeue=self.idlequeue,
estabilish_tls=establish_tls,
certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
proxy_dict=proxy_dict)
self.socket.PlugIn(self)
self.socket.connect(conn_5tuple=ip,
on_connect=lambda: self.on_success(mode='TCPconnect'),
on_connect_failure=self.on_failure)
self.testcase.assertTrue(self.wait(), msg='Connection timed out')
def do_disconnect(self):
self.socket.disconnect()
self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
def on_failure(self, err_message):
self.set_event()
self.testcase.fail(msg=err_message)
def on_success(self, mode, data=None):
if mode == "TCPconnect":
pass
if mode == "SocketDisconnect":
pass
self.set_event()
def _setup_hook(self):
self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
testcase=self)
def _teardown_hook(self):
if self.client.socket.state == 'CONNECTED':
self.client.do_disconnect()
def test_connect_disconnect_plain(self):
''' Establish plain connection '''
self.client.do_connect(establish_tls=False)
self.assert_(self.client.socket.state == 'CONNECTED')
self.client.do_disconnect()
self.assert_(self.client.socket.state == 'DISCONNECTED')
# FIXME: testcase not working...
#def test_connect_disconnect_ssl(self):
# ''' Establish SSL (not TLS) connection '''
# self.client.do_connect(establish_tls=True)
# self.assert_(self.client.socket.state == 'CONNECTED')
# self.client.do_disconnect()
# self.assert_(self.client.socket.state == 'DISCONNECTED')
def test_do_receive(self):
''' Test _do_receive method by overwriting socket.recv '''
self.client.do_connect()
sock = self.client.socket
# transport shall receive data
data = "Please don't fail"
sock._recv = lambda buffer: data
sock.onreceive(self.expect_receive(data))
sock._do_receive()
self.assertTrue(self.have_received_expected(), msg='Did not receive data')
self.assert_(self.client.socket.state == 'CONNECTED')
# transport shall do nothing as an non-fatal SSL is simulated
sock._recv = lambda buffer: None
sock.onreceive(self.assertFalse) # we did not receive anything...
sock._do_receive()
self.assert_(self.client.socket.state == 'CONNECTED')
# transport shall disconnect as remote side closed the connection
sock._recv = lambda buffer: ''
sock.onreceive(self.assertFalse) # we did not receive anything...
sock._do_receive()
self.assert_(self.client.socket.state == 'DISCONNECTED')
def test_do_send(self):
''' Test _do_send method by overwriting socket.send '''
self.client.do_connect()
sock = self.client.socket
outgoing = [] # what we have actually send to our socket.socket
data_part1 = "Please don't "
data_part2 = "fail!"
data_complete = data_part1 + data_part2
# Simulate everything could be send in one go
def _send_all(data):
outgoing.append(data)
return len(data)
sock._send = _send_all
sock.send(data_part1)
sock.send(data_part2)
sock._do_send()
sock._do_send()
self.assertTrue(self.client.socket.state == 'CONNECTED')
self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
self.assertFalse(sock.sendqueue and sock.sendbuff,
msg='There is still unsend data in buffers')
# Simulate data could only be sent in chunks
self.chunk_count = 0
outgoing = []
def _send_chunks(data):
if self.chunk_count == 0:
outgoing.append(data_part1)
self.chunk_count += 1
return len(data_part1)
else:
outgoing.append(data_part2)
return len(data_part2)
sock._send = _send_chunks
sock.send(data_complete)
sock._do_send() # process first chunk
sock._do_send() # process the second one
self.assertTrue(self.client.socket.state == 'CONNECTED')
self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
self.assertFalse(sock.sendqueue and sock.sendbuff,
msg='There is still unsend data in buffers')
class TestNonBlockingHTTP(AbstractTransportTest):
''' Test class for NonBlockingHTTP transport'''
bosh_http_dict = {
'http_uri': 'http://httpcm.jabber.org/webclient',
'http_port': 1010,
'http_version': 'HTTP/1.1',
'http_persistent': True,
'add_proxy_headers': False
}
def _get_transport(self, http_dict, proxy_dict=None):
return transports_nb.NonBlockingHTTP(
raise_event=None,
on_disconnect=None,
idlequeue=self.idlequeue_thread.iq,
estabilish_tls=False,
certs=None,
on_http_request_possible=lambda: None,
on_persistent_fallback=None,
http_dict=http_dict,
proxy_dict=proxy_dict,
)
def test_parse_own_http_message(self):
''' Build a HTTP message and try to parse it afterwards '''
transport = self._get_transport(self.bosh_http_dict)
data = "<test>Please don't fail!</test>"
http_message = transport.build_http_message(data)
statusline, headers, http_body = transport.parse_http_message(
http_message)
self.assertTrue(statusline and isinstance(statusline, list))
self.assertTrue(headers and isinstance(headers, dict))
self.assertEqual(data, http_body, msg='Input and output are different')
def test_receive_http_message(self):
''' Let _on_receive handle some http messages '''
transport = self._get_transport(self.bosh_http_dict)
header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
"Content-Length: 88\r\n\r\n")
payload = "<test>Please don't fail!</test>"
body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
% payload
message = "%s%s" % (header, body)
# try to receive in one go
transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
transport._on_receive(message)
self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
# try to receive in chunks
chunk1, chunk2, chunk3 = message[:20], message[20:73], message[73:]
nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
chunks = (chunk1, chunk2, chunk3, nextmessage_chunk)
transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
for chunk in chunks:
transport._on_receive(chunk)
self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
if __name__ == '__main__':
unittest.main()
# vim: se ts=3: