re-enable test suite

This commit is contained in:
Yann Leboulanger 2013-04-07 23:41:15 +02:00
parent aef4452ff5
commit d2992815ee
14 changed files with 161 additions and 872 deletions

View File

@ -6,6 +6,8 @@ import unittest
import lib
lib.setup_env()
import nbxmpp
from common import gajim
from common import contacts as contacts_module
from gajim import Interface
@ -46,8 +48,16 @@ class TestStatusChange(unittest.TestCase):
def contact_comes_online(self, account, jid, resource, prio):
'''a remote contact comes online'''
gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
resource, prio, None, time.time(), None))
xml = """<presence from='%s/%s' id='123'>
<c node='http://gajim.org' ver='pRCD6cgQ4SDqNMCjdhRV6TECx5o='
hash='sha-1' xmlns='http://jabber.org/protocol/caps'/>
<status>I'm back!</status>
</presence>
""" % (jid, resource)
msg = nbxmpp.protocol.Presence(node=nbxmpp.simplexml.XML2Node(xml))
gajim.connections[account]._presenceCB(None, msg)
# gajim.interface.handle_event_notify(account, (jid, 'online', "I'm back!",
# resource, prio, None, time.time(), None))
contact = None
for c in gajim.contacts.get_contacts(account, jid):

View File

@ -1,169 +0,0 @@
'''
Testing script for NonBlockingClient class (src/common/xmpp/client_nb.py)
It actually connects to a xmpp server.
'''
import unittest
import lib
lib.setup_env()
from xmpp_mocks import MockConnection, IdleQueueThread
from mock import Mock
from common.xmpp import client_nb
#import logging
#log = logging.getLogger('gajim')
#log.setLevel(logging.DEBUG)
# (XMPP server hostname, c2s port). Script will connect to the machine.
xmpp_server_port = ('gajim.org', 5222)
# [username, password, passphrase]. Script will authenticate to server above
credentials = ['unittest', 'testtest', 'res']
class TestNonBlockingClient(unittest.TestCase):
'''
Test Cases class for NonBlockingClient.
'''
def setUp(self):
''' IdleQueue thread is run and dummy connection is created. '''
self.idlequeue_thread = IdleQueueThread()
self.connection = MockConnection() # for dummy callbacks
self.idlequeue_thread.start()
def tearDown(self):
''' IdleQueue thread is stopped. '''
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
self.client = None
def open_stream(self, server_port, wrong_pass=False):
'''
Method opening the XMPP connection. It returns when <stream:features>
is received from server.
:param server_port: tuple of (hostname, port) for where the client should
connect.
'''
class TempConnection():
def get_password(self, cb, mechanism):
if wrong_pass:
cb('wrong pass')
else:
cb(credentials[1])
def on_connect_failure(self):
pass
self.client = client_nb.NonBlockingClient(
domain=server_port[0],
idlequeue=self.idlequeue_thread.iq,
caller=Mock(realClass=TempConnection))
self.client.connect(
hostname=server_port[0],
port=server_port[1],
on_connect=lambda *args: self.connection.on_connect(True, *args),
on_connect_failure=lambda *args: self.connection.on_connect(
False, *args))
self.assert_(self.connection.wait(),
msg='waiting for callback from client constructor')
# if on_connect was called, client has to be connected and vice versa
if self.connection.connect_succeeded:
self.assert_(self.client.get_connect_type())
else:
self.assert_(not self.client.get_connect_type())
def client_auth(self, username, password, resource, sasl):
'''
Method authenticating connected client with supplied credentials. Returns
when authentication is over.
:param sasl: whether to use sasl (sasl=1) or old (sasl=0) authentication
:todo: to check and be more specific about when it returns
(bind, session..)
'''
self.client.auth(username, password, resource, sasl,
on_auth=self.connection.on_auth)
self.assert_(self.connection.wait(), msg='waiting for authentication')
def do_disconnect(self):
'''
Does disconnecting of connected client. Returns when TCP connection is
closed.
'''
self.client.RegisterDisconnectHandler(self.connection.set_event)
self.client.disconnect()
self.assertTrue(self.connection.wait(), msg='waiting for disconnecting')
def test_proper_connect_sasl(self):
'''
The ideal testcase - client is connected, authenticated with SASL and
then disconnected.
'''
self.open_stream(xmpp_server_port)
# if client is not connected, lets raise the AssertionError
self.assert_(self.client.get_connect_type())
# client.disconnect() is already called from NBClient via
# _on_connected_failure, no need to call it here
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=1)
self.assert_(self.connection.con)
self.assert_(self.connection.auth=='sasl', msg='Unable to auth via SASL')
self.do_disconnect()
def test_proper_connect_oldauth(self):
'''
The ideal testcase - client is connected, authenticated with old auth and
then disconnected.
'''
self.open_stream(xmpp_server_port)
self.assert_(self.client.get_connect_type())
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
self.assert_(self.connection.con)
features = self.client.Dispatcher.Stream.features
if not features.getTag('auth'):
print("Server doesn't support old authentication type, ignoring test")
else:
self.assert_(self.connection.auth=='old_auth',
msg='Unable to auth via old_auth')
self.do_disconnect()
def test_connect_to_nonexisting_host(self):
'''
Connect to nonexisting host. DNS request for A records should return
nothing.
'''
self.open_stream(('fdsfsdf.fdsf.fss', 5222))
self.assert_(not self.client.get_connect_type())
def test_connect_to_wrong_port(self):
'''
Connect to nonexisting server. DNS request for A records should return an
IP but there shouldn't be XMPP server running on specified port.
'''
self.open_stream((xmpp_server_port[0], 31337))
self.assert_(not self.client.get_connect_type())
def test_connect_with_wrong_creds(self):
'''
Connecting with invalid password.
'''
self.open_stream(xmpp_server_port, wrong_pass=True)
self.assert_(self.client.get_connect_type())
self.client_auth(credentials[0], 'wrong pass', credentials[2], sasl=1)
self.assert_(self.connection.auth is None)
self.do_disconnect()
if __name__ == '__main__':
unittest.main()

