Coding standards and documentation improvements in idlequeue.py.
Fix a potential bug of missing an alarm_callback. Separate public from internal API.
This commit is contained in:
parent
507bd7b8c6
commit
afbac3d3e6
|
@ -12,16 +12,32 @@
|
||||||
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
## GNU General Public License for more details.
|
## GNU General Public License for more details.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Idlequeues are Gajim's network heartbeat. Transports can be plugged as
|
||||||
|
idle objects and be informed about possible IO.
|
||||||
|
'''
|
||||||
|
|
||||||
import select
|
import select
|
||||||
import logging
|
import logging
|
||||||
import gobject
|
import gobject
|
||||||
log = logging.getLogger('gajim.c.x.idlequeue')
|
log = logging.getLogger('gajim.c.x.idlequeue')
|
||||||
|
|
||||||
|
FLAG_WRITE = 20 # write only
|
||||||
|
FLAG_READ = 19 # read only
|
||||||
|
FLAG_READ_WRITE = 23 # read and write
|
||||||
|
FLAG_CLOSE = 16 # wait for close
|
||||||
|
|
||||||
|
PENDING_READ = 3 # waiting read event
|
||||||
|
PENDING_WRITE = 4 # waiting write event
|
||||||
|
IS_CLOSED = 16 # channel closed
|
||||||
|
|
||||||
|
|
||||||
class IdleObject:
|
class IdleObject:
|
||||||
''' base class for all idle listeners, these are the methods, which are called from IdleQueue
|
'''
|
||||||
|
Idle listener interface. Listed methods are called by IdleQueue.
|
||||||
'''
|
'''
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.fd = -1
|
self.fd = -1 #: filedescriptor, must be unique for each IdleObject
|
||||||
|
|
||||||
def pollend(self):
|
def pollend(self):
|
||||||
''' called on stream failure '''
|
''' called on stream failure '''
|
||||||
|
@ -39,7 +55,17 @@ class IdleObject:
|
||||||
''' called when timeout happened '''
|
''' called when timeout happened '''
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class IdleQueue:
|
class IdleQueue:
|
||||||
|
'''
|
||||||
|
IdleQueue provide three distinct time based features. Uses select.poll()
|
||||||
|
|
||||||
|
1. Alarm timeout: Execute a callback after foo seconds
|
||||||
|
2. Timeout event: Call read_timeout() of an plugged object if a timeout
|
||||||
|
has been set, but not removed in time.
|
||||||
|
3. Check file descriptor of plugged objects for read, write and error
|
||||||
|
events
|
||||||
|
'''
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.queue = {}
|
self.queue = {}
|
||||||
|
|
||||||
|
@ -49,18 +75,16 @@ class IdleQueue:
|
||||||
|
|
||||||
# cb, which are executed after XX sec., alarms are removed automatically
|
# cb, which are executed after XX sec., alarms are removed automatically
|
||||||
self.alarms = {}
|
self.alarms = {}
|
||||||
self.init_idle()
|
self._init_idle()
|
||||||
|
|
||||||
def init_idle(self):
|
def _init_idle(self):
|
||||||
|
''' Hook method for subclassed. Will be called by __init__. '''
|
||||||
self.selector = select.poll()
|
self.selector = select.poll()
|
||||||
|
|
||||||
def remove_timeout(self, fd):
|
|
||||||
log.info('read timeout removed for fd %s' % fd)
|
|
||||||
if fd in self.read_timeouts:
|
|
||||||
del(self.read_timeouts[fd])
|
|
||||||
|
|
||||||
def set_alarm(self, alarm_cb, seconds):
|
def set_alarm(self, alarm_cb, seconds):
|
||||||
''' set up a new alarm, to be called after alarm_cb sec. '''
|
'''
|
||||||
|
Sets up a new alarm. alarm_cb will be called after specified seconds.
|
||||||
|
'''
|
||||||
alarm_time = self.current_time() + seconds
|
alarm_time = self.current_time() + seconds
|
||||||
# almost impossible, but in case we have another alarm_cb at this time
|
# almost impossible, but in case we have another alarm_cb at this time
|
||||||
if alarm_time in self.alarms:
|
if alarm_time in self.alarms:
|
||||||
|
@ -70,12 +94,17 @@ class IdleQueue:
|
||||||
return alarm_time
|
return alarm_time
|
||||||
|
|
||||||
def remove_alarm(self, alarm_cb, alarm_time):
|
def remove_alarm(self, alarm_cb, alarm_time):
|
||||||
''' removes alarm callback alarm_cb scheduled on alarm_time'''
|
'''
|
||||||
if not self.alarms.has_key(alarm_time): return False
|
Removes alarm callback alarm_cb scheduled on alarm_time.
|
||||||
|
Returns True if it was removed sucessfully, otherwise False
|
||||||
|
'''
|
||||||
|
if not alarm_time in self.alarms:
|
||||||
|
return False
|
||||||
i = -1
|
i = -1
|
||||||
for i in range(len(self.alarms[alarm_time])):
|
for i in range(len(self.alarms[alarm_time])):
|
||||||
# let's not modify the list inside the loop
|
# let's not modify the list inside the loop
|
||||||
if self.alarms[alarm_time][i] is alarm_cb: break
|
if self.alarms[alarm_time][i] is alarm_cb:
|
||||||
|
break
|
||||||
if i != -1:
|
if i != -1:
|
||||||
del self.alarms[alarm_time][i]
|
del self.alarms[alarm_time][i]
|
||||||
if self.alarms[alarm_time] == []:
|
if self.alarms[alarm_time] == []:
|
||||||
|
@ -84,16 +113,31 @@ class IdleQueue:
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def remove_timeout(self, fd):
|
||||||
|
''' Removes the read timeout '''
|
||||||
|
log.info('read timeout removed for fd %s' % fd)
|
||||||
|
if fd in self.read_timeouts:
|
||||||
|
del(self.read_timeouts[fd])
|
||||||
|
|
||||||
def set_read_timeout(self, fd, seconds):
|
def set_read_timeout(self, fd, seconds):
|
||||||
''' set a new timeout, if it is not removed after 'seconds',
|
'''
|
||||||
then obj.read_timeout() will be called '''
|
Sets a new timeout. If it is not removed after specified seconds,
|
||||||
|
obj.read_timeout() will be called.
|
||||||
|
|
||||||
|
A filedescriptor fd can have only one timeout.
|
||||||
|
'''
|
||||||
log.info('read timeout set for fd %s on %s seconds' % (fd, seconds))
|
log.info('read timeout set for fd %s on %s seconds' % (fd, seconds))
|
||||||
timeout = self.current_time() + seconds
|
timeout = self.current_time() + seconds
|
||||||
self.read_timeouts[fd] = timeout
|
self.read_timeouts[fd] = timeout
|
||||||
|
|
||||||
def check_time_events(self):
|
def _check_time_events(self):
|
||||||
|
'''
|
||||||
|
Execute and remove alarm callbacks and execute read_timeout() for plugged
|
||||||
|
objects if specified time has ellapsed.
|
||||||
|
'''
|
||||||
log.info('check time evs')
|
log.info('check time evs')
|
||||||
current_time = self.current_time()
|
current_time = self.current_time()
|
||||||
|
|
||||||
for fd, timeout in self.read_timeouts.items():
|
for fd, timeout in self.read_timeouts.items():
|
||||||
if timeout > current_time:
|
if timeout > current_time:
|
||||||
continue
|
continue
|
||||||
|
@ -102,15 +146,25 @@ class IdleQueue:
|
||||||
self.queue[fd].read_timeout()
|
self.queue[fd].read_timeout()
|
||||||
else:
|
else:
|
||||||
self.remove_timeout(fd)
|
self.remove_timeout(fd)
|
||||||
|
|
||||||
times = self.alarms.keys()
|
times = self.alarms.keys()
|
||||||
for alarm_time in times:
|
for alarm_time in times:
|
||||||
if alarm_time > current_time:
|
if alarm_time > current_time:
|
||||||
break
|
continue
|
||||||
if self.alarms.has_key(alarm_time):
|
if alarm_time in self.alarms:
|
||||||
for cb in self.alarms[alarm_time]: cb()
|
for callback in self.alarms[alarm_time]:
|
||||||
if self.alarms.has_key(alarm_time): del(self.alarms[alarm_time])
|
callback()
|
||||||
|
if alarm_time in self.alarms:
|
||||||
|
del(self.alarms[alarm_time])
|
||||||
|
|
||||||
def plug_idle(self, obj, writable = True, readable = True):
|
def plug_idle(self, obj, writable=True, readable=True):
|
||||||
|
'''
|
||||||
|
Plug an IdleObject into idlequeue. Filedescriptor fd must be set.
|
||||||
|
|
||||||
|
:param obj: the IdleObject
|
||||||
|
:param writable: True if obj has data to sent
|
||||||
|
:param readable: True if obj expects data to be reiceived
|
||||||
|
'''
|
||||||
if obj.fd == -1:
|
if obj.fd == -1:
|
||||||
return
|
return
|
||||||
if obj.fd in self.queue:
|
if obj.fd in self.queue:
|
||||||
|
@ -118,48 +172,51 @@ class IdleQueue:
|
||||||
self.queue[obj.fd] = obj
|
self.queue[obj.fd] = obj
|
||||||
if writable:
|
if writable:
|
||||||
if not readable:
|
if not readable:
|
||||||
flags = 20 # write only
|
flags = FLAG_WRITE
|
||||||
else:
|
else:
|
||||||
flags = 23 # both readable and writable
|
flags = FLAG_READ_WRITE
|
||||||
else:
|
else:
|
||||||
if readable:
|
if readable:
|
||||||
flags = 19 # read only
|
flags = FLAG_READ
|
||||||
else:
|
else:
|
||||||
# when we paused a FT, we expect only a close event
|
# when we paused a FT, we expect only a close event
|
||||||
flags = 16
|
flags = FLAG_CLOSE
|
||||||
self.add_idle(obj.fd, flags)
|
self._add_idle(obj.fd, flags)
|
||||||
|
|
||||||
def add_idle(self, fd, flags):
|
def _add_idle(self, fd, flags):
|
||||||
|
''' Hook method for subclasses, called by plug_idle '''
|
||||||
self.selector.register(fd, flags)
|
self.selector.register(fd, flags)
|
||||||
|
|
||||||
def unplug_idle(self, fd):
|
def unplug_idle(self, fd):
|
||||||
|
''' Removed plugged IdleObject, specified by filedescriptor fd. '''
|
||||||
if fd in self.queue:
|
if fd in self.queue:
|
||||||
del(self.queue[fd])
|
del(self.queue[fd])
|
||||||
self.remove_idle(fd)
|
self._remove_idle(fd)
|
||||||
|
|
||||||
def current_time(self):
|
def current_time(self):
|
||||||
from time import time
|
from time import time
|
||||||
return time()
|
return time()
|
||||||
|
|
||||||
def remove_idle(self, fd):
|
def _remove_idle(self, fd):
|
||||||
|
''' Hook method for subclassed, called by unplug_idle '''
|
||||||
self.selector.unregister(fd)
|
self.selector.unregister(fd)
|
||||||
|
|
||||||
def process_events(self, fd, flags):
|
def _process_events(self, fd, flags):
|
||||||
obj = self.queue.get(fd)
|
obj = self.queue.get(fd)
|
||||||
if obj is None:
|
if obj is None:
|
||||||
self.unplug_idle(fd)
|
self.unplug_idle(fd)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if flags & 3: # waiting read event
|
if flags & PENDING_READ:
|
||||||
#print 'waiting read on %d, flags are %d' % (fd, flags)
|
#print 'waiting read on %d, flags are %d' % (fd, flags)
|
||||||
obj.pollin()
|
obj.pollin()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif flags & 4: # waiting write event
|
elif flags & PENDING_WRITE:
|
||||||
obj.pollout()
|
obj.pollout()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
elif flags & 16: # closed channel
|
elif flags & IS_CLOSED:
|
||||||
# io error, don't expect more events
|
# io error, don't expect more events
|
||||||
self.remove_timeout(obj.fd)
|
self.remove_timeout(obj.fd)
|
||||||
self.unplug_idle(obj.fd)
|
self.unplug_idle(obj.fd)
|
||||||
|
@ -167,9 +224,16 @@ class IdleQueue:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
|
'''
|
||||||
|
Process idlequeue. Check for any pending timeout or alarm events.
|
||||||
|
Call IdleObjects on possible and requested read, write and error events
|
||||||
|
on their file descriptors.
|
||||||
|
|
||||||
|
Call this in regular intervals.
|
||||||
|
'''
|
||||||
if not self.queue:
|
if not self.queue:
|
||||||
# check for timeouts/alert also when there are no active fds
|
# check for timeouts/alert also when there are no active fds
|
||||||
self.check_time_events()
|
self._check_time_events()
|
||||||
return True
|
return True
|
||||||
try:
|
try:
|
||||||
waiting_descriptors = self.selector.poll(0)
|
waiting_descriptors = self.selector.poll(0)
|
||||||
|
@ -178,25 +242,27 @@ class IdleQueue:
|
||||||
if e[0] != 4: # interrupt
|
if e[0] != 4: # interrupt
|
||||||
raise
|
raise
|
||||||
for fd, flags in waiting_descriptors:
|
for fd, flags in waiting_descriptors:
|
||||||
self.process_events(fd, flags)
|
self._process_events(fd, flags)
|
||||||
self.check_time_events()
|
self._check_time_events()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
class SelectIdleQueue(IdleQueue):
|
class SelectIdleQueue(IdleQueue):
|
||||||
'''
|
'''
|
||||||
Extends IdleQueue to use select.select() for polling
|
Extends IdleQueue to use select.select() for polling
|
||||||
|
|
||||||
This class exisists for the sake of gtk2.8 on windows, which
|
This class exisists for the sake of gtk2.8 on windows, which
|
||||||
doesn't seem to support io_add_watch properly (yet)
|
doesn't seem to support io_add_watch properly (yet)
|
||||||
'''
|
'''
|
||||||
# TODO: remove this class and its reference gajim.py, when win-gtk2.8 is stable
|
def _init_idle(self):
|
||||||
def init_idle(self):
|
'''
|
||||||
''' this method is called at the end of class constructor.
|
Creates a dict, which maps file/pipe/sock descriptor to glib event id
|
||||||
Creates a dict, which maps file/pipe/sock descriptor to glib event id'''
|
'''
|
||||||
self.read_fds = {}
|
self.read_fds = {}
|
||||||
self.write_fds = {}
|
self.write_fds = {}
|
||||||
self.error_fds = {}
|
self.error_fds = {}
|
||||||
|
|
||||||
def add_idle(self, fd, flags):
|
def _add_idle(self, fd, flags):
|
||||||
''' this method is called when we plug a new idle object.
|
''' this method is called when we plug a new idle object.
|
||||||
Remove descriptor to read/write/error lists, according flags
|
Remove descriptor to read/write/error lists, according flags
|
||||||
'''
|
'''
|
||||||
|
@ -206,7 +272,7 @@ class SelectIdleQueue(IdleQueue):
|
||||||
self.write_fds[fd] = fd
|
self.write_fds[fd] = fd
|
||||||
self.error_fds[fd] = fd
|
self.error_fds[fd] = fd
|
||||||
|
|
||||||
def remove_idle(self, fd):
|
def _remove_idle(self, fd):
|
||||||
''' this method is called when we unplug a new idle object.
|
''' this method is called when we unplug a new idle object.
|
||||||
Remove descriptor from read/write/error lists
|
Remove descriptor from read/write/error lists
|
||||||
'''
|
'''
|
||||||
|
@ -219,7 +285,7 @@ class SelectIdleQueue(IdleQueue):
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
if not self.write_fds and not self.read_fds:
|
if not self.write_fds and not self.read_fds:
|
||||||
self.check_time_events()
|
self._check_time_events()
|
||||||
return True
|
return True
|
||||||
try:
|
try:
|
||||||
waiting_descriptors = select.select(self.read_fds.keys(),
|
waiting_descriptors = select.select(self.read_fds.keys(),
|
||||||
|
@ -240,24 +306,26 @@ class SelectIdleQueue(IdleQueue):
|
||||||
q = self.queue.get(fd)
|
q = self.queue.get(fd)
|
||||||
if q:
|
if q:
|
||||||
q.pollend()
|
q.pollend()
|
||||||
self.check_time_events()
|
self._check_time_events()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
class GlibIdleQueue(IdleQueue):
|
class GlibIdleQueue(IdleQueue):
|
||||||
'''
|
'''
|
||||||
Extends IdleQueue to use glib io_add_wath, instead of select/poll
|
Extends IdleQueue to use glib io_add_wath, instead of select/poll
|
||||||
In another, `non gui' implementation of Gajim IdleQueue can be used safetly.
|
In another 'non gui' implementation of Gajim IdleQueue can be used safetly.
|
||||||
|
'''
|
||||||
|
|
||||||
|
def _init_idle(self):
|
||||||
|
'''
|
||||||
|
Creates a dict, which maps file/pipe/sock descriptor to glib event id
|
||||||
'''
|
'''
|
||||||
def init_idle(self):
|
|
||||||
''' this method is called at the end of class constructor.
|
|
||||||
Creates a dict, which maps file/pipe/sock descriptor to glib event id'''
|
|
||||||
self.events = {}
|
self.events = {}
|
||||||
# time() is already called in glib, we just get the last value
|
# time() is already called in glib, we just get the last value
|
||||||
# overrides IdleQueue.current_time()
|
# overrides IdleQueue.current_time()
|
||||||
self.current_time = gobject.get_current_time
|
self.current_time = gobject.get_current_time
|
||||||
|
|
||||||
def add_idle(self, fd, flags):
|
def _add_idle(self, fd, flags):
|
||||||
''' this method is called when we plug a new idle object.
|
''' this method is called when we plug a new idle object.
|
||||||
Start listening for events from fd
|
Start listening for events from fd
|
||||||
'''
|
'''
|
||||||
|
@ -268,13 +336,13 @@ class GlibIdleQueue(IdleQueue):
|
||||||
|
|
||||||
def _process_events(self, fd, flags):
|
def _process_events(self, fd, flags):
|
||||||
try:
|
try:
|
||||||
return self.process_events(fd, flags)
|
return IdleQueue._process_events(self, fd, flags)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.remove_idle(fd)
|
self._remove_idle(fd)
|
||||||
self.add_idle(fd, flags)
|
self._add_idle(fd, flags)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def remove_idle(self, fd):
|
def _remove_idle(self, fd):
|
||||||
''' this method is called when we unplug a new idle object.
|
''' this method is called when we unplug a new idle object.
|
||||||
Stop listening for events from fd
|
Stop listening for events from fd
|
||||||
'''
|
'''
|
||||||
|
@ -284,7 +352,7 @@ class GlibIdleQueue(IdleQueue):
|
||||||
del(self.events[fd])
|
del(self.events[fd])
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
self.check_time_events()
|
self._check_time_events()
|
||||||
|
|
||||||
|
|
||||||
# vim: se ts=3:
|
# vim: se ts=3:
|
||||||
|
|
Loading…
Reference in New Issue