99 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			99 lines
		
	
	
	
		
			3.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
'''
Tests for dispatcher_nb.py
'''
import unittest

import lib
# NOTE(review): setup_env() is deliberately called before the imports below;
# presumably it prepares the environment/import path they rely on -- confirm
# in lib before reordering anything here.
lib.setup_env()

from mock import Mock

from common.xmpp import dispatcher_nb
from common.xmpp import protocol
 | |
| 
 | |
class TestDispatcherNB(unittest.TestCase):
	'''
	Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
	into a mock client
	'''

	def setUp(self):
		''' Create a fresh dispatcher and a mock client (owner) with a mock transport '''
		self.dispatcher = dispatcher_nb.XMPPDispatcher()

		# Setup mock client
		self.client = Mock()
		self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
		self.client._caller = Mock()
		self.client.defaultNamespace = protocol.NS_CLIENT
		self.client.Connection = Mock() # mock transport
		# Shortcut to the mock transport, used to inspect recorded calls
		self.con = self.client.Connection

	def tearDown(self):
		''' Unplug the dispatcher if a test plugged it into the client '''
		# '_owner' is only looked up here; tests that never called
		# _simulate_connect() are expected not to have it
		if hasattr(self.dispatcher, '_owner'):
			self.dispatcher.PlugOut()

	def _simulate_connect(self):
		''' Plug the dispatcher into the mock client and feed it a stream header '''
		self.dispatcher.PlugIn(self.client) # client is owner
		# Simulate that we have established a connection
		self.dispatcher.StreamInit()
		self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")

	def test_unbound_namespace_prefix(self):
		'''tests our handling of a message with an unbound namespace prefix'''
		self._simulate_connect()

		msgs = []
		def _got_message(conn, msg):
			msgs.append(msg)
		self.dispatcher.RegisterHandler('message', _got_message)

		# should be able to parse a normal message
		self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
		self.assertEqual(1, len(msgs))

		# 'x' is never declared as a namespace prefix in this stanza
		self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
		self.assertEqual(2, len(msgs))
		# we should not have been disconnected after that message
		self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
		self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))

		# we should be able to keep parsing
		self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
		self.assertEqual(3, len(msgs))

	def test_process_non_blocking(self):
		''' Check for ProcessNonBlocking return types '''
		self._simulate_connect()
		process = self.dispatcher.ProcessNonBlocking

		# length of data expected
		data = "Please don't fail"
		result = process(data)
		self.assertEqual(result, len(data))

		# no data processed, link shall still be active
		# (the expected return value here is the string '0', not the int 0)
		result = process('')
		self.assertEqual(result, '0')
		self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
			len(self.con.mockGetNamedCalls('disconnect')))

		# simulate disconnect; only the side effect on the client is checked
		process('</stream:stream>')
		self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))

	def test_return_stanza_handler(self):
		''' Test sasl_error_conditions transformation in protocol.py '''
		# quick'n dirty...I wasn't aware of its existence and thought it would
		# always fail :-)
		self._simulate_connect()
		stanza = "<iq type='get' />"
		expected = (
			'<iq xmlns="jabber:client" type="error">'
			'<error code="501" type="cancel">'
			'<feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />'
			'<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">'
			'The feature requested is not implemented by the recipient or '
			'server and therefore cannot be processed.'
			'</text></error></iq>')

		# Record what gets sent instead of asserting inside the callback, so
		# that the test also fails when no reply is sent at all
		sent = []
		def send(data):
			sent.append(str(data))
		self.client.send = send

		self.dispatcher.ProcessNonBlocking(stanza)
		self.assertEqual([expected], sent)
 | |
| 
 | |
| 
 | |
if __name__ == '__main__':
	# Run the tests in this module when it is executed directly
	unittest.main()
 | |
| 
 | |
| # vim: se ts=3:
 |