View File

@ -1,276 +0,0 @@
'''
Integration test for tranports classes. See unit for the ordinary
unit tests of this module.
'''
import unittest
import socket
import lib
lib.setup_env()
from xmpp_mocks import IdleQueueThread, IdleMock
from common.xmpp import transports_nb
class AbstractTransportTest(unittest.TestCase):
''' Encapsulates Idlequeue instantiation for transports and more...'''
def setUp(self):
''' IdleQueue thread is run and dummy connection is created. '''
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
self._setup_hook()
def tearDown(self):
''' IdleQueue thread is stopped. '''
self._teardown_hook()
self.idlequeue_thread.stop_thread()
self.idlequeue_thread.join()
def _setup_hook(self):
pass
def _teardown_hook(self):
pass
def expect_receive(self, expected, count=1, msg=None):
'''
Returns a callback function that will assert whether the data passed to
it equals the one specified when calling this function.
Can be used to make sure transport dispatch correct data.
'''
def receive(data, *args, **kwargs):
self.assertEqual(data, expected, msg=msg)
self._expected_count -= 1
self._expected_count = count
return receive
def have_received_expected(self):
'''
Plays together with expect_receive(). Will return true if expected_rcv
callback was called as often as specified
'''
return self._expected_count == 0
class TestNonBlockingTCP(AbstractTransportTest):
'''
Test class for NonBlockingTCP. Will actually try to connect to an existing
XMPP server.
'''
class MockClient(IdleMock):
''' Simple client to test transport functionality '''
def __init__(self, idlequeue, testcase):
self.idlequeue = idlequeue
self.testcase = testcase
IdleMock.__init__(self)
def do_connect(self, establish_tls=False, proxy_dict=None):
try:
ips = socket.getaddrinfo('gajim.org', 5222,
socket.AF_UNSPEC, socket.SOCK_STREAM)
ip = ips[0]
except socket.error as e:
self.testcase.fail(msg=str(e))
self.socket = transports_nb.NonBlockingTCP(
raise_event=lambda event_type, data: self.testcase.assertTrue(
event_type and data),
on_disconnect=lambda: self.on_success(mode='SocketDisconnect'),
idlequeue=self.idlequeue,
estabilish_tls=establish_tls,
certs=('../data/other/cacerts.pem', 'tmp/cacerts.pem'),
proxy_dict=proxy_dict)
self.socket.PlugIn(self)
self.socket.connect(conn_5tuple=ip,
on_connect=lambda: self.on_success(mode='TCPconnect'),
on_connect_failure=self.on_failure)
self.testcase.assertTrue(self.wait(), msg='Connection timed out')
def do_disconnect(self):
self.socket.disconnect()
self.testcase.assertTrue(self.wait(), msg='Disconnect timed out')
def on_failure(self, err_message):
self.set_event()
self.testcase.fail(msg=err_message)
def on_success(self, mode, data=None):
if mode == "TCPconnect":
pass
if mode == "SocketDisconnect":
pass
self.set_event()
def _setup_hook(self):
self.client = self.MockClient(idlequeue=self.idlequeue_thread.iq,
testcase=self)
def _teardown_hook(self):
if self.client.socket.state == 'CONNECTED':
self.client.do_disconnect()
def test_connect_disconnect_plain(self):
''' Establish plain connection '''
self.client.do_connect(establish_tls=False)
self.assertEquals(self.client.socket.state, 'CONNECTED')
self.client.do_disconnect()
self.assertEquals(self.client.socket.state, 'DISCONNECTED')
# def test_connect_disconnect_ssl(self):
# ''' Establish SSL (not TLS) connection '''
# self.client.do_connect(establish_tls=True)
# self.assertEquals(self.client.socket.state, 'CONNECTED')
# self.client.do_disconnect()
# self.assertEquals(self.client.socket.state, 'DISCONNECTED')
def test_do_receive(self):
''' Test _do_receive method by overwriting socket.recv '''
self.client.do_connect()
sock = self.client.socket
# transport shall receive data
data = "Please don't fail"
sock._recv = lambda buffer: data
sock.onreceive(self.expect_receive(data))
sock._do_receive()
self.assertTrue(self.have_received_expected(), msg='Did not receive data')
self.assert_(self.client.socket.state == 'CONNECTED')
# transport shall do nothing as an non-fatal SSL is simulated
sock._recv = lambda buffer: None
sock.onreceive(self.assertFalse) # we did not receive anything...
sock._do_receive()
self.assert_(self.client.socket.state == 'CONNECTED')
# transport shall disconnect as remote side closed the connection
sock._recv = lambda buffer: ''
sock.onreceive(self.assertFalse) # we did not receive anything...
sock._do_receive()
self.assert_(self.client.socket.state == 'DISCONNECTED')
def test_do_send(self):
''' Test _do_send method by overwriting socket.send '''
self.client.do_connect()
sock = self.client.socket
outgoing = [] # what we have actually send to our socket.socket
data_part1 = "Please don't "
data_part2 = "fail!"
data_complete = data_part1 + data_part2
# Simulate everything could be send in one go
def _send_all(data):
outgoing.append(data)
return len(data)
sock._send = _send_all
sock.send(data_part1)
sock.send(data_part2)
sock._do_send()
sock._do_send()
self.assertTrue(self.client.socket.state == 'CONNECTED')
self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
self.assertFalse(sock.sendqueue and sock.sendbuff,
msg='There is still unsend data in buffers')
# Simulate data could only be sent in chunks
self.chunk_count = 0
outgoing = []
def _send_chunks(data):
if self.chunk_count == 0:
outgoing.append(data_part1)
self.chunk_count += 1
return len(data_part1)
else:
outgoing.append(data_part2)
return len(data_part2)
sock._send = _send_chunks
sock.send(data_complete)
sock._do_send() # process first chunk
sock._do_send() # process the second one
self.assertTrue(self.client.socket.state == 'CONNECTED')
self.assertTrue(data_part1 in outgoing and data_part2 in outgoing)
self.assertFalse(sock.sendqueue and sock.sendbuff,
msg='There is still unsend data in buffers')
class TestNonBlockingHTTP(AbstractTransportTest):
''' Test class for NonBlockingHTTP transport'''
bosh_http_dict = {
'http_uri': 'http://gajim.org:5280/http-bind',
'http_version': 'HTTP/1.1',
'http_persistent': True,
'add_proxy_headers': False
}
def _get_transport(self, http_dict, proxy_dict=None):
return transports_nb.NonBlockingHTTP(
raise_event=None,
on_disconnect=None,
idlequeue=self.idlequeue_thread.iq,
estabilish_tls=False,
certs=None,
on_http_request_possible=lambda: None,
on_persistent_fallback=None,
http_dict=http_dict,
proxy_dict=proxy_dict,
)
def test_parse_own_http_message(self):
''' Build a HTTP message and try to parse it afterwards '''
transport = self._get_transport(self.bosh_http_dict)
data = "<test>Please don't fail!</test>"
http_message = transport.build_http_message(data)
statusline, headers, http_body, buffer_rest = transport.parse_http_message(
http_message)
self.assertFalse(bool(buffer_rest))
self.assertTrue(statusline and isinstance(statusline, list))
self.assertTrue(headers and isinstance(headers, dict))
self.assertEqual(data, http_body, msg='Input and output are different')
def test_receive_http_message(self):
''' Let _on_receive handle some http messages '''
transport = self._get_transport(self.bosh_http_dict)
header = ("HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +
"Content-Length: 88\r\n\r\n")
payload = "<test>Please don't fail!</test>"
body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
% payload
message = "%s%s" % (header, body)
# try to receive in one go
transport.onreceive(self.expect_receive(body, msg='Failed: In one go'))
transport._on_receive(message)
self.assertTrue(self.have_received_expected(), msg='Failed: In one go')
def test_receive_http_message_in_chunks(self):
''' Let _on_receive handle some chunked http messages '''
transport = self._get_transport(self.bosh_http_dict)
payload = "<test>Please don't fail!\n\n</test>"
body = "<body xmlns='http://jabber.org/protocol/httpbind'>%s</body>" \
% payload
header = "HTTP/1.1 200 OK\r\nContent-Type: text/xml; charset=utf-8\r\n" +\
"Content-Length: %i\r\n\r\n" % len(body)
message = "%s%s" % (header, body)
chunk1, chunk2, chunk3, chunk4 = message[:20], message[20:73], \
message[73:85], message[85:]
nextmessage_chunk = "\r\n\r\nHTTP/1.1 200 OK\r\nContent-Type: text/x"
chunks = (chunk1, chunk2, chunk3, chunk4, nextmessage_chunk)
transport.onreceive(self.expect_receive(body, msg='Failed: In chunks'))
for chunk in chunks:
transport._on_receive(chunk)
self.assertTrue(self.have_received_expected(), msg='Failed: In chunks')
if __name__ == '__main__':
unittest.main()

