From 9cf9de2a141df9f4ca71cf0282cec79625d52747 Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Wed, 25 May 2011 21:24:01 -0400 Subject: [PATCH 02/16] Created Acks builder class --- src/common/xmpp/protocol.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/common/xmpp/protocol.py b/src/common/xmpp/protocol.py index 70027e647..e17ac9315 100644 --- a/src/common/xmpp/protocol.py +++ b/src/common/xmpp/protocol.py @@ -143,6 +143,7 @@ NS_DATA_LAYOUT = 'http://jabber.org/protocol/xdata-layout' # XEP-0141 NS_DATA_VALIDATE = 'http://jabber.org/protocol/xdata-validate' # XEP-0122 NS_XMPP_STREAMS = 'urn:ietf:params:xml:ns:xmpp-streams' NS_RECEIPTS = 'urn:xmpp:receipts' +NS_STREAM_MGMT = 'urn:xmpp:sm:3' # XEP-198 xmpp_stream_error_conditions = ''' bad-format -- -- -- The entity has sent XML that cannot be processed. @@ -978,6 +979,35 @@ class Iq(Protocol): iq.setQueryNS(self.getQueryNS()) return iq +class Acks(Node): + """ + Acknowledgement elements for Stream Management + """ + def __init__(self, nsp=NS_STREAM_MGMT): + + + Node.__init__(self, None, {}, [], None, None,False, None) + + self.setNamespace(nsp) + + def buildAnswer(self, handled): # handled is the number of stanzas handled + self.setName('a') + self.setAttr('h', handled) + + def buildRequest(self): + self.setName('r') + + def buildEnable(self, resume=False): + self.setName('enable') + if resume: + self.setAttr('resume', 'true') + + + def buildResume(self, handled, previd): + self.setName('resume') + self.setAttr('h', handled) + self.setAttr('previd', previd) + class ErrorNode(Node): """ XMPP-style error element From 5b1edd03b86330a5b410612278b535357624738c Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Mon, 30 May 2011 20:14:26 -0400 Subject: [PATCH 03/16] Smacks class added, responds to ack requests --- src/common/xmpp/auth_nb.py | 24 ++++++++++++++--- src/common/xmpp/dispatcher_nb.py | 14 +++++++--- src/common/xmpp/protocol.py | 2 +- src/common/xmpp/smacks.py | 44 ++++++++++++++++++++++++++++++++ 4 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 src/common/xmpp/smacks.py diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py index 670526bf9..a379c6bcd 100644 --- a/src/common/xmpp/auth_nb.py +++ b/src/common/xmpp/auth_nb.py @@ -21,7 +21,7 @@ Can be used both for client and transport authentication See client_nb.py """ -from protocol import NS_SASL, NS_SESSION, NS_STREAMS, NS_BIND, NS_AUTH +from protocol import NS_SASL, NS_SESSION, NS_STREAMS, NS_BIND, NS_AUTH, NS_STREAM_MGMT from protocol import Node, NodeProcessed, isResultNode, Iq, Protocol, JID from plugin import PlugIn import base64 @@ -31,7 +31,7 @@ import dispatcher_nb import hashlib import hmac import hashlib - +from smacks import Smacks import logging log = logging.getLogger('gajim.c.x.auth_nb') @@ -491,7 +491,8 @@ class NonBlockingNonSASL(PlugIn): self.password = password self.resource = resource self.on_auth = on_auth - + + def plugin(self, owner): """ Determine the best auth method (digest/0k/plain) and use it for auth. @@ -564,6 +565,7 @@ class NonBlockingBind(PlugIn): def __init__(self): PlugIn.__init__(self) self.bound = None + self.supports_sm = False def plugin(self, owner): ''' Start resource binding, if allowed at this time. Used internally. ''' @@ -580,8 +582,14 @@ class NonBlockingBind(PlugIn): def FeaturesHandler(self, conn, feats): """ Determine if server supports resource binding and set some internal - attributes accordingly + attributes accordingly. + + It also checks if server supports stream management """ + + if feats.getTag('sm', namespace=NS_STREAM_MGMT): + self.supports_sm = True # server supports stream management + if not feats.getTag('bind', namespace=NS_BIND): log.info('Server does not requested binding.') # we try to bind resource anyway @@ -625,6 +633,14 @@ class NonBlockingBind(PlugIn): jid = JID(resp.getTag('bind').getTagData('jid')) self._owner.User = jid.getNode() self._owner.Resource = jid.getResource() + # Only negociate stream management after bounded + if self.supports_sm: + # starts negociation + sm = Smacks(self._owner) + self._owner.Dispatcher.supports_sm = True + self._owner.Dispatcher.sm = sm + sm.negociate() + if hasattr(self, 'session') and self.session == -1: # Server don't want us to initialize a session log.info('No session required.') diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index cca56f33a..31fa86e3a 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -75,7 +75,7 @@ class XMPPDispatcher(PlugIn): stream headers (used by SASL f.e.). """ - def __init__(self): + def __init__(self): PlugIn.__init__(self) self.handlers = {} self._expected = {} @@ -89,6 +89,9 @@ class XMPPDispatcher(PlugIn): self.UnregisterHandler, self.RegisterProtocol, self.SendAndWaitForResponse, self.SendAndCallForResponse, self.getAnID, self.Event, self.send] + + # Let the dispatcher know if there is support for stream management + self.supports_sm = False def getAnID(self): global outgoingID @@ -110,6 +113,7 @@ class XMPPDispatcher(PlugIn): """ self.handlers = handlers + def _init(self): """ Register default namespaces/protocols/handlers. Used internally @@ -125,7 +129,7 @@ class XMPPDispatcher(PlugIn): self.RegisterDefaultHandler(self.returnStanzaHandler) self.RegisterEventHandler(self._owner._caller._event_dispatcher) self.on_responses = {} - + def plugin(self, owner): """ Plug the Dispatcher instance into Client class instance and send initial @@ -416,7 +420,11 @@ class XMPPDispatcher(PlugIn): typ = '' stanza.props = stanza.getProperties() ID = stanza.getID() - + if self.supports_sm and (stanza.getName() != 'r' and + stanza.getName() != 'a' and + stanza.getName() != 'enabled') : + # increments the number of stanzas that has been handled + self.sm.in_h = self.sm.in_h + 1 list_ = ['default'] # we will use all handlers: if typ in self.handlers[xmlns][name]: list_.append(typ) # from very common... diff --git a/src/common/xmpp/protocol.py b/src/common/xmpp/protocol.py index e17ac9315..c6cecbd01 100644 --- a/src/common/xmpp/protocol.py +++ b/src/common/xmpp/protocol.py @@ -143,7 +143,7 @@ NS_DATA_LAYOUT = 'http://jabber.org/protocol/xdata-layout' # XEP-0141 NS_DATA_VALIDATE = 'http://jabber.org/protocol/xdata-validate' # XEP-0122 NS_XMPP_STREAMS = 'urn:ietf:params:xml:ns:xmpp-streams' NS_RECEIPTS = 'urn:xmpp:receipts' -NS_STREAM_MGMT = 'urn:xmpp:sm:3' # XEP-198 +NS_STREAM_MGMT = 'urn:xmpp:sm:2' # XEP-198 xmpp_stream_error_conditions = ''' bad-format -- -- -- The entity has sent XML that cannot be processed. diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py new file mode 100644 index 000000000..7c99ab5b8 --- /dev/null +++ b/src/common/xmpp/smacks.py @@ -0,0 +1,44 @@ +from protocol import Acks +from protocol import NS_STREAM_MGMT + +class Smacks(): + ''' + This is Smacks is the Stream Management class. It takes care of requesting + and sending acks. Also, it keeps track of the unhandled outgoing stanzas. + + The dispatcher has to be able to access this class to increment the + number of handled stanzas + ''' + + + def __init__(self, owner): + self._owner = owner + self.out_h = 0 # Outgoing stanzas handled + self.in_h = 0 # Incoming stanzas handled + self.uqueue = [] # Unhandled stanzas queue + + #Register handlers + owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('enabled', self._neg_response + ,xmlns=NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('r', self.send_ack + ,xmlns=NS_STREAM_MGMT) + + + def negociate(self): + stanza = Acks() + stanza.buildEnable() + self._owner.Connection.send(stanza, True) + + def _neg_response(self, disp, stanza): + pass + + def send_ack(self, disp, stanza): + ack = Acks() + ack.buildAnswer(self.in_h) + self._owner.Connection.send(ack, False) + + def request_ack(self): + r = Acks() + r.buildRequest() + self._owner.Connection.send(r, False) \ No newline at end of file From afca629bb71bb96a89f0728f24e54d756f9c6bbf Mon Sep 17 00:00:00 2001 From: Yann Leboulanger Date: Tue, 31 May 2011 09:15:41 +0200 Subject: [PATCH 04/16] coding standards --- src/common/xmpp/auth_nb.py | 111 ++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 51 deletions(-) diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py index a379c6bcd..90e8a421d 100644 --- a/src/common/xmpp/auth_nb.py +++ b/src/common/xmpp/auth_nb.py @@ -21,9 +21,11 @@ Can be used both for client and transport authentication See client_nb.py """ -from protocol import NS_SASL, NS_SESSION, NS_STREAMS, NS_BIND, NS_AUTH, NS_STREAM_MGMT +from protocol import NS_SASL, NS_SESSION, NS_STREAMS, NS_BIND, NS_AUTH +from protocol import NS_STREAM_MGMT from protocol import Node, NodeProcessed, isResultNode, Iq, Protocol, JID from plugin import PlugIn +from smacks import Smacks import base64 import random import itertools @@ -31,7 +33,7 @@ import dispatcher_nb import hashlib import hmac import hashlib -from smacks import Smacks + import logging log = logging.getLogger('gajim.c.x.auth_nb') @@ -142,7 +144,7 @@ class SASL(PlugIn): elif self._owner.Dispatcher.Stream.features: try: self.FeaturesHandler(self._owner.Dispatcher, - self._owner.Dispatcher.Stream.features) + self._owner.Dispatcher.Stream.features) except NodeProcessed: pass else: @@ -154,16 +156,16 @@ class SASL(PlugIn): """ if 'features' in self._owner.__dict__: self._owner.UnregisterHandler('features', self.FeaturesHandler, - xmlns=NS_STREAMS) + xmlns=NS_STREAMS) if 'challenge' in self._owner.__dict__: self._owner.UnregisterHandler('challenge', self.SASLHandler, - xmlns=NS_SASL) + xmlns=NS_SASL) if 'failure' in self._owner.__dict__: self._owner.UnregisterHandler('failure', self.SASLHandler, - xmlns=NS_SASL) + xmlns=NS_SASL) if 'success' in self._owner.__dict__: self._owner.UnregisterHandler('success', self.SASLHandler, - xmlns=NS_SASL) + xmlns=NS_SASL) def auth(self): """ @@ -178,12 +180,12 @@ class SASL(PlugIn): elif self._owner.Dispatcher.Stream.features: try: self.FeaturesHandler(self._owner.Dispatcher, - self._owner.Dispatcher.Stream.features) + self._owner.Dispatcher.Stream.features) except NodeProcessed: pass else: self._owner.RegisterHandler('features', - self.FeaturesHandler, xmlns=NS_STREAMS) + self.FeaturesHandler, xmlns=NS_STREAMS) def FeaturesHandler(self, conn, feats): """ @@ -198,7 +200,8 @@ class SASL(PlugIn): 'mechanism'): self.mecs.append(mec.getData()) - self._owner.RegisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL) + self._owner.RegisterHandler('challenge', self.SASLHandler, + xmlns=NS_SASL) self._owner.RegisterHandler('failure', self.SASLHandler, xmlns=NS_SASL) self._owner.RegisterHandler('success', self.SASLHandler, xmlns=NS_SASL) self.MechanismHandler() @@ -206,7 +209,8 @@ class SASL(PlugIn): def MechanismHandler(self): if 'ANONYMOUS' in self.mecs and self.username is None: self.mecs.remove('ANONYMOUS') - node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'ANONYMOUS'}) + node = Node('auth', attrs={'xmlns': NS_SASL, + 'mechanism': 'ANONYMOUS'}) self.mechanism = 'ANONYMOUS' self.startsasl = SASL_IN_PROCESS self._owner.send(str(node)) @@ -226,11 +230,11 @@ class SASL(PlugIn): self.mecs.remove('GSSAPI') try: self.gss_vc = kerberos.authGSSClientInit('xmpp@' + \ - self._owner.xmpp_hostname)[1] + self._owner.xmpp_hostname)[1] kerberos.authGSSClientStep(self.gss_vc, '') response = kerberos.authGSSClientResponse(self.gss_vc) - node=Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'GSSAPI'}, - payload=(response or '')) + node=Node('auth', attrs={'xmlns': NS_SASL, + 'mechanism': 'GSSAPI'}, payload=(response or '')) self.mechanism = 'GSSAPI' self.gss_step = GSS_STATE_STEP self.startsasl = SASL_IN_PROCESS @@ -247,7 +251,8 @@ class SASL(PlugIn): raise NodeProcessed if 'DIGEST-MD5' in self.mecs: self.mecs.remove('DIGEST-MD5') - node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'DIGEST-MD5'}) + node = Node('auth', attrs={'xmlns': NS_SASL, + 'mechanism': 'DIGEST-MD5'}) self.mechanism = 'DIGEST-MD5' self.startsasl = SASL_IN_PROCESS self._owner.send(str(node)) @@ -294,13 +299,13 @@ class SASL(PlugIn): handlers = self._owner.Dispatcher.dumpHandlers() # Bosh specific dispatcher replugging - # save old features. They will be used in case we won't get response on - # stream restart after SASL auth (happens with XMPP over BOSH with - # Openfire) + # save old features. They will be used in case we won't get response + # on stream restart after SASL auth (happens with XMPP over BOSH + # with Openfire) old_features = self._owner.Dispatcher.Stream.features self._owner.Dispatcher.PlugOut() dispatcher_nb.Dispatcher.get_instance().PlugIn(self._owner, - after_SASL=True, old_features=old_features) + after_SASL=True, old_features=old_features) self._owner.Dispatcher.restoreHandlers(handlers) self._owner.User = self.username @@ -322,12 +327,12 @@ class SASL(PlugIn): rc = kerberos.authGSSClientUnwrap(self.gss_vc, incoming_data) response = kerberos.authGSSClientResponse(self.gss_vc) rc = kerberos.authGSSClientWrap(self.gss_vc, response, - kerberos.authGSSClientUserName(self.gss_vc)) + kerberos.authGSSClientUserName(self.gss_vc)) response = kerberos.authGSSClientResponse(self.gss_vc) if not response: response = '' self._owner.send(Node('response', attrs={'xmlns': NS_SASL}, - payload=response).__str__()) + payload=response).__str__()) raise NodeProcessed if self.mechanism == 'SCRAM-SHA-1': hashfn = hashlib.sha1 @@ -408,8 +413,8 @@ class SASL(PlugIn): else: self.resp['realm'] = self._owner.Server self.resp['nonce'] = chal['nonce'] - self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint in - itertools.repeat(random.randint, 7)) + self.resp['cnonce'] = ''.join("%x" % randint(0, 2**28) for randint \ + in itertools.repeat(random.randint, 7)) self.resp['nc'] = ('00000001') self.resp['qop'] = 'auth' self.resp['digest-uri'] = 'xmpp/' + self._owner.Server @@ -449,10 +454,10 @@ class SASL(PlugIn): hash_realm = self._convert_to_iso88591(self.resp['realm']) hash_password = self._convert_to_iso88591(self.password) A1 = C([H(C([hash_username, hash_realm, hash_password])), - self.resp['nonce'], self.resp['cnonce']]) + self.resp['nonce'], self.resp['cnonce']]) A2 = C(['AUTHENTICATE', self.resp['digest-uri']]) response= HH(C([HH(A1), self.resp['nonce'], self.resp['nc'], - self.resp['cnonce'], self.resp['qop'], HH(A2)])) + self.resp['cnonce'], self.resp['qop'], HH(A2)])) self.resp['response'] = response sasl_data = u'' for key in ('charset', 'username', 'realm', 'nonce', 'nc', 'cnonce', @@ -462,14 +467,15 @@ class SASL(PlugIn): else: sasl_data += u'%s="%s",' % (key, self.resp[key]) sasl_data = sasl_data[:-1].encode('utf-8').encode('base64').replace( - '\r', '').replace('\n', '') - node = Node('response', attrs={'xmlns':NS_SASL}, payload=[sasl_data]) + '\r', '').replace('\n', '') + node = Node('response', attrs={'xmlns': NS_SASL}, + payload=[sasl_data]) elif self.mechanism == 'PLAIN': sasl_data = u'\x00%s\x00%s' % (self.username, self.password) sasl_data = sasl_data.encode('utf-8').encode('base64').replace( - '\n', '') + '\n', '') node = Node('auth', attrs={'xmlns': NS_SASL, 'mechanism': 'PLAIN'}, - payload=[sasl_data]) + payload=[sasl_data]) self._owner.send(str(node)) @@ -491,8 +497,8 @@ class NonBlockingNonSASL(PlugIn): self.password = password self.resource = resource self.on_auth = on_auth - - + + def plugin(self, owner): """ Determine the best auth method (digest/0k/plain) and use it for auth. @@ -502,8 +508,8 @@ class NonBlockingNonSASL(PlugIn): self.owner = owner owner.Dispatcher.SendAndWaitForResponse( - Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]), - func=self._on_username) + Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]), + func=self._on_username) def _on_username(self, resp): if not isResultNode(resp): @@ -518,8 +524,8 @@ class NonBlockingNonSASL(PlugIn): if query.getTag('digest'): log.info("Performing digest authentication") query.setTagData('digest', - hashlib.sha1(self.owner.Dispatcher.Stream._document_attrs['id'] - + self.password).hexdigest()) + hashlib.sha1(self.owner.Dispatcher.Stream._document_attrs['id'] + + self.password).hexdigest()) if query.getTag('password'): query.delChild('password') self._method = 'digest' @@ -534,23 +540,25 @@ class NonBlockingNonSASL(PlugIn): def hash_n_times(s, count): return count and hasher(hash_n_times(s, count-1)) or s - hash_ = hash_n_times(hasher(hasher(self.password) + token), int(seq)) + hash_ = hash_n_times(hasher(hasher(self.password) + token), + int(seq)) query.setTagData('hash', hash_) self._method='0k' else: log.warn("Secure methods unsupported, performing plain text \ - authentication") + authentication") query.setTagData('password', self.password) self._method = 'plain' - resp = self.owner.Dispatcher.SendAndWaitForResponse(iq, func=self._on_auth) + resp = self.owner.Dispatcher.SendAndWaitForResponse(iq, + func=self._on_auth) def _on_auth(self, resp): if isResultNode(resp): log.info('Sucessfully authenticated with remote host.') self.owner.User = self.user self.owner.Resource = self.resource - self.owner._registered_name = self.owner.User+'@'+self.owner.Server+\ - '/'+self.owner.Resource + self.owner._registered_name = self.owner.User + '@' + \ + self.owner.Server+ '/' + self.owner.Resource return self.on_auth(self._method) log.info('Authentication failed!') return self.on_auth(None) @@ -572,24 +580,24 @@ class NonBlockingBind(PlugIn): if self._owner.Dispatcher.Stream.features: try: self.FeaturesHandler(self._owner.Dispatcher, - self._owner.Dispatcher.Stream.features) + self._owner.Dispatcher.Stream.features) except NodeProcessed: pass else: self._owner.RegisterHandler('features', self.FeaturesHandler, - xmlns=NS_STREAMS) + xmlns=NS_STREAMS) def FeaturesHandler(self, conn, feats): """ Determine if server supports resource binding and set some internal attributes accordingly. - + It also checks if server supports stream management """ - + if feats.getTag('sm', namespace=NS_STREAM_MGMT): self.supports_sm = True # server supports stream management - + if not feats.getTag('bind', namespace=NS_BIND): log.info('Server does not requested binding.') # we try to bind resource anyway @@ -607,7 +615,7 @@ class NonBlockingBind(PlugIn): Remove Bind handler from owner's dispatcher. Used internally """ self._owner.UnregisterHandler('features', self.FeaturesHandler, - xmlns=NS_STREAMS) + xmlns=NS_STREAMS) def NonBlockingBind(self, resource=None, on_bound=None): """ @@ -622,8 +630,9 @@ class NonBlockingBind(PlugIn): self._owner.onreceive(None) self._owner.Dispatcher.SendAndWaitForResponse( - Protocol('iq', typ='set', payload=[Node('bind', attrs={'xmlns':NS_BIND}, - payload=self._resource)]), func=self._on_bound) + Protocol('iq', typ='set', payload=[Node('bind', + attrs={'xmlns': NS_BIND}, payload=self._resource)]), + func=self._on_bound) def _on_bound(self, resp): if isResultNode(resp): @@ -634,7 +643,7 @@ class NonBlockingBind(PlugIn): self._owner.User = jid.getNode() self._owner.Resource = jid.getResource() # Only negociate stream management after bounded - if self.supports_sm: + if self.supports_sm: # starts negociation sm = Smacks(self._owner) self._owner.Dispatcher.supports_sm = True @@ -647,8 +656,8 @@ class NonBlockingBind(PlugIn): self.on_bound('ok') else: self._owner.SendAndWaitForResponse(Protocol('iq', typ='set', - payload=[Node('session', attrs={'xmlns':NS_SESSION})]), - func=self._on_session) + payload=[Node('session', attrs={'xmlns':NS_SESSION})]), + func=self._on_session) return if resp: log.info('Binding failed: %s.' % resp.getTag('error')) From 9c8b63afc420d80a933ebfe5f2caf4100e850706 Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Tue, 31 May 2011 17:09:49 -0400 Subject: [PATCH 05/16] Checks for stanzas handled by server --- src/common/xmpp/dispatcher_nb.py | 7 +++++++ src/common/xmpp/smacks.py | 26 +++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index 31fa86e3a..f01f78a0a 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -533,6 +533,13 @@ class XMPPDispatcher(PlugIn): ID = stanza.getID() if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from', self._owner._registered_name) + + if self.supports_sm: + self.sm.uqueue.append(stanza) + self.sm.out_h = self.sm.out_h + 1 + if len(self.sm.uqueue) > self.sm.max_queue: + self.sm.request_ack() + self._owner.Connection.send(stanza, now) return ID diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index 7c99ab5b8..005f2e375 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -1,5 +1,7 @@ from protocol import Acks from protocol import NS_STREAM_MGMT +import logging +log = logging.getLogger('gajim.c.x.smacks') class Smacks(): ''' @@ -16,13 +18,16 @@ class Smacks(): self.out_h = 0 # Outgoing stanzas handled self.in_h = 0 # Incoming stanzas handled self.uqueue = [] # Unhandled stanzas queue - - #Register handlers + # Max number of stanzas in queue before making a request + self.max_queue = 5 + # Register handlers owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT) owner.Dispatcher.RegisterHandler('enabled', self._neg_response ,xmlns=NS_STREAM_MGMT) owner.Dispatcher.RegisterHandler('r', self.send_ack ,xmlns=NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('a', self.check_ack + ,xmlns=NS_STREAM_MGMT) def negociate(self): @@ -41,4 +46,19 @@ class Smacks(): def request_ack(self): r = Acks() r.buildRequest() - self._owner.Connection.send(r, False) \ No newline at end of file + self._owner.Connection.send(r, False) + + def check_ack(self, disp, stanza): + h = int(stanza.getAttr('h')) + diff = self.out_h - h + + + if len(self.uqueue) < diff or diff < 0: + log.error('Server and client number of stanzas handled mismatch ') + return + + while (len(self.uqueue) > diff): + self.uqueue.pop(0) + + + From 9128e6e367a31187594a16e8f8d2051e4d5b1e2b Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Tue, 31 May 2011 18:03:28 -0400 Subject: [PATCH 06/16] minor changes --- src/common/xmpp/smacks.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index 005f2e375..ad191857e 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -18,6 +18,8 @@ class Smacks(): self.out_h = 0 # Outgoing stanzas handled self.in_h = 0 # Incoming stanzas handled self.uqueue = [] # Unhandled stanzas queue + self.sesion_id = None + self.supports_resume = False # If server supports resume # Max number of stanzas in queue before making a request self.max_queue = 5 # Register handlers @@ -32,11 +34,15 @@ class Smacks(): def negociate(self): stanza = Acks() - stanza.buildEnable() - self._owner.Connection.send(stanza, True) + stanza.buildEnable(resume=True) + self._owner.Connection.send(stanza, now=True) def _neg_response(self, disp, stanza): - pass + r = stanza.getAttr('resume') + if r == 'true': + self.supports_resume = True + self.sesion_id = stanza.getAttr(id) + def send_ack(self, disp, stanza): ack = Acks() @@ -49,6 +55,10 @@ class Smacks(): self._owner.Connection.send(r, False) def check_ack(self, disp, stanza): + ''' Checks if the number of stanzas sent are the same as the + number of stanzas received by the server. Pops stanzas that were + handled by the server from the queue. + ''' h = int(stanza.getAttr('h')) diff = self.out_h - h From 89cd4b2e45cb5e050ff29209a5302bddb3fe1d67 Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Mon, 6 Jun 2011 23:34:19 -0400 Subject: [PATCH 07/16] stream resumption (needs testing) --- src/common/connection.py | 19 +++++--- src/common/xmpp/__init__.py | 1 + src/common/xmpp/auth_nb.py | 13 ++++-- src/common/xmpp/dispatcher_nb.py | 3 +- src/common/xmpp/smacks.py | 77 +++++++++++++++++++++++++------- 5 files changed, 88 insertions(+), 25 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index ffe694dd0..bbb2cb974 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -57,9 +57,9 @@ from common import gajim from common import gpg from common import passwords from common import exceptions - from connection_handlers import * +from xmpp import Smacks from string import Template import logging log = logging.getLogger('gajim.c.connection') @@ -711,6 +711,9 @@ class Connection(CommonConnection, ConnectionHandlers): self.private_storage_supported = True self.streamError = '' self.secret_hmac = str(random.random())[2:] + + self.sm = Smacks(self) + gajim.ged.register_event_handler('privacy-list-received', ged.CORE, self._nec_privacy_list_received) gajim.ged.register_event_handler('agent-info-error-received', ged.CORE, @@ -1587,12 +1590,18 @@ class Connection(CommonConnection, ConnectionHandlers): self.connection.set_send_timeout2(self.pingalives, self.sendPing) self.connection.onreceive(None) - self.request_message_archiving_preferences() + self.privacy_rules_requested = False - self.discoverInfo(gajim.config.get_per('accounts', self.name, 'hostname'), - id_prefix='Gajim_') - + # If we are not resuming, we ask for discovery info + # and archiving preferences + if not self.sm.resuming: + + self.request_message_archiving_preferences() + self.discoverInfo(gajim.config.get_per('accounts', self.name, 'hostname'), + id_prefix='Gajim_') + + self.sm.resuming = False # back to previous state # Discover Stun server(s) gajim.resolver.resolve('_stun._udp.' + helpers.idn_to_ascii( self.connected_hostname), self._on_stun_resolved) diff --git a/src/common/xmpp/__init__.py b/src/common/xmpp/__init__.py index 4ebc4cfa3..46037a1cb 100644 --- a/src/common/xmpp/__init__.py +++ b/src/common/xmpp/__init__.py @@ -15,3 +15,4 @@ import simplexml, protocol, auth_nb, transports_nb, roster_nb import dispatcher_nb, features_nb, idlequeue, bosh, tls_nb, proxy_connectors from client_nb import NonBlockingClient from plugin import PlugIn +from smacks import Smacks diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py index 90e8a421d..9039b12f5 100644 --- a/src/common/xmpp/auth_nb.py +++ b/src/common/xmpp/auth_nb.py @@ -643,13 +643,18 @@ class NonBlockingBind(PlugIn): self._owner.User = jid.getNode() self._owner.Resource = jid.getResource() # Only negociate stream management after bounded + sm = self._owner._caller.sm if self.supports_sm: # starts negociation - sm = Smacks(self._owner) - self._owner.Dispatcher.supports_sm = True + if sm._owner and sm.resumption: + sm.set_owner(self._owner) + sm.resume_request() + else: + sm.set_owner(self._owner) + sm.negociate() + self._owner.Dispatcher.sm = sm - sm.negociate() - + if hasattr(self, 'session') and self.session == -1: # Server don't want us to initialize a session log.info('No session required.') diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index f01f78a0a..e0b5d1538 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -420,7 +420,8 @@ class XMPPDispatcher(PlugIn): typ = '' stanza.props = stanza.getProperties() ID = stanza.getID() - if self.supports_sm and (stanza.getName() != 'r' and + # If server supports stream management + if self.sm != None and (stanza.getName() != 'r' and stanza.getName() != 'a' and stanza.getName() != 'enabled') : # increments the number of stanzas that has been handled diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index ad191857e..9f360cd65 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -13,15 +13,21 @@ class Smacks(): ''' - def __init__(self, owner): - self._owner = owner + def __init__(self, con): + self.con = con # Connection object self.out_h = 0 # Outgoing stanzas handled self.in_h = 0 # Incoming stanzas handled self.uqueue = [] # Unhandled stanzas queue - self.sesion_id = None - self.supports_resume = False # If server supports resume + self.session_id = None + self.resumption = False # If server supports resume # Max number of stanzas in queue before making a request self.max_queue = 5 + self._owner = None + self.resuming = False + + def set_owner(self, owner): + self._owner = owner + # Register handlers owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT) owner.Dispatcher.RegisterHandler('enabled', self._neg_response @@ -30,19 +36,41 @@ class Smacks(): ,xmlns=NS_STREAM_MGMT) owner.Dispatcher.RegisterHandler('a', self.check_ack ,xmlns=NS_STREAM_MGMT) - - - def negociate(self): - stanza = Acks() - stanza.buildEnable(resume=True) - self._owner.Connection.send(stanza, now=True) + owner.Dispatcher.RegisterHandler('resumed', self.check_ack + ,xmlns=NS_STREAM_MGMT) + owner.Dispatcher.RegisterHandler('failed', self.error_handling + ,xmlns=NS_STREAM_MGMT) + def _neg_response(self, disp, stanza): r = stanza.getAttr('resume') - if r == 'true': - self.supports_resume = True - self.sesion_id = stanza.getAttr(id) - + if r == 'true' or r == 'True' or r == '1': + self.resumption = True + self.session_id = stanza.getAttr('id') + + if r == 'false' or r == 'False' or r == '0': + self.negociate(False) + + def negociate(self, resume=True): + # Every time we attempt to negociate, we must erase all previous info + # about any previous session + self.uqueue = [] + self.in_h = 0 + self.out_h = 0 + self.session_id = None + + stanza = Acks() + stanza.buildEnable(resume) + self._owner.Connection.send(stanza, now=True) + + def resume_request(self): + if not self.session_id: + self.resuming = False + log.error('Attempted to resume without a valid session id ') + return + resume = Acks() + resume.buildResume(self.in_h, self.session_id) + self._owner.Connection.send(resume, True) def send_ack(self, disp, stanza): ack = Acks() @@ -70,5 +98,24 @@ class Smacks(): while (len(self.uqueue) > diff): self.uqueue.pop(0) - + if stanza.getName() == 'resumed': + self.resuming = True + if self.uqueue != []: + for i in self.uqueue: + self._owner.Connection.send(i, False) + + def error_handling(self, disp, stanza): # NEEDS TESTING + + tag = stanza.getTag('item-not-found') + # If the server doesn't recognize previd, forget about resuming + # Ask for service discovery, etc.. + if tag: + self.negociate() + self.resuming = False + self.con._discover_server_at_connection(self.con.connection) + + tag = stanza.getTag('feature-not-implemented') + # Doesn't support resumption + if tag: + self.negociate(False) \ No newline at end of file From 2090b9b900a2e7d1d0a7f39bd884d1e1ec85930c Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Mon, 6 Jun 2011 23:43:13 -0400 Subject: [PATCH 08/16] fixed small bug in dispatcher --- src/common/xmpp/dispatcher_nb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index e0b5d1538..d3f5b7cb3 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -91,7 +91,7 @@ class XMPPDispatcher(PlugIn): self.getAnID, self.Event, self.send] # Let the dispatcher know if there is support for stream management - self.supports_sm = False + self.sm = None def getAnID(self): global outgoingID @@ -535,7 +535,7 @@ class XMPPDispatcher(PlugIn): if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from', self._owner._registered_name) - if self.supports_sm: + if self.sm: self.sm.uqueue.append(stanza) self.sm.out_h = self.sm.out_h + 1 if len(self.sm.uqueue) > self.sm.max_queue: From 7194260f9803002afcc48ed5c6f93f8fef5cb656 Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Tue, 7 Jun 2011 18:48:53 -0400 Subject: [PATCH 09/16] connects when server can't resume stream --- src/common/xmpp/dispatcher_nb.py | 2 +- src/common/xmpp/smacks.py | 32 +++++++++++++++++++++++--------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index d3f5b7cb3..ca321e878 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -535,7 +535,7 @@ class XMPPDispatcher(PlugIn): if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from', self._owner._registered_name) - if self.sm: + if self.sm and self.sm.enabled: self.sm.uqueue.append(stanza) self.sm.out_h = self.sm.out_h + 1 if len(self.sm.uqueue) > self.sm.max_queue: diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index 9f360cd65..c0fa77c0e 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -24,7 +24,8 @@ class Smacks(): self.max_queue = 5 self._owner = None self.resuming = False - + self.enabled = False # If SM is enabled + def set_owner(self, owner): self._owner = owner @@ -50,6 +51,8 @@ class Smacks(): if r == 'false' or r == 'False' or r == '0': self.negociate(False) + + self.enabled = True def negociate(self, resume=True): # Every time we attempt to negociate, we must erase all previous info @@ -69,7 +72,7 @@ class Smacks(): log.error('Attempted to resume without a valid session id ') return resume = Acks() - resume.buildResume(self.in_h, self.session_id) + resume.buildResume(self.in_h, None)#self.session_id) self._owner.Connection.send(resume, True) def send_ack(self, disp, stanza): @@ -105,17 +108,28 @@ class Smacks(): for i in self.uqueue: self._owner.Connection.send(i, False) - def error_handling(self, disp, stanza): # NEEDS TESTING + def error_handling(self, disp, stanza): - tag = stanza.getTag('item-not-found') # If the server doesn't recognize previd, forget about resuming # Ask for service discovery, etc.. - if tag: - self.negociate() + if stanza.getTag('item-not-found'): + self.enabled = False self.resuming = False + self.negociate() self.con._discover_server_at_connection(self.con.connection) + return - tag = stanza.getTag('feature-not-implemented') # Doesn't support resumption - if tag: - self.negociate(False) \ No newline at end of file + if stanza.getTag('feature-not-implemented'): + self.enabled = False + self.negociate(False) + return + + if stanza.getTag('unexpected-request'): + self.enabled = False + log.error('Gajim failed to negociate Stream Management') + return + + + + \ No newline at end of file From 39a960f3e71530e819ff38a70abe39965f9eca89 Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Tue, 7 Jun 2011 23:50:45 -0400 Subject: [PATCH 10/16] keeps old status after disconnection --- src/common/connection.py | 11 ++++++++++- src/common/xmpp/dispatcher_nb.py | 7 ++++--- src/common/xmpp/smacks.py | 3 ++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index bbb2cb974..46daadfa0 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -784,7 +784,16 @@ class Connection(CommonConnection, ConnectionHandlers): self.connection.disconnect() self.last_connection = None self.connection = None - + def set_oldst(self): # Set old state + if self.old_show: + self.connected = gajim.SHOW_LIST.index(self.old_show) + gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, + show=self.connected)) + else: # we default to online + self.connected = 2 + gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, + show=gajim.SHOW_LIST[self.connected])) + def _disconnectedReconnCB(self): """ Called when we are disconnected diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index ca321e878..0f84c196a 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -421,9 +421,10 @@ class XMPPDispatcher(PlugIn): stanza.props = stanza.getProperties() ID = stanza.getID() # If server supports stream management - if self.sm != None and (stanza.getName() != 'r' and - stanza.getName() != 'a' and - stanza.getName() != 'enabled') : + if self.sm and self.sm.enabled and (stanza.getName() != 'r' or + stanza.getName() != 'a' or + stanza.getName() != 'enabled' or + stanza.getName() != 'resumed'): # increments the number of stanzas that has been handled self.sm.in_h = self.sm.in_h + 1 list_ = ['default'] # we will use all handlers: diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index c0fa77c0e..b1b6dac03 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -72,7 +72,7 @@ class Smacks(): log.error('Attempted to resume without a valid session id ') return resume = Acks() - resume.buildResume(self.in_h, None)#self.session_id) + resume.buildResume(self.in_h, self.session_id) self._owner.Connection.send(resume, True) def send_ack(self, disp, stanza): @@ -104,6 +104,7 @@ class Smacks(): if stanza.getName() == 'resumed': self.resuming = True + self.con.set_oldst() if self.uqueue != []: for i in self.uqueue: self._owner.Connection.send(i, False) From d73e277f40508e107cb1e887be111ea4121b8b08 Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Fri, 10 Jun 2011 23:07:34 -0400 Subject: [PATCH 11/16] resuming before binding --- src/common/connection.py | 7 ++++--- src/common/xmpp/auth_nb.py | 16 ++++++++-------- src/common/xmpp/client_nb.py | 14 +++++++++++++- src/common/xmpp/smacks.py | 2 +- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index 46daadfa0..e5ac4b86d 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -712,7 +712,7 @@ class Connection(CommonConnection, ConnectionHandlers): self.streamError = '' self.secret_hmac = str(random.random())[2:] - self.sm = Smacks(self) + self.sm = Smacks(self) # Stream Management gajim.ged.register_event_handler('privacy-list-received', ged.CORE, self._nec_privacy_list_received) @@ -805,8 +805,9 @@ class Connection(CommonConnection, ConnectionHandlers): self.old_show = gajim.SHOW_LIST[self.connected] self.connected = 0 if not self.on_purpose: - gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, - show='offline')) + if not (self.sm and self.sm.resumption): + gajim.nec.push_incoming_event(OurShowEvent(None, conn=self, + show='offline')) self.disconnect() if gajim.config.get_per('accounts', self.name, 'autoreconnect'): self.connected = -1 diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py index 9039b12f5..79b967c20 100644 --- a/src/common/xmpp/auth_nb.py +++ b/src/common/xmpp/auth_nb.py @@ -574,6 +574,7 @@ class NonBlockingBind(PlugIn): PlugIn.__init__(self) self.bound = None self.supports_sm = False + self.resuming = False def plugin(self, owner): ''' Start resource binding, if allowed at this time. Used internally. ''' @@ -597,6 +598,8 @@ class NonBlockingBind(PlugIn): if feats.getTag('sm', namespace=NS_STREAM_MGMT): self.supports_sm = True # server supports stream management + if self.resuming: + self._owner._caller.sm.resume_request() if not feats.getTag('bind', namespace=NS_BIND): log.info('Server does not requested binding.') @@ -621,6 +624,8 @@ class NonBlockingBind(PlugIn): """ Perform binding. Use provided resource name or random (if not provided). """ + if self.resuming: # We don't bind if we resume the stream + return self.on_bound = on_bound self._resource = resource if self._resource: @@ -646,13 +651,8 @@ class NonBlockingBind(PlugIn): sm = self._owner._caller.sm if self.supports_sm: # starts negociation - if sm._owner and sm.resumption: - sm.set_owner(self._owner) - sm.resume_request() - else: - sm.set_owner(self._owner) - sm.negociate() - + sm.set_owner(self._owner) + sm.negociate() self._owner.Dispatcher.sm = sm if hasattr(self, 'session') and self.session == -1: @@ -670,7 +670,7 @@ class NonBlockingBind(PlugIn): else: log.info('Binding failed: timeout expired.') self.on_bound(None) - + def _on_session(self, resp): self._owner.onreceive(None) if isResultNode(resp): diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py index dc4c4476b..aafb5f775 100644 --- a/src/common/xmpp/client_nb.py +++ b/src/common/xmpp/client_nb.py @@ -521,8 +521,20 @@ class NonBlockingClient: self.connected = None # FIXME: is this intended? We use ''elsewhere self._on_sasl_auth(None) elif self.SASL.startsasl == 'success': - auth_nb.NonBlockingBind.get_instance().PlugIn(self) + nb_bind = auth_nb.NonBlockingBind.get_instance() + sm = self._caller.sm + if sm._owner and sm.resumption: + nb_bind.resuming = True + sm.set_owner(self) + self.Dispatcher.sm = sm + nb_bind.PlugIn(self) + return + + + nb_bind.PlugIn(self) self.onreceive(self._on_auth_bind) + + return True def _on_auth_bind(self, data): diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index b1b6dac03..182ef0074 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -73,7 +73,7 @@ class Smacks(): return resume = Acks() resume.buildResume(self.in_h, self.session_id) - self._owner.Connection.send(resume, True) + self._owner.Connection.send(resume, False) def send_ack(self, disp, stanza): ack = Acks() From 985746d65a4152dc006ffcda85b4ea0c74b5f8ea Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Sat, 11 Jun 2011 16:29:08 -0400 Subject: [PATCH 12/16] only resume when disconnected not on purpose --- src/common/connection.py | 2 ++ src/common/xmpp/dispatcher_nb.py | 6 +++--- src/common/xmpp/smacks.py | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index e5ac4b86d..9913cf737 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -776,6 +776,8 @@ class Connection(CommonConnection, ConnectionHandlers): self.connected = 0 self.time_to_reconnect = None self.privacy_rules_supported = False + if on_purpose: + self.sm = Smacks(self) if self.connection: # make sure previous connection is completely closed gajim.proxy65_manager.disconnect(self.connection) diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index 0f84c196a..c3174c615 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -421,9 +421,9 @@ class XMPPDispatcher(PlugIn): stanza.props = stanza.getProperties() ID = stanza.getID() # If server supports stream management - if self.sm and self.sm.enabled and (stanza.getName() != 'r' or - stanza.getName() != 'a' or - stanza.getName() != 'enabled' or + if self.sm and self.sm.enabled and (stanza.getName() != 'r' and + stanza.getName() != 'a' and + stanza.getName() != 'enabled' and stanza.getName() != 'resumed'): # increments the number of stanzas that has been handled self.sm.in_h = self.sm.in_h + 1 diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index 182ef0074..b4e2ed46f 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -96,10 +96,10 @@ class Smacks(): if len(self.uqueue) < diff or diff < 0: log.error('Server and client number of stanzas handled mismatch ') - return + else: - while (len(self.uqueue) > diff): - self.uqueue.pop(0) + while (len(self.uqueue) > diff): + self.uqueue.pop(0) if stanza.getName() == 'resumed': From b8aa5185923a9412745eec33ac73ceb362d0528e Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Mon, 13 Jun 2011 22:37:29 -0400 Subject: [PATCH 13/16] whitespaces aren't added to the SM queue --- src/common/xmpp/dispatcher_nb.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index c3174c615..69b7363c2 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -536,7 +536,8 @@ class XMPPDispatcher(PlugIn): if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from', self._owner._registered_name) - if self.sm and self.sm.enabled: + # If no ID then it is a whitespace + if self.sm and self.sm.enabled and ID: self.sm.uqueue.append(stanza) self.sm.out_h = self.sm.out_h + 1 if len(self.sm.uqueue) > self.sm.max_queue: From e02088e91c418d1f24555f0df14d6226045bd967 Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Wed, 15 Jun 2011 19:12:55 -0400 Subject: [PATCH 14/16] added tests for smacks.py --- test/unit/test_xmpp_smacks.py | 141 ++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 test/unit/test_xmpp_smacks.py diff --git a/test/unit/test_xmpp_smacks.py b/test/unit/test_xmpp_smacks.py new file mode 100644 index 000000000..feff0e97b --- /dev/null +++ b/test/unit/test_xmpp_smacks.py @@ -0,0 +1,141 @@ +''' +Tests for smacks.py Stream Management +''' +import unittest + +import lib +lib.setup_env() + +from mock import Mock + +from common.xmpp import dispatcher_nb +from common.xmpp import protocol +from common.xmpp import smacks + +class TestDispatcherNB(unittest.TestCase): + ''' + Test class for NonBlocking dispatcher. Tested dispatcher will be plugged + into a mock client + ''' + def setUp(self): + self.dispatcher = dispatcher_nb.XMPPDispatcher() + + # Setup mock client + self.client = Mock() + self.client.__str__ = lambda: 'Mock' # FIXME: why do I need this one? + self.client._caller = Mock() + self.client.defaultNamespace = protocol.NS_CLIENT + self.client.Connection = Mock() # mock transport + self.con = self.client.Connection + self.con.sm = smacks.Smacks(self.con) + + + + + + def tearDown(self): + # Unplug if needed + if hasattr(self.dispatcher, '_owner'): + self.dispatcher.PlugOut() + + def _simulate_connect(self): + self.dispatcher.PlugIn(self.client) # client is owner + self.con.sm.set_owner(self.client) + self.dispatcher.sm = self.con.sm + # Simulate that we have established a connection + self.dispatcher.StreamInit() + self.dispatcher.ProcessNonBlocking("") + self.dispatcher.ProcessNonBlocking(" ") + self.con.sm.negociate() + self.dispatcher.ProcessNonBlocking("") + assert(self.con.sm.enabled) + + + def _simulate_resume(self): + + self.con.sm.resume_request() + # Resuming acknowledging 5 stanzas + self.dispatcher.ProcessNonBlocking("") + assert(self.con.sm.resuming) + + + def _send(self, send, r, stanza): + for i in range(r): + send(stanza) + def test_messages(self): + + message = 'Helloo ' + iq = ''' + + + + + ''' + presence = ''' + 24 + + + In love Kakashi Sensei :P + + db4b7c52e39ba28562c74542d5988d47f09108a3 + + ''' + + self._simulate_connect() + uqueue = self.con.sm.uqueue + self.assertEqual(self.con.sm.out_h, 0) + self.assertEqual(self.con.sm.in_h, 0) + + # The server sends 10 stanzas + self._send(self.dispatcher.ProcessNonBlocking, 5, message) + self._send(self.dispatcher.ProcessNonBlocking, 4, iq) + self._send(self.dispatcher.ProcessNonBlocking, 1, presence) + + # The client has recieved 10 stanzas and sent none + self.assertEqual(self.con.sm.in_h, 10) + self.assertEqual(self.con.sm.out_h, 0) + + m = protocol.Message() + + # The client sends 10 stanzas + for i in range(10): + m = protocol.Message(body=str(i)) + self.dispatcher.send(m) + + # Client sends 10 stanzas and put them in the queue + self.assertEqual(self.con.sm.out_h, 10) + self.assertEqual(len(uqueue), 10) + + # The server acknowledges that it recieved 5 stanzas + self.dispatcher.ProcessNonBlocking("") + # 5 stanzas are removed from the queue, only 5 stanzas are left + + self.assertEqual(len(uqueue), 5) + + # Check for the right order of stanzas in the queue + l = ['5', '6', '7', '8', '9'] + for i in uqueue: + self.assertEqual(i.getBody(), l[0]) + l.pop(0) + + + def test_resumption(self): + + self._simulate_connect() + + m = protocol.Message() + + # The client sends 5 stanzas + for i in range(5): + m = protocol.Message(body=str(i)) + self.dispatcher.send(m) + + self._simulate_resume() + # No stanzas left + self.assertEqual(len(self.con.sm.uqueue), 0) + + + + +if __name__ == '__main__': + unittest.main() From db7276752b267c4ad794c321ed6849c95656f95b Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Wed, 15 Jun 2011 20:37:07 -0400 Subject: [PATCH 15/16] resuming from prefered location --- src/common/connection.py | 12 ++++++++++-- src/common/xmpp/smacks.py | 6 +++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index 9913cf737..c543c49f5 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -996,8 +996,16 @@ class Connection(CommonConnection, ConnectionHandlers): """ if self.connection: return self.connection, '' - - if data: + + + if self.sm.resuming and self.sm.location: + # If resuming and server gave a location, connect from there + hostname = self.sm.location + self.try_connecting_for_foo_secs = gajim.config.get_per('accounts', + self.name, 'try_connecting_for_foo_secs') + use_custom = False + + elif data: hostname = data['hostname'] self.try_connecting_for_foo_secs = 45 p = data['proxy'] diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index b4e2ed46f..658133390 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -25,6 +25,7 @@ class Smacks(): self._owner = None self.resuming = False self.enabled = False # If SM is enabled + self.location = None def set_owner(self, owner): self._owner = owner @@ -51,7 +52,10 @@ class Smacks(): if r == 'false' or r == 'False' or r == '0': self.negociate(False) - + + l = stanza.getAttr('location') + if l: + self.location = l self.enabled = True def negociate(self, resume=True): From 4971f7d8cc03e94b409d278c4be09720082dcbec Mon Sep 17 00:00:00 2001 From: Jefry Lagrange Date: Thu, 16 Jun 2011 18:59:21 -0400 Subject: [PATCH 16/16] enable SM before sending negociation request --- src/common/xmpp/smacks.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/common/xmpp/smacks.py b/src/common/xmpp/smacks.py index 658133390..e64c14d34 100644 --- a/src/common/xmpp/smacks.py +++ b/src/common/xmpp/smacks.py @@ -56,7 +56,7 @@ class Smacks(): l = stanza.getAttr('location') if l: self.location = l - self.enabled = True + def negociate(self, resume=True): # Every time we attempt to negociate, we must erase all previous info @@ -65,6 +65,7 @@ class Smacks(): self.in_h = 0 self.out_h = 0 self.session_id = None + self.enabled = True stanza = Acks() stanza.buildEnable(resume) @@ -118,7 +119,6 @@ class Smacks(): # If the server doesn't recognize previd, forget about resuming # Ask for service discovery, etc.. if stanza.getTag('item-not-found'): - self.enabled = False self.resuming = False self.negociate() self.con._discover_server_at_connection(self.con.connection) @@ -126,7 +126,6 @@ class Smacks(): # Doesn't support resumption if stanza.getTag('feature-not-implemented'): - self.enabled = False self.negociate(False) return