269 lines
		
	
	
	
		
			7.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			269 lines
		
	
	
	
		
			7.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# standard library
import __builtin__
import os
import os.path
import shutil
import sys
import time
import unittest

# the gajim checkout is the parent of the directory holding this file
_test_dir = os.path.abspath(os.path.dirname(__file__))
gajim_root = os.path.join(_test_dir, '..')

# make the gajim sources importable
sys.path.append(gajim_root + '/src')

# a temporary version of ~/.gajim for testing
configdir = gajim_root + '/test/tmp'

from mock import Mock

# define _ for i18n (identity function: strings are left untranslated)
__builtin__._ = lambda text: text

# wipe config directory, then recreate it empty
if os.path.isdir(configdir):
	shutil.rmtree(configdir)

os.mkdir(configdir)

# point gajim's path handling at the scratch directory
import common.configpaths
common.configpaths.gajimpaths.init(configdir)
common.configpaths.gajimpaths.init_profile()

# for some reason common.gajim needs to be imported before xmpppy?
from common import gajim
from common import xmpp

gajim.DATA_DIR = gajim_root + '/data'

from common.stanza_session import StanzaSession

# name to use for the test account
account_name = 'test'
 | 
						|
 | 
						|
class MockConnection(Mock):
	'''Mock connection for the test account.

	On creation it names itself after account_name and stores itself in
	gajim.connections under that name.
	'''

	def __init__(self, *args):
		# all positional arguments are handed straight to Mock
		Mock.__init__(self, *args)

		self.name = account_name
		gajim.connections[account_name] = self
 | 
						|
 | 
						|
class TestStanzaSession(unittest.TestCase):
	'''Tests for StanzaSession: thread ids, loggability and termination.'''

	def setUp(self):
		# every test gets a fresh mocked connection and a new 'chat' session
		self.jid = 'test@example.org/Gajim'
		self.conn = MockConnection({'send_stanza': None})
		self.sess = StanzaSession(self.conn, self.jid, None, 'chat')

	def test_generate_thread_id(self):
		# thread_id is a string
		self.assertTrue(isinstance(self.sess.thread_id, str))

		# it should be somewhat long, to avoid clashes
		self.assertTrue(len(self.sess.thread_id) >= 32)

	def test_is_loggable(self):
		# by default a session should be loggable
		# (unless the no_log_for setting says otherwise)
		self.assertTrue(self.sess.is_loggable())

	def test_terminate(self):
		# termination is sent by default
		self.sess.last_send = time.time()
		self.sess.terminate()

		self.assertEqual(None, self.sess.status)

		calls = self.conn.mockGetNamedCalls('send_stanza')
		# report a missing termination as a readable test failure instead of
		# an IndexError raised by the lookup below
		self.assertTrue(len(calls) > 0, 'terminate() did not send any stanza')
		msg = calls[0].getParam(0)

		# the stanza that was sent carries this session's thread id
		self.assertEqual(msg.getThread(), self.sess.thread_id)

	def test_terminate_without_sendng(self):
		# no termination is sent if no messages have been sent in the session
		self.sess.terminate()

		self.assertEqual(None, self.sess.status)

		calls = self.conn.mockGetNamedCalls('send_stanza')
		self.assertEqual(0, len(calls))

	def test_terminate_no_remote_xep_201(self):
		# no termination is sent if only messages without thread ids have been
		# received
		self.sess.last_send = time.time()
		self.sess.last_receive = time.time()
		self.sess.terminate()

		self.assertEqual(None, self.sess.status)

		calls = self.conn.mockGetNamedCalls('send_stanza')
		self.assertEqual(0, len(calls))
 | 
						|
 | 
						|
from session import ChatControlSession
 | 
						|
 | 
						|
class MockWindow(Mock):
	'''Mock window object whose "window" attribute is itself a Mock.'''

	def __init__(self, *args):
		# positional arguments are forwarded to Mock unchanged
		Mock.__init__(self, *args)

		self.window = Mock()
 | 
						|
 | 
						|
class MockChatControl(Mock):
	'''Mock chat control that remembers the session attached to it.'''

	def __init__(self, *args):
		Mock.__init__(self, *args)

		# parent window mock, configured with 'get_active_control' mapped to
		# this control
		self.parent_win = MockWindow({'get_active_control': self})
		# no session is attached until set_session() is called
		self.session = None

	def set_session(self, sess):
		'''remember sess as the session attached to this control'''
		self.session = sess

	def __nonzero__(self):
		# Python 2 truth hook: a control always evaluates as true
		return True

	def __eq__(self, other):
		# controls are equal only to themselves
		return other is self
 | 
						|
 | 
						|
class MockInterface(Mock):
	'''Mock interface object exposing the attributes set up below.'''

	def __init__(self, *args):
		Mock.__init__(self, *args)

		# sub-objects that are mocks themselves
		self.msg_win_mgr = Mock()
		self.roster = Mock()

		self.remote_ctrl = None

		# the test account starts without minimized controls
		self.minimized_controls = {account_name: {}}
 | 
						|
 | 
						|