View File

@ -6,12 +6,12 @@ from mock import Mock
from common import gajim
from common import ged
from common.connection_handlers import ConnectionHandlersBase
from common.connection_handlers import ConnectionHandlers
class MockConnection(Mock, ConnectionHandlersBase):
class MockConnection(Mock, ConnectionHandlers):
def __init__(self, account, *args):
Mock.__init__(self, *args)
ConnectionHandlersBase.__init__(self)
ConnectionHandlers.__init__(self)
self.name = account
self.connected = 2
@ -19,6 +19,7 @@ class MockConnection(Mock, ConnectionHandlersBase):
self.blocked_contacts = {}
self.blocked_groups = {}
self.sessions = {}
self.nested_group_delimiter = '::'
gajim.interface.instances[account] = {'infos': {}, 'disco': {},
'gc_config': {}, 'search': {}, 'sub_request': {}}
@ -41,6 +42,9 @@ class MockConnection(Mock, ConnectionHandlersBase):
gajim.connections[account] = self
def request_vcard(self, jid):
pass
class MockWindow(Mock):
def __init__(self, *args):
Mock.__init__(self, *args)
@ -105,14 +109,14 @@ class MockInterface(Mock):
self.status_sent_to_users = Mock()
if gajim.use_x:
self.jabber_state_images = {'16': {}, '32': {}, 'opened': {},
'closed': {}}
self.jabber_state_images = {'16': {}, '24': {}, '32': {},
'opened': {}, 'closed': {}}
import gtkgui_helpers
gtkgui_helpers.make_jabber_state_images()
else:
self.jabber_state_images = {'16': Mock(), '32': Mock(),
'opened': Mock(), 'closed': Mock()}
self.jabber_state_images = {'16': Mock(), '24': Mock(),
'32': Mock(), 'opened': Mock(), 'closed': Mock()}
class MockLogger(Mock):

