stream resumption (needs testing)

This commit is contained in:
Jefry Lagrange 2011-06-06 23:34:19 -04:00
parent e53f95c87e
commit 89cd4b2e45
5 changed files with 88 additions and 25 deletions

View File

@ -57,9 +57,9 @@ from common import gajim
from common import gpg from common import gpg
from common import passwords from common import passwords
from common import exceptions from common import exceptions
from connection_handlers import * from connection_handlers import *
from xmpp import Smacks
from string import Template from string import Template
import logging import logging
log = logging.getLogger('gajim.c.connection') log = logging.getLogger('gajim.c.connection')
@ -711,6 +711,9 @@ class Connection(CommonConnection, ConnectionHandlers):
self.private_storage_supported = True self.private_storage_supported = True
self.streamError = '' self.streamError = ''
self.secret_hmac = str(random.random())[2:] self.secret_hmac = str(random.random())[2:]
self.sm = Smacks(self)
gajim.ged.register_event_handler('privacy-list-received', ged.CORE, gajim.ged.register_event_handler('privacy-list-received', ged.CORE,
self._nec_privacy_list_received) self._nec_privacy_list_received)
gajim.ged.register_event_handler('agent-info-error-received', ged.CORE, gajim.ged.register_event_handler('agent-info-error-received', ged.CORE,
@ -1587,12 +1590,18 @@ class Connection(CommonConnection, ConnectionHandlers):
self.connection.set_send_timeout2(self.pingalives, self.sendPing) self.connection.set_send_timeout2(self.pingalives, self.sendPing)
self.connection.onreceive(None) self.connection.onreceive(None)
self.request_message_archiving_preferences()
self.privacy_rules_requested = False self.privacy_rules_requested = False
# If we are not resuming, we ask for discovery info
# and archiving preferences
if not self.sm.resuming:
self.request_message_archiving_preferences()
self.discoverInfo(gajim.config.get_per('accounts', self.name, 'hostname'), self.discoverInfo(gajim.config.get_per('accounts', self.name, 'hostname'),
id_prefix='Gajim_') id_prefix='Gajim_')
self.sm.resuming = False # back to previous state
# Discover Stun server(s) # Discover Stun server(s)
gajim.resolver.resolve('_stun._udp.' + helpers.idn_to_ascii( gajim.resolver.resolve('_stun._udp.' + helpers.idn_to_ascii(
self.connected_hostname), self._on_stun_resolved) self.connected_hostname), self._on_stun_resolved)

View File

@ -15,3 +15,4 @@ import simplexml, protocol, auth_nb, transports_nb, roster_nb
import dispatcher_nb, features_nb, idlequeue, bosh, tls_nb, proxy_connectors import dispatcher_nb, features_nb, idlequeue, bosh, tls_nb, proxy_connectors
from client_nb import NonBlockingClient from client_nb import NonBlockingClient
from plugin import PlugIn from plugin import PlugIn
from smacks import Smacks

View File

@ -643,13 +643,18 @@ class NonBlockingBind(PlugIn):
self._owner.User = jid.getNode() self._owner.User = jid.getNode()
self._owner.Resource = jid.getResource() self._owner.Resource = jid.getResource()
# Only negociate stream management after bounded # Only negociate stream management after bounded
sm = self._owner._caller.sm
if self.supports_sm: if self.supports_sm:
# starts negociation # starts negociation
sm = Smacks(self._owner) if sm._owner and sm.resumption:
self._owner.Dispatcher.supports_sm = True sm.set_owner(self._owner)
self._owner.Dispatcher.sm = sm sm.resume_request()
else:
sm.set_owner(self._owner)
sm.negociate() sm.negociate()
self._owner.Dispatcher.sm = sm
if hasattr(self, 'session') and self.session == -1: if hasattr(self, 'session') and self.session == -1:
# Server don't want us to initialize a session # Server don't want us to initialize a session
log.info('No session required.') log.info('No session required.')

View File

@ -420,7 +420,8 @@ class XMPPDispatcher(PlugIn):
typ = '' typ = ''
stanza.props = stanza.getProperties() stanza.props = stanza.getProperties()
ID = stanza.getID() ID = stanza.getID()
if self.supports_sm and (stanza.getName() != 'r' and # If server supports stream management
if self.sm != None and (stanza.getName() != 'r' and
stanza.getName() != 'a' and stanza.getName() != 'a' and
stanza.getName() != 'enabled') : stanza.getName() != 'enabled') :
# increments the number of stanzas that has been handled # increments the number of stanzas that has been handled

View File

@ -13,15 +13,21 @@ class Smacks():
''' '''
def __init__(self, owner): def __init__(self, con):
self._owner = owner self.con = con # Connection object
self.out_h = 0 # Outgoing stanzas handled self.out_h = 0 # Outgoing stanzas handled
self.in_h = 0 # Incoming stanzas handled self.in_h = 0 # Incoming stanzas handled
self.uqueue = [] # Unhandled stanzas queue self.uqueue = [] # Unhandled stanzas queue
self.sesion_id = None self.session_id = None
self.supports_resume = False # If server supports resume self.resumption = False # If server supports resume
# Max number of stanzas in queue before making a request # Max number of stanzas in queue before making a request
self.max_queue = 5 self.max_queue = 5
self._owner = None
self.resuming = False
def set_owner(self, owner):
self._owner = owner
# Register handlers # Register handlers
owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT) owner.Dispatcher.RegisterNamespace(NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('enabled', self._neg_response owner.Dispatcher.RegisterHandler('enabled', self._neg_response
@ -30,19 +36,41 @@ class Smacks():
,xmlns=NS_STREAM_MGMT) ,xmlns=NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('a', self.check_ack owner.Dispatcher.RegisterHandler('a', self.check_ack
,xmlns=NS_STREAM_MGMT) ,xmlns=NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('resumed', self.check_ack
,xmlns=NS_STREAM_MGMT)
owner.Dispatcher.RegisterHandler('failed', self.error_handling
,xmlns=NS_STREAM_MGMT)
def negociate(self):
stanza = Acks()
stanza.buildEnable(resume=True)
self._owner.Connection.send(stanza, now=True)
def _neg_response(self, disp, stanza): def _neg_response(self, disp, stanza):
r = stanza.getAttr('resume') r = stanza.getAttr('resume')
if r == 'true': if r == 'true' or r == 'True' or r == '1':
self.supports_resume = True self.resumption = True
self.sesion_id = stanza.getAttr(id) self.session_id = stanza.getAttr('id')
if r == 'false' or r == 'False' or r == '0':
self.negociate(False)
def negociate(self, resume=True):
# Every time we attempt to negociate, we must erase all previous info
# about any previous session
self.uqueue = []
self.in_h = 0
self.out_h = 0
self.session_id = None
stanza = Acks()
stanza.buildEnable(resume)
self._owner.Connection.send(stanza, now=True)
def resume_request(self):
if not self.session_id:
self.resuming = False
log.error('Attempted to resume without a valid session id ')
return
resume = Acks()
resume.buildResume(self.in_h, self.session_id)
self._owner.Connection.send(resume, True)
def send_ack(self, disp, stanza): def send_ack(self, disp, stanza):
ack = Acks() ack = Acks()
@ -71,4 +99,23 @@ class Smacks():
self.uqueue.pop(0) self.uqueue.pop(0)
if stanza.getName() == 'resumed':
self.resuming = True
if self.uqueue != []:
for i in self.uqueue:
self._owner.Connection.send(i, False)
def error_handling(self, disp, stanza): # NEEDS TESTING
tag = stanza.getTag('item-not-found')
# If the server doesn't recognize previd, forget about resuming
# Ask for service discovery, etc..
if tag:
self.negociate()
self.resuming = False
self.con._discover_server_at_connection(self.con.connection)
tag = stanza.getTag('feature-not-implemented')
# Doesn't support resumption
if tag:
self.negociate(False)