97 lines
		
	
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			97 lines
		
	
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
'''
 | 
						|
Tests for dispatcher_nb.py
 | 
						|
'''
 | 
						|
import unittest
 | 
						|
 | 
						|
import lib
 | 
						|
lib.setup_env()
 | 
						|
 | 
						|
from mock import Mock
 | 
						|
 | 
						|
from common.xmpp import dispatcher_nb
 | 
						|
from common.xmpp import protocol
 | 
						|
 | 
						|
class TestDispatcherNB(unittest.TestCase):
 | 
						|
    '''
 | 
						|
    Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
 | 
						|
    into a mock client
 | 
						|
    '''
 | 
						|
    def setUp(self):
 | 
						|
        self.dispatcher = dispatcher_nb.XMPPDispatcher()
 | 
						|
 | 
						|
        # Setup mock client
 | 
						|
        self.client = Mock()
 | 
						|
        self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
 | 
						|
        self.client._caller = Mock()
 | 
						|
        self.client.defaultNamespace = protocol.NS_CLIENT
 | 
						|
        self.client.Connection = Mock() # mock transport
 | 
						|
        self.con = self.client.Connection
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        # Unplug if needed
 | 
						|
        if hasattr(self.dispatcher, '_owner'):
 | 
						|
            self.dispatcher.PlugOut()
 | 
						|
 | 
						|
    def _simulate_connect(self):
 | 
						|
        self.dispatcher.PlugIn(self.client) # client is owner
 | 
						|
        # Simulate that we have established a connection
 | 
						|
        self.dispatcher.StreamInit()
 | 
						|
        self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
 | 
						|
 | 
						|
    def test_unbound_namespace_prefix(self):
 | 
						|
        '''tests our handling of a message with an unbound namespace prefix'''
 | 
						|
        self._simulate_connect()
 | 
						|
 | 
						|
        msgs = []
 | 
						|
        def _got_message(conn, msg):
 | 
						|
            msgs.append(msg)
 | 
						|
        self.dispatcher.RegisterHandler('message', _got_message)
 | 
						|
 | 
						|
        # should be able to parse a normal message
 | 
						|
        self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
 | 
						|
        self.assertEqual(1, len(msgs))
 | 
						|
 | 
						|
        self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
 | 
						|
        self.assertEqual(2, len(msgs))
 | 
						|
        # we should not have been disconnected after that message
 | 
						|
        self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
 | 
						|
        self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
 | 
						|
 | 
						|
        # we should be able to keep parsing
 | 
						|
        self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
 | 
						|
        self.assertEqual(3, len(msgs))
 | 
						|
 | 
						|
    def test_process_non_blocking(self):
 | 
						|
        ''' Check for ProcessNonBlocking return types '''
 | 
						|
        self._simulate_connect()
 | 
						|
        process = self.dispatcher.ProcessNonBlocking
 | 
						|
 | 
						|
        # length of data expected
 | 
						|
        data = "Please don't fail"
 | 
						|
        result = process(data)
 | 
						|
        self.assertEqual(result, len(data))
 | 
						|
 | 
						|
        # no data processed, link shall still be active
 | 
						|
        result = process('')
 | 
						|
        self.assertEqual(result, '0')
 | 
						|
        self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
 | 
						|
                len(self.con.mockGetNamedCalls('disconnect')))
 | 
						|
 | 
						|
        # simulate disconnect
 | 
						|
        result = process('</stream:stream>')
 | 
						|
        self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
 | 
						|
 | 
						|
    def test_return_stanza_handler(self):
 | 
						|
        ''' Test sasl_error_conditions transformation in protocol.py '''
 | 
						|
        # quick'n dirty...I wasn't aware of it existance and thought it would
 | 
						|
        # always fail :-)
 | 
						|
        self._simulate_connect()
 | 
						|
        stanza = "<iq type='get' />"
 | 
						|
        def send(data):
 | 
						|
            self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
 | 
						|
        self.client.send = send
 | 
						|
        self.dispatcher.ProcessNonBlocking(stanza)
 | 
						|
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main()
 |