''' Tests for dispatcher_nb.py ''' import unittest import lib lib.setup_env() from mock import Mock from common.xmpp import dispatcher_nb from common.xmpp import protocol class TestDispatcherNB(unittest.TestCase): ''' Test class for NonBlocking dispatcher. Tested dispatcher will be plugged into a mock client ''' def setUp(self): self.dispatcher = dispatcher_nb.XMPPDispatcher() # Setup mock client self.client = Mock() self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one? self.client._caller = Mock() self.client.defaultNamespace = protocol.NS_CLIENT self.client.Connection = Mock() # mock transport self.con = self.client.Connection def tearDown(self): # Unplug if needed if hasattr(self.dispatcher, '_owner'): self.dispatcher.PlugOut() def _simulate_connect(self): self.dispatcher.PlugIn(self.client) # client is owner # Simulate that we have established a connection self.dispatcher.StreamInit() self.dispatcher.ProcessNonBlocking("") def test_unbound_namespace_prefix(self): '''tests our handling of a message with an unbound namespace prefix''' self._simulate_connect() msgs = [] def _got_message(conn, msg): msgs.append(msg) self.dispatcher.RegisterHandler('message', _got_message) # should be able to parse a normal message self.dispatcher.ProcessNonBlocking('hello') self.assertEqual(1, len(msgs)) self.dispatcher.ProcessNonBlocking('') self.assertEqual(2, len(msgs)) # we should not have been disconnected after that message self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend'))) self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect'))) # we should be able to keep parsing self.dispatcher.ProcessNonBlocking('still here?') self.assertEqual(3, len(msgs)) def test_process_non_blocking(self): ''' Check for ProcessNonBlocking return types ''' self._simulate_connect() process = self.dispatcher.ProcessNonBlocking # length of data expected data = "Please don't fail" result = process(data) self.assertEqual(result, len(data)) # no data processed, link shall still be active result = process('') self.assertEqual(result, '0') self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) + len(self.con.mockGetNamedCalls('disconnect'))) # simulate disconnect result = process('') self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect'))) def test_return_stanza_handler(self): ''' Test sasl_error_conditions transformation in protocol.py ''' # quick'n dirty...I wasn't aware of it existance and thought it would # always fail :-) self._simulate_connect() stanza = "" def send(data): self.assertEqual(str(data), 'The feature requested is not implemented by the recipient or server and therefore cannot be processed.') self.client.send = send self.dispatcher.ProcessNonBlocking(stanza) if __name__ == '__main__': unittest.main()