Add a second timeout to differenciate keepalive and ping alive packets. We now wait 55s before sending a keepalive (whitespace packet) and 2 minutes before sendind a xmpp ping (and then we wait for 1min the answer before reconnecting) Fixes #4643
This commit is contained in:
parent
38b4e54011
commit
a7f0022bb4
3 changed files with 69 additions and 20 deletions
|
@ -800,6 +800,11 @@ class Connection(ConnectionHandlers):
|
|||
return
|
||||
common.xmpp.features_nb.getPrivacyLists(self.connection)
|
||||
|
||||
def send_keepalive(self):
|
||||
# nothing received for the last foo seconds
|
||||
if self.connection:
|
||||
self.connection.send(' ')
|
||||
|
||||
def sendPing(self, pingTo=None):
|
||||
'''Send XMPP Ping (XEP-0199) request. If pingTo is not set, ping is sent
|
||||
to server to detect connection failure at application level.'''
|
||||
|
@ -1018,7 +1023,8 @@ class Connection(ConnectionHandlers):
|
|||
self.connection = con
|
||||
if not self.connection:
|
||||
return
|
||||
self.connection.set_send_timeout(self.keepalives, self.sendPing)
|
||||
self.connection.set_send_timeout(self.keepalives, self.send_keepalive)
|
||||
self.connection.set_send_timeout2(self.keepalives * 2, self.sendPing)
|
||||
self.connection.onreceive(None)
|
||||
iq = common.xmpp.Iq('get', common.xmpp.NS_PRIVACY, xmlns = '')
|
||||
id_ = self.connection.getAnID()
|
||||
|
|
|
@ -205,6 +205,9 @@ class IdleQueue:
|
|||
|
||||
# when there is a timeout it executes obj.read_timeout()
|
||||
# timeout is not removed automatically!
|
||||
# {fd1: {timeout1: func1, timeout2: func2}}
|
||||
# timout are unique (timeout1 must be != timeout2)
|
||||
# If func1 is None, read_time function is called
|
||||
self.read_timeouts = {}
|
||||
|
||||
# cb, which are executed after XX sec., alarms are removed automatically
|
||||
|
@ -247,39 +250,57 @@ class IdleQueue:
|
|||
else:
|
||||
return False
|
||||
|
||||
def remove_timeout(self, fd):
|
||||
def remove_timeout(self, fd, timeout=None):
|
||||
''' Removes the read timeout '''
|
||||
log.info('read timeout removed for fd %s' % fd)
|
||||
if fd in self.read_timeouts:
|
||||
del(self.read_timeouts[fd])
|
||||
if timeout:
|
||||
if timeout in self.read_timeouts[fd]:
|
||||
del(self.read_timeouts[fd][timeout])
|
||||
if len(self.read_timeouts[fd]) == 0:
|
||||
del(self.read_timeouts[fd])
|
||||
else:
|
||||
del(self.read_timeouts[fd])
|
||||
|
||||
def set_read_timeout(self, fd, seconds):
|
||||
def set_read_timeout(self, fd, seconds, func=None):
|
||||
'''
|
||||
Sets a new timeout. If it is not removed after specified seconds,
|
||||
obj.read_timeout() will be called.
|
||||
func or obj.read_timeout() will be called.
|
||||
|
||||
A filedescriptor fd can have only one timeout.
|
||||
A filedescriptor fd can have several timeouts.
|
||||
'''
|
||||
log.info('read timeout set for fd %s on %s seconds' % (fd, seconds))
|
||||
log_txt = 'read timeout set for fd %s on %s seconds' % (fd, seconds)
|
||||
if func:
|
||||
log_txt += ' with function ' + str(func)
|
||||
log.info(log_txt)
|
||||
timeout = self.current_time() + seconds
|
||||
self.read_timeouts[fd] = timeout
|
||||
if fd in self.read_timeouts:
|
||||
self.read_timeouts[fd][timeout] = func
|
||||
else:
|
||||
self.read_timeouts[fd] = {timeout: func}
|
||||
|
||||
def _check_time_events(self):
|
||||
'''
|
||||
Execute and remove alarm callbacks and execute read_timeout() for plugged
|
||||
objects if specified time has ellapsed.
|
||||
Execute and remove alarm callbacks and execute func() or read_timeout()
|
||||
for plugged objects if specified time has ellapsed.
|
||||
'''
|
||||
log.info('check time evs')
|
||||
current_time = self.current_time()
|
||||
|
||||
for fd, timeout in self.read_timeouts.items():
|
||||
if timeout > current_time:
|
||||
continue
|
||||
if fd in self.queue:
|
||||
log.debug('Calling read_timeout for fd %s' % fd)
|
||||
self.queue[fd].read_timeout()
|
||||
else:
|
||||
for fd, timeouts in self.read_timeouts.items():
|
||||
if fd not in self.queue:
|
||||
self.remove_timeout(fd)
|
||||
continue
|
||||
for timeout, func in timeouts.items():
|
||||
if timeout > current_time:
|
||||
continue
|
||||
if func:
|
||||
log.debug('Calling %s for fd %s' % (func, fd))
|
||||
func()
|
||||
else:
|
||||
log.debug('Calling read_timeout for fd %s' % fd)
|
||||
self.queue[fd].read_timeout()
|
||||
self.remove_timeout(fd, timeout)
|
||||
|
||||
times = self.alarms.keys()
|
||||
for alarm_time in times:
|
||||
|
|
|
@ -127,13 +127,15 @@ class NonBlockingTransport(PlugIn):
|
|||
# type of used ssl lib (if any) will be assigned to this member var
|
||||
self.ssl_lib = None
|
||||
self._exported_methods=[self.onreceive, self.set_send_timeout,
|
||||
self.set_timeout, self.remove_timeout, self.start_disconnect]
|
||||
self.set_send_timeout2, self.set_timeout, self.remove_timeout,
|
||||
self.start_disconnect]
|
||||
|
||||
# time to wait for SOME stanza to come and then send keepalive
|
||||
self.sendtimeout = 0
|
||||
|
||||
# in case we want to something different than sending keepalives
|
||||
self.on_timeout = None
|
||||
self.on_timeout2 = None
|
||||
|
||||
def plugin(self, owner):
|
||||
owner.Connection = self
|
||||
|
@ -218,15 +220,26 @@ class NonBlockingTransport(PlugIn):
|
|||
self.on_timeout()
|
||||
self.renew_send_timeout()
|
||||
|
||||
def read_timeout2(self):
|
||||
''' called when there's no response from server in defined timeout '''
|
||||
if self.on_timeout2:
|
||||
self.on_timeout2()
|
||||
self.renew_send_timeout2()
|
||||
|
||||
def renew_send_timeout(self):
|
||||
if self.on_timeout and self.sendtimeout > 0:
|
||||
self.set_timeout(self.sendtimeout)
|
||||
else:
|
||||
self.remove_timeout()
|
||||
|
||||
def renew_send_timeout2(self):
|
||||
if self.on_timeout2 and self.sendtimeout2 > 0:
|
||||
self.set_timeout2(self.sendtimeout2)
|
||||
|
||||
def set_timeout(self, timeout):
|
||||
self.idlequeue.set_read_timeout(self.fd, timeout)
|
||||
|
||||
def set_timeout2(self, timeout2):
|
||||
self.idlequeue.set_read_timeout(self.fd, timeout2, self.read_timeout2)
|
||||
|
||||
def get_fd(self):
|
||||
pass
|
||||
|
||||
|
@ -240,6 +253,13 @@ class NonBlockingTransport(PlugIn):
|
|||
else:
|
||||
self.on_timeout = None
|
||||
|
||||
def set_send_timeout2(self, timeout2, on_timeout2):
|
||||
self.sendtimeout2 = timeout2
|
||||
if self.sendtimeout2 > 0:
|
||||
self.on_timeout2 = on_timeout2
|
||||
else:
|
||||
self.on_timeout2 = None
|
||||
|
||||
# FIXME: where and why does this need to be called
|
||||
def start_disconnect(self):
|
||||
self.set_state(DISCONNECTING)
|
||||
|
@ -541,7 +561,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
|
|||
return
|
||||
|
||||
# we have received some bytes, stop the timeout!
|
||||
self.remove_timeout()
|
||||
self.renew_send_timeout()
|
||||
self.renew_send_timeout2()
|
||||
# pass received data to owner
|
||||
if self.on_receive:
|
||||
self.raise_event(DATA_RECEIVED, received)
|
||||
|
|
Loading…
Add table
Reference in a new issue