View File

@ -1,7 +1,26 @@
# mock notify module
from common import gajim
from common import ged
notifications = []
class Notification:
def _nec_notification(self, obj):
global notifications
notifications.append(obj)
def clean(self):
global notifications
notifications = []
gajim.ged.remove_event_handler('notification', ged.GUI2,
self._nec_notification)
def __init__(self):
gajim.ged.register_event_handler('notification', ged.GUI2,
self._nec_notification)
def notify(event, jid, account, parameters, advanced_notif_num = None):
notifications.append((event, jid, account, parameters, advanced_notif_num))

View File

@ -6,7 +6,7 @@ import threading, time
from mock import Mock
from common.xmpp import idlequeue
from nbxmpp import idlequeue
IDLEQUEUE_INTERVAL = 0.2 # polling interval. 200ms is used in Gajim as default
IDLEMOCK_TIMEOUT = 30 # how long we wait for an event
@ -55,7 +55,6 @@ class IdleMock:
def set_event(self):
self._event.set()
class MockConnection(IdleMock, Mock):
'''
Class simulating Connection class from src/common/connection.py

View File

@ -35,23 +35,18 @@ for o, a in opts:
sys.exit(2)
# new test modules need to be added manually
modules = ( 'unit.test_xmpp_dispatcher_nb',
'unit.test_xmpp_transports_nb',
'unit.test_protocol_caps',
modules = ( 'unit.test_protocol_caps',
'unit.test_caps_cache',
'unit.test_contacts',
'unit.test_sessions',
'unit.test_account',
'unit.test_gui_interface',
)
#modules = ()
if use_x:
modules += ('integration.test_gui_event_integration',
'integration.test_roster',
'integration.test_resolver',
'integration.test_xmpp_client_nb',
'integration.test_xmpp_transports_nb'
modules += ( 'unit.test_sessions',
#'integration.test_gui_event_integration',
'integration.test_roster',
'integration.test_resolver',
)
nb_errors = 0

View File

@ -6,7 +6,7 @@ import unittest
import lib
lib.setup_env()
from common.xmpp import NS_MUC, NS_PING, NS_XHTML_IM
from nbxmpp import NS_MUC, NS_PING, NS_XHTML_IM
from common import caps_cache as caps
from common.contacts import Contact

View File

@ -7,7 +7,7 @@ import lib
lib.setup_env()
from common.contacts import CommonContact, Contact, GC_Contact, LegacyContactsAPI
from common.xmpp import NS_MUC
from nbxmpp import NS_MUC
from common import caps_cache

View File

@ -6,18 +6,23 @@ import unittest
import lib
lib.setup_env()
from common import gajim
from common import nec
from common import ged
from common import caps_cache
from common.connection_handlers import ConnectionHandlers
from common.protocol import caps
from common.contacts import Contact
from common.connection_handlers_events import CapsPresenceReceivedEvent
from mock import Mock
from common.xmpp import simplexml
from common.xmpp import protocol
import nbxmpp
class TestableConnectionCaps(caps.ConnectionCaps):
class TestableConnectionCaps(ConnectionHandlers, caps.ConnectionCaps):
def __init__(self, *args, **kwargs):
self.name = 'account'
self._mocked_contacts = {}
caps.ConnectionCaps.__init__(self, *args, **kwargs)
@ -39,35 +44,30 @@ class TestableConnectionCaps(caps.ConnectionCaps):
class TestConnectionCaps(unittest.TestCase):
def setUp(self):
gajim.nec = nec.NetworkEventsController()
gajim.ged.register_event_handler('caps-presence-received', ged.GUI2,
self._nec_caps_presence_received)
def _nec_caps_presence_received(self, obj):
self.assertFalse(isinstance(obj.client_caps, caps_cache.NullClientCaps),
msg="On receive of proper caps, we must not use the fallback")
def test_capsPresenceCB(self):
jid = "user@server.com/a"
connection_caps = TestableConnectionCaps("account",
self._build_assertering_dispatcher_function("CAPS_RECEIVED", jid),
Mock(), caps_cache.create_suitable_client_caps)
fjid = "user@server.com/a"
connection_caps = TestableConnectionCaps("account", Mock(),
caps_cache.create_suitable_client_caps)
contact = connection_caps._get_contact_or_gc_contact_for_jid(fjid)
xml = """<presence from='user@server.com/a' to='%s' id='123'>
<c node='http://gajim.org' ver='pRCD6cgQ4SDqNMCjdhRV6TECx5o='
hash='sha-1' xmlns='http://jabber.org/protocol/caps'/>
</presence>
""" % (jid)
iq = protocol.Iq(node=simplexml.XML2Node(xml))
connection_caps._capsPresenceCB(None, iq)
self.assertTrue(self._dispatcher_called, msg="Must have received caps")
client_caps = connection_caps.get_mocked_contact_for_jid(jid).client_caps
self.assertTrue(client_caps, msg="Client caps must be set")
self.assertFalse(isinstance(client_caps, caps_cache.NullClientCaps),
msg="On receive of proper caps, we must not use the fallback")
def _build_assertering_dispatcher_function(self, expected_event, jid):
self._dispatcher_called = False
def dispatch(event, data):
self.assertFalse(self._dispatcher_called, msg="Must only be called once")
self._dispatcher_called = True
self.assertEqual(expected_event, event)
self.assertEqual(jid, data[0])
return dispatch
<c node='http://gajim.org' ver='pRCD6cgQ4SDqNMCjdhRV6TECx5o='
hash='sha-1' xmlns='http://jabber.org/protocol/caps'/>
</presence>
""" % (fjid)
msg = nbxmpp.protocol.Presence(node=nbxmpp.simplexml.XML2Node(xml))
connection_caps._presenceCB(None, msg)
if __name__ == '__main__':
unittest.main()

View File

@ -9,10 +9,16 @@ lib.setup_env()
import notify
from common import gajim
from common import xmpp
from common import nec
from common import ged
from common.nec import NetworkEvent
from common.connection_handlers_events import MessageReceivedEvent
from common.connection_handlers_events import DecryptedMessageReceivedEvent
import nbxmpp
from common.stanza_session import StanzaSession
from session import ChatControlSession
from roster_window import RosterWindow
from mock import Mock, expectParams
from gajim_mocks import *
@ -27,7 +33,7 @@ class TestStanzaSession(unittest.TestCase):
''' Testclass for common/stanzasession.py '''
def setUp(self):
self.jid = 'test@example.org/Gajim'
self.jid = nbxmpp.JID('test@example.org/Gajim')
self.conn = MockConnection(account_name, {'send_stanza': None})
self.sess = StanzaSession(self.conn, self.jid, None, 'chat')
@ -80,64 +86,71 @@ class TestStanzaSession(unittest.TestCase):
class TestChatControlSession(unittest.TestCase):
''' Testclass for session.py '''
def setUp(self):
self.jid = 'test@example.org/Gajim'
self.conn = MockConnection(account_name, {'send_stanza': None})
self.sess = ChatControlSession(self.conn, self.jid, None)
@classmethod
def setUpClass(cls):
gajim.nec = nec.NetworkEventsController()
cls.conn = MockConnection(account_name, {'send_stanza': None})
gajim.logger = MockLogger()
gajim.default_session_type = ChatControlSession
# initially there are no events
self.assertEqual(0, len(gajim.events.get_events(account_name)))
def setUp(self):
gajim.notification = notify.Notification()
# no notifications have been sent
self.assertEqual(0, len(notify.notifications))
def tearDown(self):
# remove all events and notifications that were added
gajim.events._events = {}
notify.notifications = []
gajim.notification.clean()
def receive_chat_msg(self, jid, msgtxt):
'''simulate receiving a chat message from jid'''
msg = xmpp.Message()
msg = nbxmpp.Message()
msg.setBody(msgtxt)
msg.setType('chat')
tim = time.localtime()
encrypted = False
self.sess.received(jid, msgtxt, tim, encrypted, msg)
xml = """<message from='%s' id='1' type='chat'><body>%s</body>
<thread>123</thread></message>""" % (jid, msgtxt)
stanza = nbxmpp.protocol.Message(node=nbxmpp.simplexml.XML2Node(xml))
self.conn._messageCB(None, stanza)
# ----- custom assertions -----
def assert_new_message_notification(self):
'''a new_message notification has been sent'''
self.assertEqual(1, len(notify.notifications))
notif = notify.notifications[0]
self.assertEqual('new_message', notif[0])
notif = notify.notifications[-1]
self.assertEqual('New Message', notif.popup_event_type)
def assert_first_message_notification(self):
'''this message was treated as a first message'''
self.assert_new_message_notification()
notif = notify.notifications[0]
params = notif[3]
first = params[1]
self.assert_(first, 'message should have been treated as a first message')
notif = notify.notifications[-1]
first = notif.first_unread
self.assert_(first,
'message should have been treated as a first message')
def assert_not_first_message_notification(self):
'''this message was not treated as a first message'''
self.assert_new_message_notification()
notif = notify.notifications[0]
params = notif[3]
first = params[1]
notif = notify.notifications[-1]
first = notif.first_unread
self.assert_(not first,
'message was unexpectedly treated as a first message')
'message was unexpectedly treated as a first message')
# ----- tests -----
def test_receive_nocontrol(self):
def test_receive_1nocontrol(self):
'''test receiving a message in a blank state'''
jid = 'bct@necronomicorp.com/Gajim'
msgtxt = 'testing one two three'
jid = 'bct@necronomicorp.com'
fjid = 'bct@necronomicorp.com/Gajim'
msgtxt = 'testing one'
self.receive_chat_msg(jid, msgtxt)
self.receive_chat_msg(fjid, msgtxt)
# session is created
self.assert_((jid in self.conn.sessions) and (
'123' in self.conn.sessions[jid]), 'session is not created')
sess = self.conn.sessions[jid]['123']
# message was logged
calls = gajim.logger.mockGetNamedCalls('write')
@ -150,63 +163,65 @@ class TestChatControlSession(unittest.TestCase):
self.assert_first_message_notification()
# no control is attached to the session
self.assertEqual(None, self.sess.control)
self.assertEqual(None, sess.control)
def test_receive_already_has_control(self):
def test_receive_2already_has_control(self):
'''test receiving a message with a session already attached to a
control'''
jid = 'bct@necronomicorp.com/Gajim'
msgtxt = 'testing one two three'
self.sess.control = MockChatControl(jid, account_name)
self.receive_chat_msg(jid, msgtxt)
# message was logged
calls = gajim.logger.mockGetNamedCalls('write')
self.assertEqual(1, len(calls))
# the message does not go into the event queue
self.assertEqual(0, len(gajim.events.get_events(account_name)))
self.assert_not_first_message_notification()
# message was printed to the control
calls = self.sess.control.mockGetNamedCalls('print_conversation')
self.assertEqual(1, len(calls))
def test_received_orphaned_control(self):
'''test receiving a message when a control that doesn't have a session
attached exists'''
jid = 'bct@necronomicorp.com'
fjid = jid + '/Gajim'
msgtxt = 'testing one two three'
fjid = 'bct@necronomicorp.com/Gajim'
msgtxt = 'testing two'
roster = RosterWindow()
ctrl = MockChatControl(jid, account_name)
gajim.interface.msg_win_mgr = Mock({'get_control': ctrl})
gajim.interface.msg_win_mgr.mockSetExpectation('get_control',
expectParams(jid, account_name))
sess = self.conn.sessions[jid]['123']
sess.control = MockChatControl(fjid, account_name)
self.receive_chat_msg(fjid, msgtxt)
# message was logged
calls = gajim.logger.mockGetNamedCalls('write')
self.assertEqual(1, len(calls))
self.assertEqual(2, len(calls))
# the message does not go into the event queue
self.assertEqual(0, len(gajim.events.get_events(account_name)))
self.assertEqual(1, len(gajim.events.get_events(account_name)))
self.assert_not_first_message_notification()
# this session is now attached to that control
self.assertEqual(self.sess, ctrl.session)
self.assertEqual(ctrl, self.sess.control, 'foo')
# message was printed to the control
calls = ctrl.mockGetNamedCalls('print_conversation')
calls = sess.control.mockGetNamedCalls('print_conversation')
self.assertEqual(1, len(calls))
#def test_received_3orphaned_control(self):
#'''test receiving a message when a control that doesn't have a session
#attached exists'''
#jid = 'bct@necronomicorp.com'
#fjid = jid + '/Gajim'
#msgtxt = 'testing three'
#ctrl = MockChatControl(jid, account_name)
#gajim.interface.msg_win_mgr = Mock({'get_control': ctrl})
#gajim.interface.msg_win_mgr.mockSetExpectation('get_control',
#expectParams(jid, account_name))
#self.receive_chat_msg(fjid, msgtxt)
## message was logged
#calls = gajim.logger.mockGetNamedCalls('write')
#self.assertEqual(1, len(calls))
## the message does not go into the event queue
#self.assertEqual(0, len(gajim.events.get_events(account_name)))
#self.assert_not_first_message_notification()
## this session is now attached to that control
#self.assertEqual(self.sess, ctrl.session)
#self.assertEqual(ctrl, self.sess.control, 'foo')
## message was printed to the control
#calls = ctrl.mockGetNamedCalls('print_conversation')
#self.assertEqual(1, len(calls))
if __name__ == '__main__':
unittest.main()

