99 lines
3.3 KiB
Python
99 lines
3.3 KiB
Python
'''
|
|
Tests for dispatcher_nb.py
|
|
'''
|
|
import unittest
|
|
|
|
import lib
|
|
lib.setup_env()
|
|
|
|
from mock import Mock
|
|
|
|
from common.xmpp import dispatcher_nb
|
|
from common.xmpp import protocol
|
|
|
|
class TestDispatcherNB(unittest.TestCase):
|
|
'''
|
|
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
|
|
into a mock client
|
|
'''
|
|
def setUp(self):
|
|
self.dispatcher = dispatcher_nb.XMPPDispatcher()
|
|
|
|
# Setup mock client
|
|
self.client = Mock()
|
|
self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
|
|
self.client._caller = Mock()
|
|
self.client.defaultNamespace = protocol.NS_CLIENT
|
|
self.client.Connection = Mock() # mock transport
|
|
self.con = self.client.Connection
|
|
|
|
def tearDown(self):
|
|
# Unplug if needed
|
|
if hasattr(self.dispatcher, '_owner'):
|
|
self.dispatcher.PlugOut()
|
|
|
|
def _simulate_connect(self):
|
|
self.dispatcher.PlugIn(self.client) # client is owner
|
|
# Simulate that we have established a connection
|
|
self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
|
|
|
|
def test_unbound_namespace_prefix(self):
|
|
'''tests our handling of a message with an unbound namespace prefix'''
|
|
self._simulate_connect()
|
|
|
|
msgs = []
|
|
def _got_message(conn, msg):
|
|
msgs.append(msg)
|
|
self.dispatcher.RegisterHandler('message', _got_message)
|
|
|
|
# should be able to parse a normal message
|
|
self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
|
|
self.assertEqual(1, len(msgs))
|
|
|
|
self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
|
|
self.assertEqual(2, len(msgs))
|
|
# we should not have been disconnected after that message
|
|
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
|
|
self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
|
|
|
|
# we should be able to keep parsing
|
|
self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
|
|
self.assertEqual(3, len(msgs))
|
|
|
|
def test_process_non_blocking(self):
|
|
''' Check for ProcessNonBlocking return types '''
|
|
self._simulate_connect()
|
|
process = self.dispatcher.ProcessNonBlocking
|
|
|
|
# length of data expected
|
|
data = "Please don't fail"
|
|
result = process(data)
|
|
self.assertEqual(result, len(data))
|
|
|
|
# no data processed, link shall still be active
|
|
result = process('')
|
|
self.assertEqual(result, '0')
|
|
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
|
|
len(self.con.mockGetNamedCalls('disconnect')))
|
|
|
|
# simulate disconnect
|
|
result = process('</stream:stream>')
|
|
self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
|
|
|
|
def test_return_stanza_handler(self):
|
|
''' Test sasl_error_conditions transformation in protocol.py '''
|
|
# quick'n dirty...I wasn't aware of it existance and thought it would
|
|
# always fail :-)
|
|
self._simulate_connect()
|
|
stanza = "<iq type='get' />"
|
|
def send(data):
|
|
self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
|
|
self.client.send = send
|
|
self.dispatcher.ProcessNonBlocking(stanza)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|
|
# vim: se ts=3:
|