Merge XEP-0198 implementation from the gajim-XEP-198 branch to trunk. Thanks Jefry for your work during GSoC

Yann Leboulanger 2011-08-22 09:45:51 +02:00
commit f0a0929d5c
8 changed files with 424 additions and 51 deletions

src/common/connection.py

@@ -57,9 +57,9 @@ from common import gajim
from common import gpg
from common import passwords
from common import exceptions
from connection_handlers import *
from xmpp import Smacks
from string import Template
import logging
log = logging.getLogger('gajim.c.connection')
@@ -717,6 +717,9 @@ class Connection(CommonConnection, ConnectionHandlers):
self.privacy_rules_requested = False
self.streamError = ''
self.secret_hmac = str(random.random())[2:]
self.sm = Smacks(self) # Stream Management
gajim.ged.register_event_handler('privacy-list-received', ged.CORE,
self._nec_privacy_list_received)
gajim.ged.register_event_handler('agent-info-error-received', ged.CORE,
@@ -780,6 +783,8 @@ class Connection(CommonConnection, ConnectionHandlers):
self.connected = 0
self.time_to_reconnect = None
self.privacy_rules_supported = False
if on_purpose:
self.sm = Smacks(self)
if self.connection:
# make sure previous connection is completely closed
gajim.proxy65_manager.disconnect(self.connection)
@@ -788,6 +793,15 @@ class Connection(CommonConnection, ConnectionHandlers):
self.connection.disconnect()
self.last_connection = None
self.connection = None
def set_oldst(self): # Set old state
if self.old_show:
self.connected = gajim.SHOW_LIST.index(self.old_show)
gajim.nec.push_incoming_event(OurShowEvent(None, conn=self,
show=self.connected))
else: # we default to online
self.connected = 2
gajim.nec.push_incoming_event(OurShowEvent(None, conn=self,
show=gajim.SHOW_LIST[self.connected]))
def _disconnectedReconnCB(self):
"""
@@ -800,6 +814,7 @@ class Connection(CommonConnection, ConnectionHandlers):
self.old_show = gajim.SHOW_LIST[self.connected]
self.connected = 0
if not self.on_purpose:
if not (self.sm and self.sm.resumption):
gajim.nec.push_incoming_event(OurShowEvent(None, conn=self,
show='offline'))
self.disconnect()
@@ -989,7 +1004,14 @@ class Connection(CommonConnection, ConnectionHandlers):
if self.connection:
return self.connection, ''
if data:
if self.sm.resuming and self.sm.location:
# If resuming and server gave a location, connect from there
hostname = self.sm.location
self.try_connecting_for_foo_secs = gajim.config.get_per('accounts',
self.name, 'try_connecting_for_foo_secs')
use_custom = False
elif data:
hostname = data['hostname']
self.try_connecting_for_foo_secs = 45
p = data['proxy']
@@ -1607,12 +1629,16 @@ class Connection(CommonConnection, ConnectionHandlers):
self.connection.set_send_timeout2(self.pingalives, self.sendPing)
self.connection.onreceive(None)
self.request_message_archiving_preferences()
self.privacy_rules_requested = False
self.discoverInfo(gajim.config.get_per('accounts', self.name, 'hostname'),
id_prefix='Gajim_')
# If we are not resuming, we ask for discovery info
# and archiving preferences
if not self.sm.resuming:
self.request_message_archiving_preferences()
self.discoverInfo(gajim.config.get_per('accounts', self.name,
'hostname'), id_prefix='Gajim_')
self.sm.resuming = False # back to previous state
# Discover Stun server(s)
gajim.resolver.resolve('_stun._udp.' + helpers.idn_to_ascii(
self.connected_hostname), self._on_stun_resolved)
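In the connection.py hunks above, a reconnect that resumes a stream prefers the location the server advertised when Stream Management was enabled. A minimal standalone sketch of that decision (the helper name is mine, not part of the patch):

def pick_reconnect_host(sm, configured_host):
    # sm.location is filled from <enabled xmlns='urn:xmpp:sm:2' location='...'/>
    # (see smacks.py below); otherwise fall back to the configured hostname.
    if sm.resuming and sm.location:
        return sm.location
    return configured_host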

src/common/xmpp/__init__.py

@@ -15,3 +15,4 @@ import simplexml, protocol, auth_nb, transports_nb, roster_nb
import dispatcher_nb, features_nb, idlequeue, bosh, tls_nb, proxy_connectors
from client_nb import NonBlockingClient
from plugin import PlugIn
from smacks import Smacks

src/common/xmpp/auth_nb.py

@@ -22,8 +22,10 @@ See client_nb.py
"""
from protocol import NS_SASL, NS_SESSION, NS_STREAMS, NS_BIND, NS_AUTH
from protocol import NS_STREAM_MGMT
from protocol import Node, NodeProcessed, isResultNode, Iq, Protocol, JID
from plugin import PlugIn
from smacks import Smacks
import base64
import random
import itertools
@@ -198,7 +200,8 @@ class SASL(PlugIn):
'mechanism'):
self.mecs.append(mec.getData())
self._owner.RegisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL)
self._owner.RegisterHandler('challenge', self.SASLHandler,
xmlns=NS_SASL)
self._owner.RegisterHandler('failure', self.SASLHandler, xmlns=NS_SASL)
self._owner.RegisterHandler('success', self.SASLHandler, xmlns=NS_SASL)
self.MechanismHandler()
@@ -206,7 +209,8 @@ class SASL(PlugIn):
def MechanismHandler(self):
if 'ANONYMOUS' in self.mecs and self.username is None:
self.mecs.remove('ANONYMOUS')
node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'ANONYMOUS'})
node = Node('auth', attrs={'xmlns': NS_SASL,
'mechanism': 'ANONYMOUS'})
self.mechanism = 'ANONYMOUS'
self.startsasl = SASL_IN_PROCESS
self._owner.send(str(node))
@@ -229,8 +233,8 @@ class SASL(PlugIn):
self._owner.xmpp_hostname)[1]
kerberos.authGSSClientStep(self.gss_vc, '')
response = kerberos.authGSSClientResponse(self.gss_vc)
node=Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'GSSAPI'},
payload=(response or ''))
node=Node('auth', attrs={'xmlns': NS_SASL,
'mechanism': 'GSSAPI'}, payload=(response or ''))
self.mechanism = 'GSSAPI'
self.gss_step = GSS_STATE_STEP
self.startsasl = SASL_IN_PROCESS
@@ -247,7 +251,8 @@ class SASL(PlugIn):
raise NodeProcessed
if 'DIGEST-MD5' in self.mecs:
self.mecs.remove('DIGEST-MD5')
node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'DIGEST-MD5'})
node = Node('auth', attrs={'xmlns': NS_SASL,
'mechanism': 'DIGEST-MD5'})
self.mechanism = 'DIGEST-MD5'
self.startsasl = SASL_IN_PROCESS
self._owner.send(str(node))
@@ -307,9 +312,9 @@ class SASL(PlugIn):
handlers = self._owner.Dispatcher.dumpHandlers()
# Bosh specific dispatcher replugging
# save old features. They will be used in case we won't get response on
# stream restart after SASL auth (happens with XMPP over BOSH with
# Openfire)
# save old features. They will be used in case we won't get response
# on stream restart after SASL auth (happens with XMPP over BOSH
# with Openfire)
old_features = self._owner.Dispatcher.Stream.features
self._owner.Dispatcher.PlugOut()
dispatcher_nb.Dispatcher.get_instance().PlugIn(self._owner,
@@ -416,8 +421,8 @@ class SASL(PlugIn):
else:
self.resp['realm'] = self._owner.Server
self.resp['nonce'] = chal['nonce']
self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint in
itertools.repeat(random.randint, 7))
self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint \
in itertools.repeat(random.randint, 7))
self.resp['nc'] = ('00000001')
self.resp['qop'] = 'auth'
self.resp['digest-uri'] = 'xmpp/' + self._owner.Server
@@ -478,7 +483,8 @@ class SASL(PlugIn):
sasl_data += u'%s="%s",' % (key, self.resp[key])
sasl_data = sasl_data[:-1].encode('utf-8').encode('base64').replace(
'\r', '').replace('\n', '')
node = Node('response', attrs={'xmlns':NS_SASL}, payload=[sasl_data])
node = Node('response', attrs={'xmlns': NS_SASL},
payload=[sasl_data])
elif self.mechanism == 'PLAIN':
sasl_data = u'\x00%s\x00%s' % (self.username, self.password)
sasl_data = sasl_data.encode('utf-8').encode('base64').replace(
@@ -548,7 +554,8 @@ class NonBlockingNonSASL(PlugIn):
def hash_n_times(s, count):
return count and hasher(hash_n_times(s, count-1)) or s
hash_ = hash_n_times(hasher(hasher(self.password) + token), int(seq))
hash_ = hash_n_times(hasher(hasher(self.password) + token),
int(seq))
query.setTagData('hash', hash_)
self._method='0k'
else:
@@ -556,15 +563,16 @@ class NonBlockingNonSASL(PlugIn):
authentication")
query.setTagData('password', self.password)
self._method = 'plain'
resp = self.owner.Dispatcher.SendAndWaitForResponse(iq, func=self._on_auth)
resp = self.owner.Dispatcher.SendAndWaitForResponse(iq,
func=self._on_auth)
def _on_auth(self, resp):
if isResultNode(resp):
log.info('Successfully authenticated with remote host.')
self.owner.User = self.user
self.owner.Resource = self.resource
self.owner._registered_name = self.owner.User+'@'+self.owner.Server+\
'/'+self.owner.Resource
self.owner._registered_name = self.owner.User + '@' + \
self.owner.Server + '/' + self.owner.Resource
return self.on_auth(self._method)
log.info('Authentication failed!')
return self.on_auth(None)
@@ -579,6 +587,8 @@ class NonBlockingBind(PlugIn):
def __init__(self):
PlugIn.__init__(self)
self.bound = None
self.supports_sm = False
self.resuming = False
def plugin(self, owner):
''' Start resource binding, if allowed at this time. Used internally. '''
@@ -595,8 +605,16 @@ class NonBlockingBind(PlugIn):
def FeaturesHandler(self, conn, feats):
"""
Determine if server supports resource binding and set some internal
attributes accordingly
attributes accordingly.
It also checks whether the server supports stream management.
"""
if feats.getTag('sm', namespace=NS_STREAM_MGMT):
self.supports_sm = True # server supports stream management
if self.resuming:
self._owner._caller.sm.resume_request()
if not feats.getTag('bind', namespace=NS_BIND):
log.info('Server did not request binding.')
# we try to bind resource anyway
@@ -620,6 +638,8 @@ class NonBlockingBind(PlugIn):
"""
Perform binding. Use provided resource name or random (if not provided).
"""
if self.resuming: # We don't bind if we resume the stream
return
self.on_bound = on_bound
self._resource = resource
if self._resource:
@@ -629,8 +649,9 @@ class NonBlockingBind(PlugIn):
self._owner.onreceive(None)
self._owner.Dispatcher.SendAndWaitForResponse(
Protocol('iq', typ='set', payload=[Node('bind', attrs={'xmlns':NS_BIND},
payload=self._resource)]), func=self._on_bound)
Protocol('iq', typ='set', payload=[Node('bind',
attrs={'xmlns': NS_BIND}, payload=self._resource)]),
func=self._on_bound)
def _on_bound(self, resp):
if isResultNode(resp):
@@ -640,6 +661,14 @@ class NonBlockingBind(PlugIn):
jid = JID(resp.getTag('bind').getTagData('jid'))
self._owner.User = jid.getNode()
self._owner.Resource = jid.getResource()
# Only negotiate stream management after the resource is bound
sm = self._owner._caller.sm
if self.supports_sm:
# start negotiation
sm.set_owner(self._owner)
sm.negociate()
self._owner.Dispatcher.sm = sm
if hasattr(self, 'session') and self.session == -1:
# Server doesn't want us to initialize a session
log.info('No session required.')
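The FeaturesHandler change above looks for an <sm/> child in the server's stream features before negotiating (or resuming) Stream Management. For reference, a feature advertisement of the shape it matches, written as a Python string in the style the unit test below uses; the <bind/> element is typical but assumed here, not taken from this diff:

FEATURES_WITH_SM = (
    "<stream:features xmlns:stream='http://etherx.jabber.org/streams'>"
    "<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>"
    "<sm xmlns='urn:xmpp:sm:2'><optional/></sm>"
    "</stream:features>")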

src/common/xmpp/client_nb.py

@@ -521,7 +521,16 @@ class NonBlockingClient:
self.connected = None # FIXME: is this intended? We use '' elsewhere
self._on_sasl_auth(None)
elif self.SASL.startsasl == 'success':
auth_nb.NonBlockingBind.get_instance().PlugIn(self)
nb_bind = auth_nb.NonBlockingBind.get_instance()
sm = self._caller.sm
if sm._owner and sm.resumption:
nb_bind.resuming = True
sm.set_owner(self)
self.Dispatcher.sm = sm
nb_bind.PlugIn(self)
return
nb_bind.PlugIn(self)
self.onreceive(self._on_auth_bind)
return True

src/common/xmpp/dispatcher_nb.py

@@ -90,6 +90,9 @@ class XMPPDispatcher(PlugIn):
self.SendAndWaitForResponse, self.SendAndCallForResponse,
self.getAnID, self.Event, self.send]
# Let the dispatcher know if there is support for stream management
self.sm = None
def getAnID(self):
global outgoingID
outgoingID += 1
@@ -417,6 +420,12 @@ class XMPPDispatcher(PlugIn):
stanza.props = stanza.getProperties()
ID = stanza.getID()
# If server supports stream management
if self.sm and self.sm.enabled and (stanza.getName() != 'r' and
stanza.getName() != 'a' and stanza.getName() != 'enabled' and
stanza.getName() != 'resumed'):
# increment the number of incoming stanzas that have been handled
self.sm.in_h = self.sm.in_h + 1
list_ = ['default'] # we will use all handlers:
if typ in self.handlers[xmlns][name]:
list_.append(typ) # from very common...
@@ -525,6 +534,14 @@ class XMPPDispatcher(PlugIn):
ID = stanza.getID()
if self._owner._registered_name and not stanza.getAttr('from'):
stanza.setAttr('from', self._owner._registered_name)
# Stanzas without an ID are whitespace keep-alives; don't count or queue them
if self.sm and self.sm.enabled and ID:
self.sm.uqueue.append(stanza)
self.sm.out_h = self.sm.out_h + 1
if len(self.sm.uqueue) > self.sm.max_queue:
self.sm.request_ack()
self._owner.Connection.send(stanza, now)
return ID
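The two dispatcher hooks above implement the XEP-0198 counting rules: incoming SM elements (r, a, enabled, resumed) are not counted as handled stanzas, and outgoing elements without an ID (whitespace keep-alives) are neither counted nor queued. A standalone sketch of those rules, with helper names that are mine rather than Gajim's:

SM_ELEMENTS = ('r', 'a', 'enabled', 'resumed')

def count_incoming(sm, stanza_name):
    # mirrors the in_h increment in dispatch() above
    if sm.enabled and stanza_name not in SM_ELEMENTS:
        sm.in_h += 1

def count_outgoing(sm, stanza, stanza_id):
    # mirrors the out_h/uqueue bookkeeping in send() above
    if sm.enabled and stanza_id:
        sm.uqueue.append(stanza)
        sm.out_h += 1
        if len(sm.uqueue) > sm.max_queue:
            sm.request_ack()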

src/common/xmpp/protocol.py

@@ -149,6 +149,7 @@ NS_DATA_LAYOUT = 'http://jabber.org/protocol/xdata-layout' # XEP-0141
NS_DATA_VALIDATE = 'http://jabber.org/protocol/xdata-validate' # XEP-0122
NS_XMPP_STREAMS = 'urn:ietf:params:xml:ns:xmpp-streams'
NS_RECEIPTS = 'urn:xmpp:receipts'
NS_STREAM_MGMT = 'urn:xmpp:sm:2' # XEP-0198
xmpp_stream_error_conditions = '''
bad-format -- -- -- The entity has sent XML that cannot be processed.
@@ -984,6 +985,34 @@ class Iq(Protocol):
iq.setQueryNS(self.getQueryNS())
return iq
class Acks(Node):
"""
Acknowledgement elements for Stream Management
"""
def __init__(self, nsp=NS_STREAM_MGMT):
Node.__init__(self, None, {}, [], None, None, False, None)
self.setNamespace(nsp)
def buildAnswer(self, handled):
"""
handled is the number of stanzas handled
"""
self.setName('a')
self.setAttr('h', handled)
def buildRequest(self):
self.setName('r')
def buildEnable(self, resume=False):
self.setName('enable')
if resume:
self.setAttr('resume', 'true')
def buildResume(self, handled, previd):
self.setName('resume')
self.setAttr('h', handled)
self.setAttr('previd', previd)
class ErrorNode(Node):
"""
XMPP-style error element

src/common/xmpp/smacks.py (new file, 129 lines)

@@ -0,0 +1,129 @@
from protocol import Acks
from protocol import NS_STREAM_MGMT
import logging
log = logging.getLogger('gajim.c.x.smacks')
class Smacks():
'''
This is the Stream Management (XEP-0198) class. It takes care of requesting
and sending acks, and it keeps track of unacknowledged outgoing stanzas.
The dispatcher has to be able to access this class to increment the
number of handled stanzas.
'''
def __init__(self, con):
self.con = con # Connection object
self.out_h = 0 # Outgoing stanzas handled
self.in_h = 0 # Incoming stanzas handled
self.uqueue = [] # Unhandled stanzas queue
self.session_id = None
self.resumption = False # If server supports resume
# Max number of stanzas in queue before making a request
self.max_queue = 5
self._owner = None
self.resuming = False
self.enabled = False # If SM is enabled
self.location = None
def set_owner(self, owner):
self._owner = owner
# Register handlers
owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('enabled', self._neg_response,
xmlns=NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('r', self.send_ack,
xmlns=NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('a', self.check_ack,
xmlns=NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('resumed', self.check_ack,
xmlns=NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('failed', self.error_handling,
xmlns=NS_STREAM_MGMT)
def _neg_response(self, disp, stanza):
r = stanza.getAttr('resume')
if r == 'true' or r == 'True' or r == '1':
self.resumption = True
self.session_id = stanza.getAttr('id')
if r == 'false' or r == 'False' or r == '0':
self.negociate(False)
l = stanza.getAttr('location')
if l:
self.location = l
def negociate(self, resume=True):
# Every time we attempt to negotiate, we must erase all info about
# any previous session
self.uqueue = []
self.in_h = 0
self.out_h = 0
self.session_id = None
self.enabled = True
stanza = Acks()
stanza.buildEnable(resume)
self._owner.Connection.send(stanza, now=True)
def resume_request(self):
if not self.session_id:
self.resuming = False
log.error('Attempted to resume without a valid session id ')
return
resume = Acks()
resume.buildResume(self.in_h, self.session_id)
self._owner.Connection.send(resume, False)
def send_ack(self, disp, stanza):
ack = Acks()
ack.buildAnswer(self.in_h)
self._owner.Connection.send(ack, False)
def request_ack(self):
r = Acks()
r.buildRequest()
self._owner.Connection.send(r, False)
def check_ack(self, disp, stanza):
'''
Check whether the number of stanzas we sent matches the number the
server reports as handled. Pop stanzas that were handled by the
server from the queue.
'''
h = int(stanza.getAttr('h'))
diff = self.out_h - h
if len(self.uqueue) < diff or diff < 0:
log.error('Mismatch between server and client counts of handled stanzas')
else:
while (len(self.uqueue) > diff):
self.uqueue.pop(0)
if stanza.getName() == 'resumed':
self.resuming = True
self.con.set_oldst()
if self.uqueue != []:
for i in self.uqueue:
self._owner.Connection.send(i, False)
def error_handling(self, disp, stanza):
# If the server doesn't recognize previd, forget about resuming:
# re-negotiate and ask for service discovery again
if stanza.getTag('item-not-found'):
self.resuming = False
self.negociate()
self.con._discover_server_at_connection(self.con.connection)
return
# The server doesn't support resumption
if stanza.getTag('feature-not-implemented'):
self.negociate(False)
return
if stanza.getTag('unexpected-request'):
self.enabled = False
log.error('Gajim failed to negociate Stream Management')
return
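A worked example of check_ack's arithmetic, using standalone values rather than the Gajim objects:

out_h = 10                                    # stanzas we have sent
uqueue = ['stanza%d' % i for i in range(10)]  # copies kept until acknowledged
h = 7                                         # server sent <a xmlns='urn:xmpp:sm:2' h='7'/>
diff = out_h - h                              # 3 stanzas the server has not handled yet
assert 0 <= diff <= len(uqueue)
while len(uqueue) > diff:
    uqueue.pop(0)                             # drop acknowledged stanzas, oldest first
print(uqueue)                                 # ['stanza7', 'stanza8', 'stanza9']
# on <resumed/>, exactly these remaining stanzas would be re-sent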

New unit test for smacks.py (133 lines)

@@ -0,0 +1,133 @@
'''
Tests for smacks.py Stream Management
'''
import unittest
import lib
lib.setup_env()
from mock import Mock
from common.xmpp import dispatcher_nb
from common.xmpp import protocol
from common.xmpp import smacks
class TestDispatcherNB(unittest.TestCase):
'''
Test class for NonBlocking dispatcher. Tested dispatcher will be plugged
into a mock client
'''
def setUp(self):
self.dispatcher = dispatcher_nb.XMPPDispatcher()
# Setup mock client
self.client = Mock()
self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one?
self.client._caller = Mock()
self.client.defaultNamespace = protocol.NS_CLIENT
self.client.Connection = Mock() # mock transport
self.con = self.client.Connection
self.con.sm = smacks.Smacks(self.con)
def tearDown(self):
# Unplug if needed
if hasattr(self.dispatcher, '_owner'):
self.dispatcher.PlugOut()
def _simulate_connect(self):
self.dispatcher.PlugIn(self.client) # client is owner
self.con.sm.set_owner(self.client)
self.dispatcher.sm = self.con.sm
# Simulate that we have established a connection
self.dispatcher.StreamInit()
self.dispatcher.ProcessNonBlocking("<stream:stream "
"xmlns:stream='http://etherx.jabber.org/streams' "
"xmlns='jabber:client'>")
self.dispatcher.ProcessNonBlocking("<stream:features> "
"<sm xmlns='urn:xmpp:sm:2'> <optional/> </sm> </stream:features>")
self.con.sm.negociate()
self.dispatcher.ProcessNonBlocking("<enabled xmlns='urn:xmpp:sm:2' "
"id='some-long-sm-id' resume='true'/>")
assert(self.con.sm.enabled)
def _simulate_resume(self):
self.con.sm.resume_request()
# Resuming acknowledging 5 stanzas
self.dispatcher.ProcessNonBlocking("<resumed xmlns='urn:xmpp:sm:2' "
"id='some-long-sm-id' h='5'/>")
assert(self.con.sm.resuming)
def _send(self, send, r, stanza):
for i in range(r):
send(stanza)
def test_messages(self):
message = '<message><body>Helloo </body></message>'
iq = '''<iq from='proxy.jabber.ru' to='j.xxxxxxxx.org/Gajim' type='error' id='18'>
<query xmlns='http://jabber.org/protocol/bytestreams'/>
<error code='403' type='auth'>
<forbidden xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
</error>
</iq>'''
presence = '''<presence from='xxxxxxxxx.com/Talk.v1044194B1E2' to='j.xxxxxxxx.org'>
<priority>24</priority>
<c node="http://www.google.com/xmpp/client/caps" ver="1.0.0.104" ext="share-v1 voice-v1" xmlns="http://jabber.org/protocol/caps"/>
<x stamp="20110614T23:17:51" xmlns="jabber:x:delay"/>
<status>In love Kakashi Sensei :P</status>
<x xmlns="vcard-temp:x:update">
<photo>db4b7c52e39ba28562c74542d5988d47f09108a3</photo>
</x>
</presence> '''
self._simulate_connect()
uqueue = self.con.sm.uqueue
self.assertEqual(self.con.sm.out_h, 0)
self.assertEqual(self.con.sm.in_h, 0)
# The server sends 10 stanzas
self._send(self.dispatcher.ProcessNonBlocking, 5, message)
self._send(self.dispatcher.ProcessNonBlocking, 4, iq)
self._send(self.dispatcher.ProcessNonBlocking, 1, presence)
# The client has received 10 stanzas and sent none
self.assertEqual(self.con.sm.in_h, 10)
self.assertEqual(self.con.sm.out_h, 0)
m = protocol.Message()
# The client sends 10 stanzas
for i in range(10):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
# The client has sent 10 stanzas and put them in the queue
self.assertEqual(self.con.sm.out_h, 10)
self.assertEqual(len(uqueue), 10)
# The server acknowledges that it received 5 stanzas
self.dispatcher.ProcessNonBlocking("<a xmlns='urn:xmpp:sm:2' h='5'/>")
# 5 stanzas are removed from the queue, only 5 stanzas are left
self.assertEqual(len(uqueue), 5)
# Check for the right order of stanzas in the queue
l = ['5', '6', '7', '8', '9']
for i in uqueue:
self.assertEqual(i.getBody(), l[0])
l.pop(0)
def test_resumption(self):
self._simulate_connect()
m = protocol.Message()
# The client sends 5 stanzas
for i in range(5):
m = protocol.Message(body=str(i))
self.dispatcher.send(m)
self._simulate_resume()
# No stanzas left
self.assertEqual(len(self.con.sm.uqueue), 0)
if __name__ == '__main__':
unittest.main()
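One more scenario that could be exercised with the same pattern (a sketch only, not part of this commit): the server requests an ack with <r/>, and send_ack() answers through the mocked connection without counting the <r/> itself as a handled stanza.

def test_ack_request(self):
    self._simulate_connect()
    # the server asks for an ack; smacks replies with <a h='0'/> via the
    # mocked Connection.send, and 'r' does not increment in_h
    self.dispatcher.ProcessNonBlocking("<r xmlns='urn:xmpp:sm:2'/>")
    self.assertEqual(self.con.sm.in_h, 0)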