View File

@ -1,97 +0,0 @@
'''
Tests for dispatcher_nb.py
'''
import unittest
import lib
lib.setup_env()
from mock import Mock
from common.xmpp import dispatcher_nb
from common.xmpp import protocol
class TestDispatcherNB(unittest.TestCase):
'''
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
into a mock client
'''
def setUp(self):
self.dispatcher = dispatcher_nb.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
self.client._caller = Mock()
self.client.defaultNamespace = protocol.NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
def tearDown(self):
# Unplug if needed
if hasattr(self.dispatcher, '_owner'):
self.dispatcher.PlugOut()
def _simulate_connect(self):
self.dispatcher.PlugIn(self.client) # client is owner
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:client'>")
def test_unbound_namespace_prefix(self):
'''tests our handling of a message with an unbound namespace prefix'''
self._simulate_connect()
msgs = []
def _got_message(conn, msg):
msgs.append(msg)
self.dispatcher.RegisterHandler('message', _got_message)
# should be able to parse a normal message
self.dispatcher.ProcessNonBlocking('<message><body>hello</body></message>')
self.assertEqual(1, len(msgs))
self.dispatcher.ProcessNonBlocking('<message><x:y/></message>')
self.assertEqual(2, len(msgs))
# we should not have been disconnected after that message
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')))
self.assertEqual(0, len(self.con.mockGetNamedCalls('disconnect')))
# we should be able to keep parsing
self.dispatcher.ProcessNonBlocking('<message><body>still here?</body></message>')
self.assertEqual(3, len(msgs))
def test_process_non_blocking(self):
''' Check for ProcessNonBlocking return types '''
self._simulate_connect()
process = self.dispatcher.ProcessNonBlocking
# length of data expected
data = "Please don't fail"
result = process(data)
self.assertEqual(result, len(data))
# no data processed, link shall still be active
result = process('')
self.assertEqual(result, '0')
self.assertEqual(0, len(self.con.mockGetNamedCalls('pollend')) +
len(self.con.mockGetNamedCalls('disconnect')))
# simulate disconnect
result = process('</stream:stream>')
self.assertEqual(1, len(self.client.mockGetNamedCalls('disconnect')))
def test_return_stanza_handler(self):
''' Test sasl_error_conditions transformation in protocol.py '''
# quick'n dirty...I wasn't aware of it existance and thought it would
# always fail :-)
self._simulate_connect()
stanza = "<iq type='get' />"
def send(data):
self.assertEqual(str(data), '<iq xmlns="jabber:client" type="error"><error code="501" type="cancel"><feature-not-implemented xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" /><text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">The feature requested is not implemented by the recipient or server and therefore cannot be processed.</text></error></iq>')
self.client.send = send
self.dispatcher.ProcessNonBlocking(stanza)
if __name__ == '__main__':
unittest.main()