class MockLogger(Mock):
	'''Mock logger created with a single configured entry, 'write' -> None.'''

	def __init__(self):
		write_config = {'write': None}
		Mock.__init__(self, write_config)
 | 
						|
 | 
						|
class MockContact(Mock):
	'''Mock contact that always evaluates as true.'''

	def __nonzero__(self):
		# Python 2 truth hook
		return True
 | 
						|
 | 
						|
# install the mocked interface and register the test account with the
# contact list; the notify module is imported only after both are in place
gajim.interface = MockInterface()
gajim.contacts.add_account(account_name)

import notify
 | 
						|
 | 
						|
class TestChatControlSession(unittest.TestCase):
	'''Tests for how ChatControlSession handles received chat messages.'''

	def setUp(self):
		self.jid = 'test@example.org/Gajim'
		self.conn = MockConnection({'send_stanza': None})
		self.sess = ChatControlSession(self.conn, self.jid, None)
		gajim.logger = MockLogger()

		# initially there are no events
		self.assertEqual(0, len(self._queued_events()))

		# no notifications have been sent
		self.assertEqual(0, len(notify.notifications))

	def tearDown(self):
		# remove all events and notifications that were added
		notify.notifications = []
		gajim.events._events = {}

	def receive_chat_msg(self, jid, msgtxt):
		'''feed the session a plain, unencrypted chat message from jid'''
		stanza = xmpp.Message()
		stanza.setBody(msgtxt)
		stanza.setType('chat')

		# arguments: sender, body, local receive time, encrypted flag, stanza
		self.sess.received(jid, msgtxt, time.localtime(), False, stanza)

	# ----- helpers -----
	def _queued_events(self):
		'''events currently queued for the test account'''
		return gajim.events.get_events(account_name)

	def _assert_logged_once(self):
		'''the mocked logger recorded exactly one write call'''
		writes = gajim.logger.mockGetNamedCalls('write')
		self.assertEqual(1, len(writes))

	def _first_message_flag(self):
		'''check the new_message notification and return its "first" flag'''
		self.assert_new_message_notification()
		# the flag is read from position 1 of the notification's element 3
		return notify.notifications[0][3][1]

	# ----- custom assertions -----
	def assert_new_message_notification(self):
		'''exactly one notification exists and it is a new_message one'''
		sent = notify.notifications
		self.assertEqual(1, len(sent))
		self.assertEqual('new_message', sent[0][0])

	def assert_first_message_notification(self):
		'''the notification marks the message as a first message'''
		self.assertTrue(self._first_message_flag(),
			'message should have been treated as a first message')

	def assert_not_first_message_notification(self):
		'''the notification does not mark the message as a first message'''
		self.assertFalse(self._first_message_flag(),
			'message was unexpectedly treated as a first message')

	# ----- tests -----
	def test_receive_nocontrol(self):
		'''test receiving a message in a blank state'''
		sender = 'bct@necronomicorp.com/Gajim'
		body = 'testing one two three'

		self.receive_chat_msg(sender, body)

		# message was logged
		self._assert_logged_once()

		# no ChatControl was open and autopopup was off
		# so the message goes into the event queue
		self.assertEqual(1, len(self._queued_events()))

		self.assert_first_message_notification()

		# no control is attached to the session
		self.assertEqual(None, self.sess.control)

	def test_receive_already_has_control(self):
		'''test receiving a message with a session already attached to a control'''
		sender = 'bct@necronomicorp.com/Gajim'
		body = 'testing one two three'

		self.sess.control = MockChatControl()

		self.receive_chat_msg(sender, body)

		# message was logged
		self._assert_logged_once()

		# the message does not go into the event queue
		self.assertEqual(0, len(self._queued_events()))

		self.assert_not_first_message_notification()

		# message was printed to the control
		printed = self.sess.control.mockGetNamedCalls('print_conversation')
		self.assertEqual(1, len(printed))

	def test_received_orphaned_control(self):
		'''test receiving a message when a control that doesn't have a session attached exists'''

		sender = 'bct@necronomicorp.com/Gajim'
		body = 'testing one two three'

		# the window manager mock hands out this control as the sessionless one
		ctrl = MockChatControl()
		gajim.interface.msg_win_mgr = Mock({'get_sessionless_ctrl': ctrl})

		self.receive_chat_msg(sender, body)

		# message was logged
		self._assert_logged_once()

		# the message does not go into the event queue
		self.assertEqual(0, len(self._queued_events()))

		self.assert_not_first_message_notification()

		# this session is now attached to that control
		self.assertEqual(self.sess, ctrl.session)
		self.assertEqual(ctrl, self.sess.control, 'foo')

		# message was printed to the control
		printed = ctrl.mockGetNamedCalls('print_conversation')
		self.assertEqual(1, len(printed))
 | 
						|
 | 
						|
# run every test case in this module when the file is executed as a script
if __name__ == '__main__':
	unittest.main()
 |