View File

@ -1,133 +0,0 @@
'''
Tests for smacks.py Stream Management
'''
import unittest
import lib
lib.setup_env()
from mock import Mock
from common.xmpp import dispatcher_nb
from common.xmpp import protocol
from common.xmpp import smacks
class TestDispatcherNB(unittest.TestCase):
'''
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
into a mock client
'''
def setUp(self):
self.dispatcher = dispatcher_nb.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
self.client._caller = Mock()
self.client.defaultNamespace = protocol.NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
self.con.sm = smacks.Smacks(self.con)
def tearDown(self):
# Unplug if needed
if hasattr(self.dispatcher, '_owner'):
self.dispatcher.PlugOut()
def _simulate_connect(self):
self.dispatcher.PlugIn(self.client) # client is owner
self.con.sm.set_owner(self.client)
self.dispatcher.sm = self.con.sm
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking("<stream:stream "
"xmlns:stream='http://etherx.jabber.org/streams' "
"xmlns='jabber:client'>")
self.dispatcher.ProcessNonBlocking("<stream:features> "
"<sm xmlns='urn:xmpp:sm:2'> <optional/> </sm> </stream:features>")
self.con.sm.negociate()
self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:2' "
"id='some-long-sm-id' resume='true'/>")
assert(self.con.sm.enabled)
def _simulate_resume(self):
self.con.sm.resume_request()
# Resuming acknowledging 5 stanzas
self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:2' "
"id='some-long-sm-id' h='5'/>")
assert(self.con.sm.resuming)
def _send(self, send, r, stanza):
for i in range(r):
send(stanza)
def test_messages(self):
message = '<message><body>Helloo </body></message>'
iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'>
<query xmlns='http://jabber.org/protocol/bytestreams'/>
<error code='403' type='auth'>
<forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
</error>
</iq>'''
presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'>
<priority>24</priority>
<c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/>
<x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/>
<status>In love Kakashi Sensei :P</status>
<x xmlns="vcard-temp:x:update">
<photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo>
</x>
</presence> '''
self._simulate_connect()
uqueue = self.con.sm.uqueue
self.assertEqual(self.con.sm.out_h, 0)
self.assertEqual(self.con.sm.in_h, 0)
# The server sends 10 stanzas
self._send(self.dispatcher.ProcessNonBlocking, 5, message)
self._send(self.dispatcher.ProcessNonBlocking, 4, iq)
self._send(self.dispatcher.ProcessNonBlocking, 1, presence)
# The client has recieved 10 stanzas and sent none
self.assertEqual(self.con.sm.in_h, 10)
self.assertEqual(self.con.sm.out_h, 0)
m = protocol.Message()
# The client sends 10 stanzas
for i in range(10):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
# Client sends 10 stanzas and put them in the queue
self.assertEqual(self.con.sm.out_h, 10)
self.assertEqual(len(uqueue), 10)
# The server acknowledges that it recieved 5 stanzas
self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:2' h='5'/>")
# 5 stanzas are removed from the queue, only 5 stanzas are left
self.assertEqual(len(uqueue), 5)
# Check for the right order of stanzas in the queue
l = ['5', '6', '7', '8', '9']
for i in uqueue:
self.assertEqual(i.getBody(), l[0])
l.pop(0)
def test_resumption(self):
self._simulate_connect()
m = protocol.Message()
# The client sends 5 stanzas
for i in range(5):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
self._simulate_resume()
# No stanzas left
self.assertEqual(len(self.con.sm.uqueue), 0)
if __name__ == '__main__':
unittest.main()

View File

@ -1,78 +0,0 @@
'''
Unit test for tranports classes.
'''
import unittest
import lib
lib.setup_env()
from common.xmpp import transports_nb
class TestModuleLevelFunctions(unittest.TestCase):
'''
Test class for functions defined at module level
'''
def test_urisplit(self):
def check_uri(uri, proto, host, port, path):
_proto, _host, _port, _path = transports_nb.urisplit(uri)
self.assertEqual(proto, _proto)
self.assertEqual(host, _host)
self.assertEqual(path, _path)
self.assertEqual(port, _port)
check_uri('http://httpcm.jabber.org:5280/webclient', proto='http',
host='httpcm.jabber.org', port=5280, path='/webclient')
check_uri('http://httpcm.jabber.org/webclient', proto='http',
host='httpcm.jabber.org', port=80, path='/webclient')
check_uri('https://httpcm.jabber.org/webclient', proto='https',
host='httpcm.jabber.org', port=443, path='/webclient')
def test_get_proxy_data_from_dict(self):
def check_dict(proxy_dict, host, port, user, passwd):
_host, _port, _user, _passwd = transports_nb.get_proxy_data_from_dict(
proxy_dict)
self.assertEqual(_host, host)
self.assertEqual(_port, port)
self.assertEqual(_user, user)
self.assertEqual(_passwd, passwd)
bosh_dict = {'bosh_content': 'text/xml; charset=utf-8',
'bosh_hold': 2,
'bosh_http_pipelining': False,
'bosh_uri': 'http://gajim.org:5280/http-bind',
'bosh_useproxy': False,
'bosh_wait': 30,
'bosh_wait_for_restart_response': False,
'host': '172.16.99.11',
'pass': 'pass',
'port': 3128,
'type': 'bosh',
'useauth': True,
'user': 'user'}
check_dict(bosh_dict, host='gajim.org', port=5280, user='user',
passwd='pass')
proxy_dict = {'bosh_content': 'text/xml; charset=utf-8',
'bosh_hold': 2,
'bosh_http_pipelining': False,
'bosh_port': 5280,
'bosh_uri': '',
'bosh_useproxy': True,
'bosh_wait': 30,
'bosh_wait_for_restart_response': False,
'host': '172.16.99.11',
'pass': 'pass',
'port': 3128,
'type': 'socks5',
'useauth': True,
'user': 'user'}
check_dict(proxy_dict, host='172.16.99.11', port=3128, user='user',
passwd='pass')
if __name__ == '__main__':
unittest.main()