Switched to non-blocking connections.

Removed threads and mutexes in connection.py
and gajim.py.
Add resolver through nslookup command, removed
dnspython and pydns (for now).
This commit is contained in:
Dimitur Kirov 2006-02-03 12:17:34 +00:00
parent 54f350b9c7
commit f62db4acfe
15 changed files with 2707 additions and 321 deletions

File diff suppressed because it is too large Load Diff

View File

@ -26,7 +26,6 @@
import os
import sys
import logging
import mutex
import config
from contacts import Contacts
@ -111,10 +110,6 @@ sleeper_state = {} # whether we pass auto away / xa or not
#'autoaway': autoaway and use sleeper
#'autoxa': autoxa and use sleeper
status_before_autoaway = {}
#queues of events from connections...
events_for_ui = {}
#... and its mutex
mutex_events_for_ui = mutex.mutex()
SHOW_LIST = ['offline', 'connecting', 'online', 'chat', 'away', 'xa', 'dnd',
'invisible']

317
src/common/nslookup.py Normal file
View File

@ -0,0 +1,317 @@
## common/nslookup.py
##
## Contributors for this file:
## - Dimitur Kirov <dkirov@gmail.com>
##
## Copyright (C) 2003-2004 Yann Le Boulanger <asterix@lagaule.org>
## Vincent Hanquez <tab@snarc.org>
## Copyright (C) 2006 Yann Le Boulanger <asterix@lagaule.org>
## Vincent Hanquez <tab@snarc.org>
## Nikos Kouremenos <nkour@jabber.org>
## Dimitur Kirov <dkirov@gmail.com>
## Travis Shirk <travis@pobox.com>
## Norman Rasmussen <norman@rasmussen.co.za>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 2 only.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
import sys, os, sre
from xmpp.idlequeue import IdleObject, IdleQueue
if os.name == 'nt':
from subprocess import *
elif os.name == 'posix':
import fcntl
# it is good to check validity of arguments, when calling system commands
ns_type_pattern = sre.compile('^[a-z]+$')
# match srv host_name
host_pattern = sre.compile('^[a-z0-9\-._]*[a-z0-9]\.[a-z]{2,}$')
class Resolver:
def __init__(self, idlequeue):
self.idlequeue = idlequeue
# dict {host : list of srv records}
self.resolved_hosts = {}
# dict {host : list of callbacks}
self.handlers = {}
def parse_srv_result(self, fqdn, result):
''' parse the output of nslookup command and return list of
properties: 'host', 'port','weight', 'priority' corresponding to the found
srv hosts '''
if os.name == 'nt':
return self._parse_srv_result_nt(fqdn, result)
elif os.name == 'posix':
return self._parse_srv_result_posix(fqdn, result)
def _parse_srv_result_nt(self, fqdn, result):
# output from win32 nslookup command
if not result:
return []
hosts = []
lines = result.replace('\r','').split('\n')
current_host = None
for line in lines:
line = line.lstrip()
if line == '':
continue
if line.startswith(fqdn):
rest = line[len(fqdn):]
if rest.find('service') > -1:
current_host = {}
elif isinstance(current_host, dict):
res = line.strip().split('=')
if len(res) != 2:
if len(current_host) == 4:
hosts.append(current_host)
current_host = None
continue
prop_type = res[0].strip()
prop_value = res[1].strip()
if prop_type.find('prio') > -1:
try:
current_host['prio'] = int(prop_value)
except ValueError:
continue
elif prop_type.find('weight') > -1:
try:
current_host['weight'] = int(prop_value)
except ValueError:
continue
elif prop_type.find('port') > -1:
try:
current_host['port'] = int(prop_value)
except ValueError:
continue
elif prop_type.find('host') > -1:
current_host['host'] = prop_value
if len(current_host) == 4:
hosts.append(current_host)
current_host = None
return hosts
def _parse_srv_result_posix(self, fqdn, result):
# typical output of bind-tools nslookup command
if not result:
return []
hosts = []
lines = result.split('\n')
for line in lines:
if line == '':
continue
if line.startswith(fqdn):
rest = line[len(fqdn):].split('=')
if len(rest) != 2:
continue
answer_type, props_str = rest
if answer_type.strip() != 'service':
continue
props = props_str.strip().split(' ')
if len(props) < 4:
continue
prio, weight, port, host = props[-4:]
if host[-1] == '.':
host = host[:-1]
try:
prio = int(prio)
weight = int(weight)
port = int(port)
except ValueError:
continue
hosts.append({'host': host, 'port': port,'weight': weight,
'prio': prio})
return hosts
def _on_ready(self, host, result):
# nslookup finished, parse the result and call the handlers
result_list = self.parse_srv_result(host, result)
# practically it is impossible to be the opposite, but who knows :)
if not self.resolved_hosts.has_key(host):
self.resolved_hosts[host] = result_list
if self.handlers.has_key(host):
for callback in self.handlers[host]:
callback(host, result_list)
del(self.handlers[host])
def start_resolve(self, host):
''' spawn new nslookup process and start waiting for results '''
ns = NsLookup(self._on_ready, host)
ns.set_idlequeue(self.idlequeue)
ns.commandtimeout = 10
ns.start()
def resolve(self, host, on_ready):
if not host:
# empty host, return empty list of srv records
on_ready([])
return
if self.resolved_hosts.has_key(host):
# host is already resolved, return cached values
on_ready(host, self.resolved_hosts[host])
return
if self.handlers.has_key(host):
# host is about to be resolved by another connection,
# attach our callback
self.handlers[host].append(on_ready)
else:
# host has never been resolved, start now
self.handlers[host] = [on_ready]
self.start_resolve(host)
class IdleCommand(IdleObject):
def __init__(self, on_result):
# how long (sec.) to wait for result ( 0 - forever )
# it is a class var, instead of a constant and we can override it.
self.commandtimeout = 0
# when we have some kind of result (valid, ot not) we call this handler
self.result_handler = on_result
# if it is True, we can safetely execute the command
self.canexecute = True
self.idlequeue = None
self.result =''
def set_idlequeue(self, idlequeue):
self.idlequeue = idlequeue
def _return_result(self):
if self.result_handler:
self.result_handler(self.result)
self.result_handler = None
def _compose_command_args(self):
return ['echo', 'da']
def _compose_command_line(self):
''' return one line representation of command and its arguments '''
return reduce(lambda left, right: left + ' ' + right, self._compose_command_args())
def wait_child(self):
if self.pipe.poll() is None:
# result timeout
if self.endtime < self.idlequeue.current_time():
self._return_result()
self.pipe.stdout.close()
else:
# child is still active, continue to wait
self.idlequeue.set_alarm(self.wait_child, 0.1)
else:
# child has quit
self.result = self.pipe.stdout.read()
self._return_result()
self.pipe.stdout.close()
def start(self):
if os.name == 'nt':
self._start_nt()
elif os.name == 'posix':
self._start_posix()
def _start_nt(self):
if not self.canexecute:
self.result = ''
self._return_result()
return
self.pipe = Popen(self._compose_command_args(), stdout=PIPE,
bufsize = 1024, shell = True, stderr = STDOUT, stdin = None)
if self.commandtimeout >= 0:
self.endtime = self.idlequeue.current_time()
self.idlequeue.set_alarm(self.wait_child, 0.1)
def _start_posix(self):
self.pipe = os.popen(self._compose_command_line())
self.fd = self.pipe.fileno()
fcntl.fcntl(self.pipe, fcntl.F_SETFL, os.O_NONBLOCK)
self.idlequeue.plug_idle(self, False, True)
if self.commandtimeout >= 0:
self.idlequeue.set_read_timeout(self.fd, self.commandtimeout)
def end(self):
self.idlequeue.unplug_idle(self.fd)
try:
self.pipe.close()
except:
pass
def pollend(self):
self.idlequeue.remove_timeout(self.fd)
self.end()
self._return_result()
def pollin(self):
try:
res = self.pipe.read()
except Exception, e:
res = ''
if res == '':
return self.pollend()
else:
self.result += res
def read_timeout(self):
self.end()
self._return_result()
class NsLookup(IdleCommand):
def __init__(self, on_result, host='_xmpp-client', type = 'srv'):
IdleCommand.__init__(self, on_result)
self.commandtimeout = 30
self.host = host.lower()
self.type = type.lower()
if not host_pattern.match(self.host):
# invalid host name
# TODO: notify user about this. Maybe message to stderr will be enough
self.host = None
self.canexecute = False
return
if not ns_type_pattern.match(self.type):
self.type = None
self.host = None
self.canexecute = False
return
def _compose_command_args(self):
return ['nslookup', '-type=' + self.type , self.host]
def _return_result(self):
if self.result_handler:
self.result_handler(self.host, self.result)
self.result_handler = None
if __name__ == '__main__':
if os.name != 'posix':
sys.exit()
# testing Resolver class
import gobject
import gtk
idlequeue = IdleQueue()
resolver = Resolver(idlequeue)
def clicked(widget):
global resolver
host = text_view.get_text()
def on_result(host, result_array):
print 'Result:\n' + repr(result_array)
resolver.resolve(host, on_result)
win = gtk.Window()
win.set_border_width(6)
text_view = gtk.Entry()
text_view.set_text('_xmpp-client._tcp.jabber.org')
hbox = gtk.HBox()
hbox.set_spacing(3)
but = gtk.Button(' Lookup SRV ')
hbox.pack_start(text_view, 5)
hbox.pack_start(but, 0)
but.connect('clicked', clicked)
win.add(hbox)
win.show_all()
gobject.timeout_add(200, idlequeue.process)
gtk.main()

View File

@ -817,7 +817,7 @@ class Socks5Receiver(Socks5):
def connect(self):
''' create the socket and start the connect loop '''
self._sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.settimeout(50)
# this will not block the GUI
self._sock.setblocking(False)
self.state = 0 # about to be connected

View File

@ -26,6 +26,7 @@ and use only methods for access all values you should not have any problems.
"""
import simplexml,protocol,debug,auth,transports,roster,dispatcher,features,browser,filetransfer,commands
import simplexml,protocol,debug,auth_nb,auth,transports,transports_nb,roster_nb,roster,dispatcher_nb,dispatcher,features_nb,features,browser,filetransfer,commands, idlequeue
from client_nb import *
from client import *
from protocol import *

366
src/common/xmpp/auth_nb.py Normal file
View File

@ -0,0 +1,366 @@
## auth_nb.py
## based on auth.py
##
## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
'''
Provides library with all Non-SASL and SASL authentication mechanisms.
Can be used both for client and transport authentication.
'''
from protocol import *
from auth import *
from client import PlugIn
import sha,base64,random,dispatcher_nb
class SASL(PlugIn):
''' Implements SASL authentication. '''
def __init__(self,username,password, on_sasl):
PlugIn.__init__(self)
self.username=username
self.password=password
self.on_sasl = on_sasl
def plugin(self,owner):
if not self._owner.Dispatcher.Stream._document_attrs.has_key('version'):
self.startsasl='not-supported'
elif self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else: self.startsasl=None
def auth(self):
''' Start authentication. Result can be obtained via "SASL.startsasl" attribute and will be
either "success" or "failure". Note that successfull auth will take at least
two Dispatcher.Process() calls. '''
if self.startsasl:
pass
elif self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else: self._owner.RegisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS)
def plugout(self):
''' Remove SASL handlers from owner's dispatcher. Used internally. '''
self._owner.UnregisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS)
self._owner.UnregisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL)
self._owner.UnregisterHandler('failure', self.SASLHandler, xmlns=NS_SASL)
self._owner.UnregisterHandler('success', self.SASLHandler, xmlns=NS_SASL)
def FeaturesHandler(self, conn, feats):
''' Used to determine if server supports SASL auth. Used internally. '''
if not feats.getTag('mechanisms', namespace=NS_SASL):
self.startsasl='not-supported'
self.DEBUG('SASL not supported by server', 'error')
return
mecs=[]
for mec in feats.getTag('mechanisms', namespace=NS_SASL).getTags('mechanism'):
mecs.append(mec.getData())
self._owner.RegisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL)
self._owner.RegisterHandler('failure', self.SASLHandler, xmlns=NS_SASL)
self._owner.RegisterHandler('success', self.SASLHandler, xmlns=NS_SASL)
if "DIGEST-MD5" in mecs:
node=Node('auth',attrs={'xmlns':NS_SASL,'mechanism':'DIGEST-MD5'})
elif "PLAIN" in mecs:
sasl_data='%s\x00%s\x00%s' % (self.username+'@' + self._owner.Server,
self.username, self.password)
node=Node('auth', attrs={'xmlns':NS_SASL,'mechanism':'PLAIN'},
payload=[base64.encodestring(sasl_data)])
else:
self.startsasl='failure'
self.DEBUG('I can only use DIGEST-MD5 and PLAIN mecanisms.', 'error')
return
self.startsasl='in-process'
self._owner.send(node.__str__())
raise NodeProcessed
def SASLHandler(self, conn, challenge):
''' Perform next SASL auth step. Used internally. '''
if challenge.getNamespace() <> NS_SASL:
return
if challenge.getName() == 'failure':
self.startsasl = 'failure'
try:
reason = challenge.getChildren()[0]
except:
reason = challenge
self.DEBUG('Failed SASL authentification: %s' % reason, 'error')
if self.on_sasl :
self.on_sasl ()
raise NodeProcessed
elif challenge.getName() == 'success':
self.startsasl='success'
self.DEBUG('Successfully authenticated with remote server.', 'ok')
handlers=self._owner.Dispatcher.dumpHandlers()
self._owner.Dispatcher.PlugOut()
dispatcher_nb.Dispatcher().PlugIn(self._owner)
self._owner.Dispatcher.restoreHandlers(handlers)
self._owner.User = self.username
if self.on_sasl :
self.on_sasl ()
raise NodeProcessed
########################################3333
incoming_data = challenge.getData()
chal={}
data=base64.decodestring(incoming_data)
self.DEBUG('Got challenge:'+data,'ok')
for pair in data.split(','):
key, value = pair.split('=', 1)
if value[:1] == '"' and value[-1:] == '"':
value=value[1:-1]
chal[key]=value
if chal.has_key('qop') and chal['qop']=='auth':
resp={}
resp['username']=self.username
resp['realm']=self._owner.Server
resp['nonce']=chal['nonce']
cnonce=''
for i in range(7):
cnonce+=hex(int(random.random()*65536*4096))[2:]
resp['cnonce']=cnonce
resp['nc']=('00000001')
resp['qop']='auth'
resp['digest-uri']='xmpp/'+self._owner.Server
A1=C([H(C([resp['username'], resp['realm'], self.password])), resp['nonce'], resp['cnonce']])
A2=C(['AUTHENTICATE',resp['digest-uri']])
response= HH(C([HH(A1),resp['nonce'],resp['nc'],resp['cnonce'],resp['qop'],HH(A2)]))
resp['response']=response
resp['charset']='utf-8'
sasl_data=''
for key in ['charset', 'username', 'realm', 'nonce', 'nc', 'cnonce', 'digest-uri', 'response', 'qop']:
if key in ['nc','qop','response','charset']:
sasl_data += "%s=%s," % (key,resp[key])
else:
sasl_data += '%s="%s",' % (key,resp[key])
########################################3333
node=Node('response', attrs={'xmlns':NS_SASL},
payload=[base64.encodestring(sasl_data[:-1]).replace('\r','').replace('\n','')])
self._owner.send(node.__str__())
elif chal.has_key('rspauth'):
self._owner.send(Node('response', attrs={'xmlns':NS_SASL}).__str__())
else:
self.startsasl='failure'
self.DEBUG('Failed SASL authentification: unknown challenge', 'error')
if self.on_sasl :
self.on_sasl ()
raise NodeProcessed
class NonBlockingNonSASL(PlugIn):
''' Implements old Non-SASL (JEP-0078) authentication used
in jabberd1.4 and transport authentication.
'''
def __init__(self, user, password, resource, on_auth):
''' Caches username, password and resource for auth. '''
PlugIn.__init__(self)
self.DBG_LINE ='gen_auth'
self.user = user
self.password= password
self.resource = resource
self.on_auth = on_auth
def plugin(self, owner):
''' Determine the best auth method (digest/0k/plain) and use it for auth.
Returns used method name on success. Used internally. '''
if not self.resource:
return self.authComponent(owner)
self.DEBUG('Querying server about possible auth methods', 'start')
self.owner = owner
resp = owner.Dispatcher.SendAndWaitForResponse(
Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]), func=self._on_username
)
def _on_username(self, resp):
if not isResultNode(resp):
self.DEBUG('No result node arrived! Aborting...','error')
return self.on_auth(None)
iq=Iq(typ='set',node=resp)
query=iq.getTag('query')
query.setTagData('username',self.user)
query.setTagData('resource',self.resource)
if query.getTag('digest'):
self.DEBUG("Performing digest authentication",'ok')
query.setTagData('digest',
sha.new(self.owner.Dispatcher.Stream._document_attrs['id']+self.password).hexdigest())
if query.getTag('password'):
query.delChild('password')
self._method='digest'
elif query.getTag('token'):
token=query.getTagData('token')
seq=query.getTagData('sequence')
self.DEBUG("Performing zero-k authentication",'ok')
hash = sha.new(sha.new(self.password).hexdigest()+token).hexdigest()
for foo in xrange(int(seq)):
hash = sha.new(hash).hexdigest()
query.setTagData('hash',hash)
self._method='0k'
else:
self.DEBUG("Sequre methods unsupported, performing plain text authentication",'warn')
query.setTagData('password',self.password)
self._method='plain'
resp=self.owner.Dispatcher.SendAndWaitForResponse(iq, func=self._on_auth)
def _on_auth(self, resp):
if isResultNode(resp):
self.DEBUG('Sucessfully authenticated with remove host.','ok')
self.owner.User=self.user
self.owner.Resource=self.resource
self.owner._registered_name=self.owner.User+'@'+self.owner.Server+'/'+self.owner.Resource
return self.on_auth(self._method)
self.DEBUG('Authentication failed!','error')
return self.on_auth(None)
def authComponent(self,owner):
''' Authenticate component. Send handshake stanza and wait for result. Returns "ok" on success. '''
self.handshake=0
owner.send(Node(NS_COMPONENT_ACCEPT+' handshake',
payload=[sha.new(owner.Dispatcher.Stream._document_attrs['id']+self.password).hexdigest()]))
owner.RegisterHandler('handshake', self.handshakeHandler, xmlns=NS_COMPONENT_ACCEPT)
self._owner.onreceive(self._on_auth_component)
def _on_auth_component(self, data):
''' called when we receive some response, after we send the handshake '''
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not self.handshake:
self.DEBUG('waiting on handshake', 'notify')
return
self._owner.onreceive(None)
owner._registered_name=self.user
if self.handshake+1:
return self.on_auth('ok')
self.on_auth(None)
def handshakeHandler(self,disp,stanza):
''' Handler for registering in dispatcher for accepting transport authentication. '''
if stanza.getName() == 'handshake':
self.handshake=1
else:
self.handshake=-1
class NonBlockingBind(Bind):
''' Bind some JID to the current connection to allow router know of our location.'''
def plugin(self, owner):
''' Start resource binding, if allowed at this time. Used internally. '''
if self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else: self._owner.RegisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS)
def plugout(self):
''' Remove Bind handler from owner's dispatcher. Used internally. '''
self._owner.UnregisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS)
def NonBlockingBind(self, resource=None, on_bound=None):
''' Perform binding. Use provided resource name or random (if not provided). '''
self.on_bound = on_bound
self._resource = resource
if self._resource:
self._resource = [Node('resource', payload=[self._resource])]
else:
self._resource = []
self._owner.onreceive(None)
resp=self._owner.Dispatcher.SendAndWaitForResponse(
Protocol('iq',typ='set',
payload=[Node('bind', attrs={'xmlns':NS_BIND}, payload=self._resource)]),
func=self._on_bound)
def _on_bound(self, resp):
if isResultNode(resp):
self.bound.append(resp.getTag('bind').getTagData('jid'))
self.DEBUG('Successfully bound %s.'%self.bound[-1],'ok')
jid=JID(resp.getTag('bind').getTagData('jid'))
self._owner.User=jid.getNode()
self._owner.Resource=jid.getResource()
self._owner.SendAndWaitForResponse(Protocol('iq', typ='set',
payload=[Node('session', attrs={'xmlns':NS_SESSION})]), func=self._on_session)
elif resp:
self.DEBUG('Binding failed: %s.' % resp.getTag('error'),'error')
self.on_bound(None)
else:
self.DEBUG('Binding failed: timeout expired.', 'error')
self.on_bound(None)
def _on_session(self, resp):
self._owner.onreceive(None)
if isResultNode(resp):
self.DEBUG('Successfully opened session.', 'ok')
self.session = 1
self.on_bound('ok')
else:
self.DEBUG('Session open failed.', 'error')
self.session = 0
self.on_bound(None)
self._owner.onreceive(None)
if isResultNode(resp):
self.DEBUG('Successfully opened session.', 'ok')
self.session = 1
self.on_bound('ok')
else:
self.DEBUG('Session open failed.', 'error')
self.session = 0
self.on_bound(None)
class NBComponentBind(ComponentBind):
''' ComponentBind some JID to the current connection to allow
router know of our location.
'''
def plugin(self,owner):
''' Start resource binding, if allowed at this time. Used internally. '''
if self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else:
self._owner.RegisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS)
self.needsUnregister = 1
def plugout(self):
''' Remove ComponentBind handler from owner's dispatcher. Used internally. '''
if self.needsUnregister:
self._owner.UnregisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS)
def Bind(self, domain = None, on_bind = None):
''' Perform binding. Use provided domain name (if not provided). '''
self._owner.onreceive(self._on_bound)
self.on_bind = on_bind
def _on_bound(self, resp):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if self.bound is None:
return
self._owner.onreceive(None)
self._owner.SendAndWaitForResponse(
Protocol('bind', attrs={'name':domain}, xmlns=NS_COMPONENT_1),
func=self._on_bind_reponse)
def _on_bind_reponse(self, res):
if resp and resp.getAttr('error'):
self.DEBUG('Binding failed: %s.' % resp.getAttr('error'), 'error')
elif resp:
self.DEBUG('Successfully bound.', 'ok')
if self.on_bind:
self.on_bind('ok')
else:
self.DEBUG('Binding failed: timeout expired.', 'error')
if self.on_bind:
self.on_bind(None)

View File

@ -0,0 +1,389 @@
## client_nb.py
## based on client.py
##
## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
# $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $
'''
Provides PlugIn class functionality to develop extentions for xmpppy.
Also provides Client and Component classes implementations as the
examples of xmpppy structures usage.
These classes can be used for simple applications "AS IS" though.
'''
import socket
import debug
import transports_nb, dispatcher_nb, auth_nb, roster_nb
from client import *
class NBCommonClient(CommonClient):
''' Base for Client and Component classes.'''
def __init__(self, server, port=5222, debug=['always', 'nodebuilder'], caller=None,
on_connect=None, on_connect_failure=None):
''' Caches server name and (optionally) port to connect to. "debug" parameter specifies
the debug IDs that will go into debug output. You can either specifiy an "include"
or "exclude" list. The latter is done via adding "always" pseudo-ID to the list.
Full list: ['nodebuilder', 'dispatcher', 'gen_auth', 'SASL_auth', 'bind', 'socket',
'CONNECTproxy', 'TLS', 'roster', 'browser', 'ibb'] . '''
if self.__class__.__name__ == 'NonBlockingClient':
self.Namespace, self.DBG = 'jabber:client', DBG_CLIENT
elif self.__class__.__name__ == 'NBCommonClient':
self.Namespace, self.DBG = dispatcher_nb.NS_COMPONENT_ACCEPT, DBG_COMPONENT
self.defaultNamespace = self.Namespace
self.disconnect_handlers = []
self.Server = server
self.Port = port
# Who initiated this client
# Used to register the EventDispatcher
self._caller = caller
if debug and type(debug) != list:
debug = ['always', 'nodebuilder']
self._DEBUG = Debug.Debug(debug)
self.DEBUG = self._DEBUG.Show
self.debug_flags = self._DEBUG.debug_flags
self.debug_flags.append(self.DBG)
self._owner = self
self._registered_name = None
self.connected = ''
self._component=0
self.idlequeue = None
self.socket = None
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
#~ self.RegisterDisconnectHandler(self.DisconnectHandler)
def set_idlequeue(self, idlequeue):
self.idlequeue = idlequeue
def disconnected(self):
''' Called on disconnection. Calls disconnect handlers and cleans things up. '''
self.connected=''
self.DEBUG(self.DBG,'Disconnect detected','stop')
self.disconnect_handlers.reverse()
for i in self.disconnect_handlers:
i()
self.disconnect_handlers.reverse()
if self.__dict__.has_key('NonBlockingTLS'):
self.NonBlockingTLS.PlugOut()
def reconnectAndReauth(self):
''' Example of reconnection method. In fact, it can be used to batch connection and auth as well. '''
handlerssave=self.Dispatcher.dumpHandlers()
self.Dispatcher.PlugOut()
if self.__dict__.has_key('NonBlockingNonSASL'):
self.NonBlockingNonSASL.PlugOut()
if self.__dict__.has_key('SASL'):
self.SASL.PlugOut()
if self.__dict__.has_key('NonBlockingTLS'):
self.NonBlockingTLS.PlugOut()
if self.__dict__.has_key('NBHTTPPROXYsocket'):
self.NBHTTPPROXYsocket.PlugOut()
if self.__dict__.has_key('NonBlockingTcp'):
self.NonBlockingTcp.PlugOut()
if not self.connect(server=self._Server, proxy=self._Proxy):
return
if not self.auth(self._User, self._Password, self._Resource):
return
self.Dispatcher.restoreHandlers(handlerssave)
return self.connected
def connect(self,server=None,proxy=None, ssl=None, on_stream_start = None):
''' Make a tcp/ip connection, protect it with tls/ssl if possible and start XMPP stream. '''
if not server:
server = (self.Server, self.Port)
self._Server, self._Proxy, self._Ssl = server , proxy, ssl
self.on_stream_start = on_stream_start
if proxy:
self.socket = transports_nb.NBHTTPPROXYsocket(self._on_connected,
self._on_connected_failure, proxy, server)
else:
self.connected = 'tcp'
self.socket = transports_nb.NonBlockingTcp(self._on_connected,
self._on_connected_failure, server)
self.socket.PlugIn(self)
return True
def get_attrs(self, on_stream_start):
self.on_stream_start = on_stream_start
self.onreceive(self._on_receive_document_attrs)
def _on_connected_failure(self):
if self.socket:
self.socket.PlugOut()
if self.on_connect_failure:
self.on_connect_failure()
def _on_connected(self):
self.connected = 'tcp'
if self._Ssl or self.Connection.getPort() in (5223, 443):
try:
transports_nb.NonBlockingTLS().PlugIn(self, now=1)
self.connected = 'ssl'
except socket.sslerror:
return
self.onreceive(self._on_receive_document_attrs)
dispatcher_nb.Dispatcher().PlugIn(self)
def _on_receive_document_attrs(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not hasattr(self, 'Dispatcher') or \
self.Dispatcher.Stream._document_attrs is None:
return
self.onreceive(None)
if self.Dispatcher.Stream._document_attrs.has_key('version') and \
self.Dispatcher.Stream._document_attrs['version'] == '1.0':
self.onreceive(self._on_receive_stream_features)
return
if self.on_stream_start:
self.on_stream_start()
self.on_stream_start = None
return True
def _on_receive_stream_features(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not self.Dispatcher.Stream.features:
return
# pass # If we get version 1.0 stream the features tag MUST BE presented
self.onreceive(None)
if self.on_stream_start:
self.on_stream_start()
self.on_stream_start = None
return True
class NonBlockingClient(NBCommonClient):
''' Example client class, based on CommonClient. '''
def connect(self,server=None,proxy=None,secure=None,use_srv=True):
''' Connect to jabber server. If you want to specify different ip/port to connect to you can
pass it as tuple as first parameter. If there is HTTP proxy between you and server
specify it's address and credentials (if needed) in the second argument.
If you want ssl/tls support to be discovered and enable automatically - leave third argument as None. (ssl will be autodetected only if port is 5223 or 443)
If you want to force SSL start (i.e. if port 5223 or 443 is remapped to some non-standard port) then set it to 1.
If you want to disable tls/ssl support completely, set it to 0.
Example: connect(('192.168.5.5',5222),{'host':'proxy.my.net','port':8080,'user':'me','password':'secret'})
Returns '' or 'tcp' or 'tls', depending on the result.'''
self.__secure = secure
self.Connection = None
NBCommonClient.connect(self, server = server, proxy = proxy,
on_stream_start = self._on_tcp_stream_start)
return self.connected
def _is_connected(self):
self.onreceive(None)
if self.on_connect:
self.on_connect(self, self.connected)
self.on_connect = None
def _on_tcp_stream_start(self):
if not self.connected or self.__secure is not None and not self.__secure:
self._is_connected()
return True
self.isplugged = True
self.onreceive(None)
transports_nb.NonBlockingTLS().PlugIn(self)
if not self.Dispatcher.Stream._document_attrs.has_key('version') or \
not self.Dispatcher.Stream._document_attrs['version']=='1.0':
self._is_connected()
return
if not self.Dispatcher.Stream.features.getTag('starttls'):
self._is_connected()
return
self.onreceive(self._on_receive_starttls)
def _on_receive_starttls(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not self.NonBlockingTLS.starttls:
return
self.onreceive(None)
if not hasattr(self, 'NonBlockingTLS') or self.NonBlockingTLS.starttls != 'success':
self.event('tls_failed')
self._is_connected()
return
self.connected = 'tls'
self.onreceive(None)
self._is_connected()
return True
def auth(self, user, password, resource = '', sasl = 1, on_auth = None):
''' Authenticate connnection and bind resource. If resource is not provided
random one or library name used. '''
self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl
self.on_auth = on_auth
self.get_attrs(self._on_doc_attrs)
return
def _on_old_auth(self, res):
if res:
self.connected += '+old_auth'
self.on_auth(self, 'old_auth')
else:
self.on_auth(self, None)
def _on_doc_attrs(self):
if self._sasl:
auth_nb.SASL(self._User, self._Password, self._on_start_sasl).PlugIn(self)
if not self._sasl or self.SASL.startsasl == 'not-supported':
if not self._Resource:
self._Resource = 'xmpppy'
auth_nb.NonBlockingNonSASL(self._User, self._Password, self._Resource, self._on_old_auth).PlugIn(self)
return
self.onreceive(self._on_start_sasl)
self.SASL.auth()
return True
def _on_start_sasl(self, data=None):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not self.__dict__.has_key('SASL'):
# SASL is pluged out, possible disconnect
return
if self.SASL.startsasl == 'in-process':
return
self.onreceive(None)
if self.SASL.startsasl == 'failure':
# wrong user/pass, stop auth
self.connected = None
self._on_sasl_auth(None)
self.SASL.PlugOut()
elif self.SASL.startsasl == 'success':
auth_nb.NonBlockingBind().PlugIn(self)
self.onreceive(self._on_auth_bind)
return True
def _on_auth_bind(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if self.NonBlockingBind.bound is None:
return
self.NonBlockingBind.NonBlockingBind(self._Resource, self._on_sasl_auth)
return True
def _on_sasl_auth(self, res):
self.onreceive(None)
if res:
self.connected += '+sasl'
self.on_auth(self, 'sasl')
else:
self.on_auth(self, None)
def initRoster(self):
''' Plug in the roster. '''
if not self.__dict__.has_key('NonBlockingRoster'):
roster_nb.NonBlockingRoster().PlugIn(self)
def getRoster(self, on_ready = None):
''' Return the Roster instance, previously plugging it in and
requesting roster from server if needed. '''
if self.__dict__.has_key('NonBlockingRoster'):
return self.NonBlockingRoster.getRoster(on_ready)
return None
def sendPresence(self, jid=None, typ=None, requestRoster=0):
''' Send some specific presence state.
Can also request roster from server if according agrument is set.'''
if requestRoster: roster_nb.NonBlockingRoster().PlugIn(self)
self.send(dispatcher_nb.Presence(to=jid, typ=typ))
class Component(NBCommonClient):
''' Component class. The only difference from CommonClient is ability to perform component authentication. '''
def __init__(self, server, port=5347, typ=None, debug=['always', 'nodebuilder'],
domains=None, component=0, on_connect = None, on_connect_failure = None):
''' Init function for Components.
As components use a different auth mechanism which includes the namespace of the component.
Jabberd1.4 and Ejabberd use the default namespace then for all client messages.
Jabberd2 uses jabber:client.
'server' argument is a server name that you are connecting to (f.e. "localhost").
'port' can be specified if 'server' resolves to correct IP. If it is not then you'll need to specify IP
and port while calling "connect()".'''
NBCommonClient.__init__(self, server, port=port, debug=debug)
self.typ = typ
self.component=component
if domains:
self.domains=domains
else:
self.domains=[server]
self.on_connect_component = on_connect
self.on_connect_failure = on_connect_failure
def connect(self, server=None, proxy=None):
''' This will connect to the server, and if the features tag is found then set
the namespace to be jabber:client as that is required for jabberd2.
'server' and 'proxy' arguments have the same meaning as in xmpp.Client.connect() '''
if self.component:
self.Namespace=auth.NS_COMPONENT_1
self.Server=server[0]
NBCommonClient.connect(self, server=server, proxy=proxy,
on_connect = self._on_connect, on_connect_failure = self.on_connect_failure)
def _on_connect(self):
if self.typ=='jabberd2' or not self.typ and self.Dispatcher.Stream.features != None:
self.defaultNamespace=auth.NS_CLIENT
self.Dispatcher.RegisterNamespace(self.defaultNamespace)
self.Dispatcher.RegisterProtocol('iq',dispatcher.Iq)
self.Dispatcher.RegisterProtocol('message',dispatcher_nb.Message)
self.Dispatcher.RegisterProtocol('presence',dispatcher_nb.Presence)
self.on_connect(self.connected)
def auth(self, name, password, dup=None, sasl=0):
''' Authenticate component "name" with password "password".'''
self._User, self._Password, self._Resource=name, password,''
try:
if self.component:
sasl=1
if sasl:
auth.SASL(name,password).PlugIn(self)
if not sasl or self.SASL.startsasl=='not-supported':
if auth.NonSASL(name,password,'').PlugIn(self):
self.connected+='+old_auth'
return 'old_auth'
return
self.SASL.auth()
self.onreceive(self._on_auth_component)
except:
self.DEBUG(self.DBG,"Failed to authenticate %s" % name,'error')
def _on_auth_component(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if self.SASL.startsasl == 'in-process':
return
if self.SASL.startsasl =='success':
if self.component:
self._component = self.component
auth.NBComponentBind().PlugIn(self)
self.onreceive(_on_component_bind)
self.connected += '+sasl'
else:
raise auth.NotAuthorized(self.SASL.startsasl)
def _on_component_bind(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if self.NBComponentBind.bound is None:
return
for domain in self.domains:
self.NBComponentBind.Bind(domain, _on_component_bound)
def _on_component_bound(self, resp):
self.NBComponentBind.PlugOut()

View File

@ -0,0 +1,410 @@
## dispatcher_nb.py
## based on dispatcher.py
##
## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
# $Id: dispatcher.py,v 1.40 2006/01/18 19:26:43 normanr Exp $
'''
Main xmpppy mechanism. Provides library with methods to assign different handlers
to different XMPP stanzas.
Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines time that
Dispatcher.SendAndWaitForResponce method will wait for reply stanza before giving up.
'''
import simplexml, sys
from protocol import *
from client import PlugIn
# default timeout to wait for response for our id
DEFAULT_TIMEOUT_SECONDS = 25
ID = 0
class Dispatcher(PlugIn):
''' Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
Can be plugged out/in to restart these headers (used for SASL f.e.). '''
def __init__(self):
PlugIn.__init__(self)
DBG_LINE='dispatcher'
self.handlers={}
self._expected={}
self._defaultHandler=None
self._pendingExceptions=[]
self._eventHandler=None
self._cycleHandlers=[]
self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \
self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \
self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \
self.SendAndWaitForResponse, self.send,self.disconnect, \
self.SendAndCallForResponse, self.getAnID, self.Event]
def getAnID(self):
global ID
ID += 1
return `ID`
def dumpHandlers(self):
''' Return set of user-registered callbacks in it's internal format.
Used within the library to carry user handlers set over Dispatcher replugins. '''
return self.handlers
def restoreHandlers(self, handlers):
''' Restores user-registered callbacks structure from dump previously obtained via dumpHandlers.
Used within the library to carry user handlers set over Dispatcher replugins. '''
self.handlers = handlers
def _init(self):
''' Registers default namespaces/protocols/handlers. Used internally. '''
self.RegisterNamespace('unknown')
self.RegisterNamespace(NS_STREAMS)
self.RegisterNamespace(self._owner.defaultNamespace)
self.RegisterProtocol('iq', Iq)
self.RegisterProtocol('presence', Presence)
self.RegisterProtocol('message', Message)
self.RegisterDefaultHandler(self.returnStanzaHandler)
# Register Gajim's event handler as soon as dispatcher begins
self.RegisterEventHandler(self._owner._caller._event_dispatcher)
self.on_responses = {}
def plugin(self, owner):
''' Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally.'''
self._init()
self._owner.lastErrNode = None
self._owner.lastErr = None
self._owner.lastErrCode = None
self.StreamInit()
def plugout(self):
''' Prepares instance to be destructed. '''
self.Stream.dispatch = None
self.Stream.DEBUG = None
self.Stream.features = None
self.Stream.destroy()
def StreamInit(self):
''' Send an initial stream header. '''
self.Stream = simplexml.NodeBuilder()
self.Stream._dispatch_depth = 2
self.Stream.dispatch = self.dispatch
self.Stream.stream_header_received = self._check_stream_start
self._owner.debug_flags.append(simplexml.DBG_NODEBUILDER)
self.Stream.DEBUG = self._owner.DEBUG
self.Stream.features = None
self._metastream = Node('stream:stream')
self._metastream.setNamespace(self._owner.Namespace)
self._metastream.setAttr('version', '1.0')
self._metastream.setAttr('xmlns:stream', NS_STREAMS)
self._metastream.setAttr('to', self._owner.Server)
self._owner.send("<?xml version='1.0'?>%s>" % str(self._metastream)[:-2])
def _check_stream_start(self, ns, tag, attrs):
if ns<>NS_STREAMS or tag<>'stream':
raise ValueError('Incorrect stream start: (%s,%s). Terminating.' % (tag, ns))
def ProcessNonBlocking(self, data=None):
''' Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time.
Returns:
1) length of processed data if some data were processed;
2) '0' string if no data were processed but link is alive;
3) 0 (zero) if underlying connection is closed.'''
for handler in self._cycleHandlers:
handler(self)
if len(self._pendingExceptions) > 0:
_pendingException = self._pendingExceptions.pop()
raise _pendingException[0], _pendingException[1], _pendingException[2]
self.Stream.Parse(data)
if len(self._pendingExceptions) > 0:
_pendingException = self._pendingExceptions.pop()
raise _pendingException[0], _pendingException[1], _pendingException[2]
return len(data)
def RegisterNamespace(self, xmlns, order='info'):
''' Creates internal structures for newly registered namespace.
You can register handlers for this namespace afterwards. By default one namespace
already registered (jabber:client or jabber:component:accept depending on context. '''
self.DEBUG('Registering namespace "%s"' % xmlns, order)
self.handlers[xmlns]={}
self.RegisterProtocol('unknown', Protocol, xmlns=xmlns)
self.RegisterProtocol('default', Protocol, xmlns=xmlns)
def RegisterProtocol(self, tag_name, Proto, xmlns=None, order='info'):
''' Used to declare some top-level stanza name to dispatcher.
Needed to start registering handlers for such stanzas.
Iq, message and presence protocols are registered by default. '''
if not xmlns: xmlns=self._owner.defaultNamespace
self.DEBUG('Registering protocol "%s" as %s(%s)' %
(tag_name, Proto, xmlns), order)
self.handlers[xmlns][tag_name]={type:Proto, 'default':[]}
def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', makefirst=0, system=0):
''' Register handler for processing all stanzas for specified namespace. '''
self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, system)
def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, makefirst=0, system=0):
'''Register user callback as stanzas handler of declared type. Callback must take
(if chained, see later) arguments: dispatcher instance (for replying), incomed
return of previous handlers.
The callback must raise xmpp.NodeProcessed just before return if it want preven
callbacks to be called with the same stanza as argument _and_, more importantly
library from returning stanza to sender with error set (to be enabled in 0.2 ve
Arguments:
"name" - name of stanza. F.e. "iq".
"handler" - user callback.
"typ" - value of stanza's "type" attribute. If not specified any value match
"ns" - namespace of child that stanza must contain.
"chained" - chain together output of several handlers.
"makefirst" - insert handler in the beginning of handlers list instead of
adding it to the end. Note that more common handlers (i.e. w/o "typ" and "
will be called first nevertheless.
"system" - call handler even if NodeProcessed Exception were raised already.
'''
if not xmlns:
xmlns=self._owner.defaultNamespace
self.DEBUG('Registering handler %s for "%s" type->%s ns->%s(%s)' %
(handler, name, typ, ns, xmlns), 'info')
if not typ and not ns:
typ='default'
if not self.handlers.has_key(xmlns):
self.RegisterNamespace(xmlns,'warn')
if not self.handlers[xmlns].has_key(name):
self.RegisterProtocol(name,Protocol,xmlns,'warn')
if not self.handlers[xmlns][name].has_key(typ+ns):
self.handlers[xmlns][name][typ+ns]=[]
if makefirst:
self.handlers[xmlns][name][typ+ns].insert(0,{'func':handler,'system':system})
else:
self.handlers[xmlns][name][typ+ns].append({'func':handler,'system':system})
def RegisterHandlerOnce(self,name,handler,typ='',ns='',xmlns=None,makefirst=0, system=0):
''' Unregister handler after first call (not implemented yet). '''
if not xmlns:
xmlns=self._owner.defaultNamespace
self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)
def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None):
''' Unregister handler. "typ" and "ns" must be specified exactly the same as with registering.'''
if not xmlns:
xmlns=self._owner.defaultNamespace
if not typ and not ns:
typ='default'
if not self.handlers[xmlns].has_key(name):
return
if not self.handlers[xmlns][name].has_key(typ+ns):
return
for pack in self.handlers[xmlns][name][typ+ns]:
if handler==pack['func']:
break
else:
pack=None
try:
self.handlers[xmlns][name][typ+ns].remove(pack)
except ValueError:
pass
def RegisterDefaultHandler(self,handler):
''' Specify the handler that will be used if no NodeProcessed exception were raised.
This is returnStanzaHandler by default. '''
self._defaultHandler=handler
def RegisterEventHandler(self,handler):
''' Register handler that will process events. F.e. "FILERECEIVED" event. '''
self._eventHandler=handler
def returnStanzaHandler(self,conn,stanza):
''' Return stanza back to the sender with <feature-not-implemennted/> error set. '''
if stanza.getType() in ['get','set']:
conn.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED))
def streamErrorHandler(self,conn,error):
name,text='error',error.getData()
for tag in error.getChildren():
if tag.getNamespace()==NS_XMPP_STREAMS:
if tag.getName()=='text':
text=tag.getData()
else:
name=tag.getName()
if name in stream_exceptions.keys():
exc=stream_exceptions[name]
else:
exc=StreamError
raise exc((name,text))
def RegisterCycleHandler(self, handler):
''' Register handler that will be called on every Dispatcher.Process() call. '''
if handler not in self._cycleHandlers:
self._cycleHandlers.append(handler)
def UnregisterCycleHandler(self, handler):
''' Unregister handler that will is called on every Dispatcher.Process() call.'''
if handler in self._cycleHandlers:
self._cycleHandlers.remove(handler)
def Event(self, realm, event, data):
''' Raise some event. Takes three arguments:
1) "realm" - scope of event. Usually a namespace.
2) "event" - the event itself. F.e. "SUCESSFULL SEND".
3) data that comes along with event. Depends on event.'''
if self._eventHandler: self._eventHandler(realm,event,data)
def dispatch(self, stanza, session=None, direct=0):
''' Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
Called internally. '''
if not session:
session = self
session.Stream._mini_dom = None
name = stanza.getName()
if not direct and self._owner._component:
if name == 'route':
if stanza.getAttr('error') == None:
if len(stanza.getChildren()) == 1:
stanza = stanza.getChildren()[0]
name=stanza.getName()
else:
for each in stanza.getChildren():
self.dispatch(each,session,direct=1)
return
elif name == 'presence':
return
elif name in ('features','bind'):
pass
else:
raise UnsupportedStanzaType(name)
if name=='features':
session.Stream.features=stanza
xmlns=stanza.getNamespace()
if not self.handlers.has_key(xmlns):
self.DEBUG("Unknown namespace: " + xmlns, 'warn')
xmlns='unknown'
if not self.handlers[xmlns].has_key(name):
self.DEBUG("Unknown stanza: " + name, 'warn')
name='unknown'
else:
self.DEBUG("Got %s/%s stanza" % (xmlns, name), 'ok')
if stanza.__class__.__name__=='Node':
stanza=self.handlers[xmlns][name][type](node=stanza)
typ=stanza.getType()
if not typ: typ=''
stanza.props=stanza.getProperties()
ID=stanza.getID()
session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s"%(name,typ,stanza.props,ID),'ok')
list=['default'] # we will use all handlers:
if self.handlers[xmlns][name].has_key(typ): list.append(typ) # from very common...
for prop in stanza.props:
if self.handlers[xmlns][name].has_key(prop): list.append(prop)
if typ and self.handlers[xmlns][name].has_key(typ+prop): list.append(typ+prop) # ...to very particular
chain=self.handlers[xmlns]['default']['default']
for key in list:
if key: chain = chain + self.handlers[xmlns][name][key]
output=''
if session._expected.has_key(ID):
user=0
if type(session._expected[ID]) == type(()):
cb,args = session._expected[ID]
session.DEBUG("Expected stanza arrived. Callback %s(%s) found!" % (cb, args), 'ok')
try:
cb(session,stanza,**args)
except Exception, typ:
if typ.__class__.__name__ <>'NodeProcessed': raise
else:
session.DEBUG("Expected stanza arrived!",'ok')
session._expected[ID]=stanza
else:
user=1
for handler in chain:
if user or handler['system']:
try:
handler['func'](session,stanza)
except Exception, typ:
if typ.__class__.__name__ <> 'NodeProcessed':
self._pendingExceptions.insert(0, sys.exc_info())
return
user=0
if user and self._defaultHandler:
self._defaultHandler(session, stanza)
def WaitForData(self, data):
if data is None:
return
res = self.ProcessNonBlocking(data)
self._owner.remove_timeout()
if self._expected[self._witid] is None:
return
if self.on_responses.has_key(self._witid):
self._owner.onreceive(None)
resp, args = self.on_responses[self._witid]
del(self.on_responses[self._witid])
if args is None:
resp(self._expected[self._witid])
else:
resp(self._owner, self._expected[self._witid], **args)
def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None):
''' Put stanza on the wire and wait for recipient's response to it. '''
if timeout is None:
timeout = DEFAULT_TIMEOUT_SECONDS
self._witid = self.send(stanza)
if func:
self.on_responses[self._witid] = (func, args)
if timeout:
self._owner.set_timeout(timeout)
self._owner.onreceive(self.WaitForData)
self._expected[self._witid] = None
return self._witid
def SendAndCallForResponse(self, stanza, func=None, args=None):
''' Put stanza on the wire and call back when recipient replies.
Additional callback arguments can be specified in args. '''
self.SendAndWaitForResponse(stanza, 0, func, args)
def send(self, stanza):
''' Serialise stanza and put it on the wire. Assign an unique ID to it before send.
Returns assigned ID.'''
if type(stanza) in [type(''), type(u'')]:
return self._owner.Connection.send(stanza)
if not isinstance(stanza, Protocol):
_ID=None
elif not stanza.getID():
global ID
ID+=1
_ID=`ID`
stanza.setID(_ID)
else:
_ID=stanza.getID()
if self._owner._registered_name and not stanza.getAttr('from'):
stanza.setAttr('from', self._owner._registered_name)
if self._owner._component and stanza.getName() != 'bind':
to=self._owner.Server
if stanza.getTo() and stanza.getTo().getDomain():
to=stanza.getTo().getDomain()
frm=stanza.getFrom()
if frm.getDomain():
frm=frm.getDomain()
route=Protocol('route', to=to, frm=frm, payload=[stanza])
stanza=route
stanza.setNamespace(self._owner.Namespace)
stanza.setParent(self._metastream)
self._owner.Connection.send(stanza)
return _ID
def disconnect(self):
''' Send a stream terminator. '''
self._owner.Connection.send('</stream:stream>')

View File

@ -0,0 +1,228 @@
## features.py
##
## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
# $Id: features.py,v 1.22 2005/09/30 20:13:04 mikealbon Exp $
from features import REGISTER_DATA_RECEIVED
from protocol import *
def _on_default_response(disp, iq, cb):
def _on_response(resp):
if isResultNode(resp):
if cb:
cb(1)
elif cb:
cb(False)
disp.SendAndCallForResponse(iq, _on_response)
def _discover(disp, ns, jid, node = None, fb2b=0, fb2a=1, cb=None):
""" Try to obtain info from the remote object.
If remote object doesn't support disco fall back to browse (if fb2b is true)
and if it doesnt support browse (or fb2b is not true) fall back to agents protocol
(if gb2a is true). Returns obtained info. Used internally. """
iq=Iq(to=jid, typ='get', queryNS=ns)
if node:
iq.setAttr('node',node)
def _on_resp1(resp):
if fb2b and not isResultNode(resp):
# Fallback to browse
disp.SendAndCallForResponse(Iq(to=jid,typ='get',queryNS=NS_BROWSE), _on_resp2)
else:
_on_resp2('')
def _on_resp2(resp):
if fb2a and not isResultNode(resp):
# Fallback to agents
disp.SendAndCallForResponse(Iq(to=jid,typ='get',queryNS=NS_AGENTS), _on_result)
else:
_on_result('')
def _on_result(resp):
if isResultNode(resp):
if cb:
cb(resp.getQueryPayload())
elif cb:
cb([])
disp.SendAndCallForResponse(iq, _on_resp1)
# this function is not used in gajim ???
def discoverItems(disp,jid,node=None, cb=None):
""" Query remote object about any items that it contains. Return items list. """
""" According to JEP-0030:
query MAY have node attribute
item: MUST HAVE jid attribute and MAY HAVE name, node, action attributes.
action attribute of item can be either of remove or update value."""
def _on_response(result_array):
ret=[]
for result in result_array:
if result.getName()=='agent' and result.getTag('name'):
result.setAttr('name', result.getTagData('name'))
ret.append(result.attrs)
if cb:
cb(ret)
_discover(disp, NS_DISCO_ITEMS, jid, node, _on_response)
def discoverInfo(disp,jid,node=None, cb=None):
""" Query remote object about info that it publishes. Returns identities and features lists."""
""" According to JEP-0030:
query MAY have node attribute
identity: MUST HAVE category and name attributes and MAY HAVE type attribute.
feature: MUST HAVE var attribute"""
def _on_response(result):
identities , features = [] , []
for i in result:
if i.getName()=='identity':
identities.append(i.attrs)
elif i.getName()=='feature':
features.append(i.getAttr('var'))
elif i.getName()=='agent':
if i.getTag('name'):
i.setAttr('name',i.getTagData('name'))
if i.getTag('description'):
i.setAttr('name',i.getTagData('description'))
identities.append(i.attrs)
if i.getTag('groupchat'):
features.append(NS_GROUPCHAT)
if i.getTag('register'):
features.append(NS_REGISTER)
if i.getTag('search'):
features.append(NS_SEARCH)
if cb:
cb(identities , features)
_discover(disp, NS_DISCO_INFO, jid, node, _on_response)
### Registration ### jabber:iq:register ### JEP-0077 ###########################
def getRegInfo(disp, host, info={}, sync=True):
""" Gets registration form from remote host.
You can pre-fill the info dictionary.
F.e. if you are requesting info on registering user joey than specify
info as {'username':'joey'}. See JEP-0077 for details.
'disp' must be connected dispatcher instance."""
iq=Iq('get',NS_REGISTER,to=host)
for i in info.keys():
iq.setTagData(i,info[i])
if sync:
disp.SendAndCallForResponse(iq, lambda resp: _ReceivedRegInfo(disp.Dispatcher,resp, host))
else:
disp.SendAndCallForResponse(iq, _ReceivedRegInfo, {'agent': host })
def _ReceivedRegInfo(con, resp, agent):
iq=Iq('get',NS_REGISTER,to=agent)
if not isResultNode(resp):
return
df=resp.getTag('query',namespace=NS_REGISTER).getTag('x',namespace=NS_DATA)
if df:
con.Event(NS_REGISTER,REGISTER_DATA_RECEIVED,(agent,DataForm(node=df),True))
return
df=DataForm(typ='form')
for i in resp.getQueryPayload():
if type(i)<>type(iq):
pass
elif i.getName()=='instructions':
df.addInstructions(i.getData())
else:
df.setField(i.getName()).setValue(i.getData())
con.Event(NS_REGISTER, REGISTER_DATA_RECEIVED, (agent,df,False))
def register(disp, host, info, cb):
""" Perform registration on remote server with provided info.
disp must be connected dispatcher instance.
Returns true or false depending on registration result.
If registration fails you can get additional info from the dispatcher's owner
attributes lastErrNode, lastErr and lastErrCode.
"""
iq=Iq('set', NS_REGISTER, to=host)
if not isinstance(info, dict):
info=info.asDict()
for i in info.keys():
iq.setTag('query').setTagData(i,info[i])
_on_default_response(disp, iq, cb)
def unregister(disp, host, cb):
""" Unregisters with host (permanently removes account).
disp must be connected and authorized dispatcher instance.
Returns true on success."""
iq = Iq('set', NS_REGISTER, to=host, payload=[Node('remove')])
_on_default_response(disp, iq, cb)
def changePasswordTo(disp, newpassword, host=None, cb = None):
""" Changes password on specified or current (if not specified) server.
disp must be connected and authorized dispatcher instance.
Returns true on success."""
if not host: host=disp._owner.Server
iq = Iq('set',NS_REGISTER,to=host, payload=[Node('username',
payload=[disp._owner.Server]),Node('password',payload=[newpassword])])
_on_default_response(disp, iq, cb)
### Privacy ### jabber:iq:privacy ### draft-ietf-xmpp-im-19 ####################
#type=[jid|group|subscription]
#action=[allow|deny]
def getPrivacyLists(disp, cb):
""" Requests privacy lists from connected server.
Returns dictionary of existing lists on success."""
iq = Iq('get', NS_PRIVACY)
def _on_response(resp):
dict = {'lists': []}
try:
if not isResultNode(resp):
cb(False)
return
for list in resp.getQueryPayload():
if list.getName()=='list':
dict['lists'].append(list.getAttr('name'))
else:
dict[list.getName()]=list.getAttr('name')
cb(dict)
except:
pass
cb(False)
disp.SendAndCallForResponse(iq, _on_respons)
def getPrivacyList(disp, listname, cb):
""" Requests specific privacy list listname. Returns list of XML nodes (rules)
taken from the server responce."""
def _on_response(resp):
try:
if isResultNode(resp):
return cb(resp.getQueryPayload()[0])
except:
pass
cb(False)
iq = Iq('get', NS_PRIVACY, payload=[Node('list', {'name': listname})])
disp.SendAndCallForResponse(iq, _on_response)
def setActivePrivacyList(disp, listname=None, typ='active', cb=None):
""" Switches privacy list 'listname' to specified type.
By default the type is 'active'. Returns true on success."""
if listname:
attrs={'name':listname}
else:
attrs={}
iq = Iq('set',NS_PRIVACY,payload=[Node(typ,attrs)])
_on_default_response(disp, iq, cb)
def setDefaultPrivacyList(disp, listname=None):
""" Sets the default privacy list as 'listname'. Returns true on success."""
return setActivePrivacyList(disp, listname,'default')
def setPrivacyList(disp, list, cb):
""" Set the ruleset. 'list' should be the simpleXML node formatted
according to RFC 3921 (XMPP-IM) (I.e. Node('list',{'name':listname},payload=[...]) )
Returns true on success."""
iq=Iq('set',NS_PRIVACY,payload=[list])
_on_default_response(disp, iq, cb)
def delPrivacyList(disp,listname, cb):
""" Deletes privacy list 'listname'. Returns true on success."""
iq = Iq('set',NS_PRIVACY,payload=[Node('list',{'name':listname})])
_on_default_response(disp, iq, cb)

View File

@ -0,0 +1,151 @@
## idlequeue.py
##
## Copyright (C) 2006 Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
import select
class IdleObject:
''' base class for all idle listeners, these are the methods, which are called from IdleQueue
'''
def __init__(self):
self.fd = -1
pass
def pollend(self):
''' called on stream failure '''
pass
def pollin(self):
''' called on new read event '''
pass
def pollout(self):
''' called on new write event (connect in sockets is a pollout) '''
pass
def read_timeout(self, fd):
''' called when timeout has happend '''
pass
class IdleQueue:
def __init__(self):
self.queue = {}
# when there is a timeout it executes obj.read_timeout()
# timeout is not removed automatically!
self.read_timeouts = {}
# cb, which are executed after XX sec., alarms are removed automatically
self.alarms = {}
self.init_idle()
def init_idle(self):
self.selector = select.poll()
def remove_timeout(self, fd):
''' self explanatory, remove the timeout from 'read_timeouts' dict '''
if self.read_timeouts.has_key(fd):
del(self.read_timeouts[fd])
def set_alarm(self, alarm_cb, seconds):
''' set up a new alarm, to be called after alarm_cb sec. '''
alarm_time = self.current_time() + seconds
# almost impossible, but in case we have another alarm_cb at this time
if self.alarms.has_key(alarm_time):
self.alarms[alarm_time].append(alarm_cb)
else:
self.alarms[alarm_time] = [alarm_cb]
def set_read_timeout(self, fd, seconds):
''' set a new timeout, if it is not removed after 'seconds',
then obj.read_timeout() will be called '''
timeout = self.current_time() + seconds
self.read_timeouts[fd] = timeout
def check_time_events(self):
current_time = self.current_time()
for fd, timeout in self.read_timeouts.items():
if timeout > current_time:
continue
if self.queue.has_key(fd):
self.queue[fd].read_timeout()
else:
self.remove_timeout(fd)
times = self.alarms.keys()
for alarm_time in times:
if alarm_time > current_time:
break
for cb in self.alarms[alarm_time]:
cb()
del(self.alarms[alarm_time])
def plug_idle(self, obj, writable = True, readable = True):
if self.queue.has_key(obj.fd):
self.unplug_idle(obj.fd)
self.queue[obj.fd] = obj
if writable:
if not readable:
flags = 4 # read only
else:
flags = 7 # both readable and writable
else:
flags = 3 # write only
flags |= 16 # hung up, closed channel
self.add_idle(obj.fd, flags)
def add_idle(self, fd, flags):
self.selector.register(fd, flags)
def unplug_idle(self, fd):
if self.queue.has_key(fd):
del(self.queue[fd])
self.remove_idle(fd)
def current_time(self):
from time import time
return time()
def remove_idle(self, fd):
self.selector.unregister(fd)
def process_events(self, fd, flags):
obj = self.queue.get(fd)
if obj is None:
self.unplug_idle(fd)
return False
if flags & 3: # waiting read event
obj.pollin()
return True
elif flags & 4: # waiting write event
obj.pollout()
return True
elif flags & 16: # closed channel
obj.pollend()
return False
def process(self):
if not self.queue:
return True
try:
waiting_descriptors = self.selector.poll(0)
except select.error, e:
waiting_descriptors = []
if e[0] != 4: # interrupt
raise
for fd, flags in waiting_descriptors:
self.process_events(fd, flags)
self.check_time_events()
return True

View File

@ -0,0 +1,59 @@
## roster_nb.py
## based on roster.py
##
## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
# $Id: roster.py,v 1.17 2005/05/02 08:38:49 snakeru Exp $
'''
Simple roster implementation. Can be used though for different tasks like
mass-renaming of contacts.
'''
from roster import Roster
from protocol import NS_ROSTER
class NonBlockingRoster(Roster):
def plugin(self, owner, request=1):
''' Register presence and subscription trackers in the owner's dispatcher.
Also request roster from server if the 'request' argument is set.
Used internally.'''
self._owner.RegisterHandler('iq', self.RosterIqHandler, 'result', NS_ROSTER, makefirst = 1)
self._owner.RegisterHandler('iq', self.RosterIqHandler, 'set', NS_ROSTER)
self._owner.RegisterHandler('presence', self.PresenceHandler)
if request:
self.Request()
def _on_roster_set(self, data):
if data:
self._owner.Dispatcher.ProcessNonBlocking(data)
if not self.set:
return
self._owner.onreceive(None)
if self.on_ready:
self.on_ready(self)
self.on_ready = None
return True
def getRoster(self, on_ready=None):
''' Requests roster from server if neccessary and returns self. '''
if not self.set:
self.on_ready = on_ready
self._owner.onreceive(self._on_roster_set)
return
if on_ready:
on_ready(self)
on_ready = None
else:
return self

View File

@ -0,0 +1,481 @@
## transports_nb.py
## based on transports.py
##
## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2, or (at your option)
## any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
import socket,select,base64,dispatcher_nb
from simplexml import ustr
from client import PlugIn
from idlequeue import IdleObject
from protocol import *
from transports import *
import sys
import os
import errno
# timeout to connect to the server socket, it doesn't include auth
CONNECT_TIMEOUT_SECONDS = 30
# how long to wait for a disconnect to complete
DISCONNECT_TIMEOUT_SECONDS = 10
# size of the buffer which reads data from server
# if lower, more stanzas will be fragmented and processed twice
RECV_BUFSIZE = 1048576
class NonBlockingTcp(PlugIn, IdleObject):
''' This class can be used instead of transports.Tcp in threadless implementations '''
def __init__(self, on_connect = None, on_connect_failure = None, server=None, use_srv = True):
''' Cache connection point 'server'. 'server' is the tuple of (host, port)
absolutely the same as standard tcp socket uses.
on_connect - called when we connect to the socket
on_connect_failure - called if there was error connecting to socket
'''
IdleObject.__init__(self)
PlugIn.__init__(self)
self.DBG_LINE='socket'
self._exported_methods=[self.send, self.disconnect, self.onreceive, self.set_send_timeout,
self.start_disconnect, self.set_timeout, self.remove_timeout]
self._server = server
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
self.on_receive = None
self.on_disconnect = None
# 0 - not connected
# 1 - connected
# -1 - about to disconnect (when we wait for final events to complete)
# -2 - disconnected
self.state = 0
# queue with messages to be send
self.sendqueue = []
# bytes remained from the last send message
self.sendbuff = ''
# time to wait for SOME stanza to come and then send keepalive
self.sendtimeout = 0
# in case we want to something different than sending keepalives
self.on_timeout = None
# writable, readable - keep state of the last pluged flags
# This prevents replug of same object with the same flags
self.writable = True
self.readable = False
def plugin(self, owner):
''' Fire up connection. Return non-empty string on success.
Also registers self.disconnected method in the owner's dispatcher.
Called internally. '''
self.idlequeue = owner.idlequeue
if not self._server:
self._server=(self._owner.Server,5222)
if self.connect(self._server) is False:
return False
return True
def read_timeout(self):
if self.state == 0:
self.idlequeue.unplug_idle(self.fd)
if self.on_connect_failure:
self.on_connect_failure()
else:
if self.on_timeout:
self.on_timeout()
self.renew_send_timeout()
def connect(self,server=None, proxy = None, secure = None):
''' Try to establish connection. Returns non-empty string on success. '''
if not server:
server=self._server
else:
self._server = server
self.state = 0
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setblocking(False)
except:
if self.on_connect_failure:
self.on_connect_failure()
return False
self.fd = self._sock.fileno()
self.idlequeue.plug_idle(self, True, False)
self.set_timeout(CONNECT_TIMEOUT_SECONDS)
self._do_connect()
return True
def _plug_idle(self):
readable = self.state != 0
if self.sendqueue or self.sendbuff:
writable = True
else:
writable = False
if self.writable != writable or self.readable != readable:
self.idlequeue.plug_idle(self, writable, readable)
def pollout(self):
if self.state == 0:
return self._do_connect()
return self._do_send()
def plugout(self):
''' Disconnect from the remote server and unregister self.disconnected method from
the owner's dispatcher. '''
self.disconnect()
self._owner.Connection = None
def pollin(self):
self._do_receive()
def pollend(self):
self.disconnect()
if self.on_connect_failure:
self.on_connect_failure()
self.on_connect_failure = None
def disconnect(self):
if self.state == -2: # already disconnected
return
self.state = -2
self.sendqueue = None
self.remove_timeout()
self._owner.disconnected()
self.idlequeue.unplug_idle(self.fd)
if self.on_disconnect:
self.on_disconnect()
def end_disconnect(self):
''' force disconnect only if we are still trying to disconnect '''
if self.state == -1:
self.disconnect()
def start_disconnect(self, to_send, on_disconnect):
self.on_disconnect = on_disconnect
self.sendqueue = []
self.send(to_send)
self.send('</stream:stream>')
self.state = -1 # about to disconnect
self.idlequeue.set_alarm(self.end_disconnect, DISCONNECT_TIMEOUT_SECONDS)
def set_timeout(self, timeout):
if self.state >= 0 and self.fd > 0:
self.idlequeue.set_read_timeout(self.fd, timeout)
def remove_timeout(self):
if self.fd:
self.idlequeue.remove_timeout(self.fd)
def onreceive(self, recv_handler):
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
else:
self.on_receive = None
return
_tmp = self.on_receive
# make sure this cb is not overriden by recursive calls
if not recv_handler(None) and _tmp == self.on_receive:
self.on_receive = recv_handler
def _do_receive(self):
''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.'''
received = ''
errnum = 0
try:
# get as many bites, as possible, but not more than RECV_BUFSIZE
received = self._recv(RECV_BUFSIZE)
except Exception, e:
if len(e.args) > 0 and isinstance(e.args[0], int):
errnum = e[0]
# "received" will be empty anyhow
if not received and errnum != 2:
if errnum != 8: # EOF occurred in violation of protocol
self.DEBUG('Socket error while receiving data', 'error')
if self.state >= 0:
self.disconnect()
return False
if self.state < 0:
return
# we have received some bites, stop the timeout!
self.renew_send_timeout()
if self.on_receive:
if received.strip():
self.DEBUG(received,'got')
if hasattr(self._owner, 'Dispatcher'):
self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
self.on_receive(received)
else:
# This should never happed, so we need the debug
self.DEBUG('Unhandled data received: %s' % received,'got')
self.disconnect()
return True
def _do_send(self):
if not self.sendbuff:
if not self.sendqueue:
return None # nothing to send
self.sendbuff = self.sendqueue.pop(0)
self.sent_data = self.sendbuff
try:
send_count = self._send(self.sendbuff)
if send_count:
self.sendbuff = self.sendbuff[send_count:]
if not self.sendbuff and not self.sendqueue:
if self.state < 0:
self._on_send()
self.disconnect()
return
# we are not waiting for write
self._plug_idle()
self._on_send()
except Exception, e:
if self.state < 0:
self.disconnect()
return
if self._on_send_failure:
self._on_send_failure()
return
return True
def _do_connect(self):
if self.state != 0:
return
self._sock.setblocking(False)
# connect_ex is better than try:connect
try:
errnum = self._sock.connect_ex(self._server)
except socket.error, e:
errnum = e[0]
# in progress, or would block
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
return
# 10056 - already connected, only on win32
# code 'WS*' is not available on GNU, so we use its numeric value
elif errnum not in (0, 10056):
self.remove_timeout()
if self.on_connect_failure:
self.on_connect_failure()
return
self.remove_timeout()
self._owner.Connection=self
self.state = 1
self._sock.setblocking(False)
self._send = self._sock.send
self._recv = self._sock.recv
self._plug_idle()
if self.on_connect:
self.on_connect()
self.on_connect = None
return True
def send(self, raw_data):
''' Writes raw outgoing data. Blocks until done.
If supplied data is unicode string, encodes it to utf-8 before send. '''
if self.state <= 0:
return
r = raw_data
if isinstance(r, unicode):
r = r.encode('utf-8')
elif not isinstance(r, str):
r = ustr(r).encode('utf-8')
self.sendqueue.append(r)
self._plug_idle()
def _on_send(self):
if self.sent_data and self.sent_data.strip():
self.DEBUG(self.sent_data,'sent')
if hasattr(self._owner, 'Dispatcher'):
self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
self.sent_data = None
def _on_send_failure(self):
self.DEBUG("Socket error while sending data",'error')
self._owner.disconnected()
self.sent_data = None
def set_send_timeout(self, timeout, on_timeout):
self.sendtimeout = timeout
if self.sendtimeout > 0:
self.on_timeout = on_timeout
else:
self.on_timeout = None
def renew_send_timeout(self):
if self.on_timeout and self.sendtimeout > 0:
self.set_timeout(self.sendtimeout)
else:
self.remove_timeout()
def getHost(self):
''' Return the 'host' value that is connection is [will be] made to.'''
return self._server[0]
def getPort(self):
''' Return the 'port' value that is connection is [will be] made to.'''
return self._server[1]
class NonBlockingTLS(PlugIn):
''' TLS connection used to encrypts already estabilished tcp connection.'''
def PlugIn(self, owner, now=0, on_tls_start = None):
''' If the 'now' argument is true then starts using encryption immidiatedly.
If 'now' in false then starts encryption as soon as TLS feature is
declared by the server (if it were already declared - it is ok).
'''
if owner.__dict__.has_key('NonBlockingTLS'):
return # Already enabled.
PlugIn.PlugIn(self, owner)
DBG_LINE='NonBlockingTLS'
self.on_tls_start = on_tls_start
if now:
res = self._startSSL()
self.tls_start()
return res
if self._owner.Dispatcher.Stream.features:
try:
self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features)
except NodeProcessed:
pass
else:
self._owner.RegisterHandlerOnce('features',self.FeaturesHandler, xmlns=NS_STREAMS)
self.starttls = None
def plugout(self,now=0):
''' Unregisters TLS handler's from owner's dispatcher. Take note that encription
can not be stopped once started. You can only break the connection and start over.'''
self._owner.UnregisterHandler('features',self.FeaturesHandler,xmlns=NS_STREAMS)
def tls_start(self):
if self.on_tls_start:
self.on_tls_start()
self.on_tls_start = None
def FeaturesHandler(self, conn, feats):
''' Used to analyse server <features/> tag for TLS support.
If TLS is supported starts the encryption negotiation. Used internally '''
if not feats.getTag('starttls', namespace=NS_TLS):
self.DEBUG("TLS unsupported by remote server.", 'warn')
self.tls_start()
return
self.DEBUG("TLS supported by remote server. Requesting TLS start.", 'ok')
self._owner.RegisterHandlerOnce('proceed', self.StartTLSHandler, xmlns=NS_TLS)
self._owner.RegisterHandlerOnce('failure', self.StartTLSHandler, xmlns=NS_TLS)
self._owner.send('<starttls xmlns="%s"/>' % NS_TLS)
self.tls_start()
raise NodeProcessed
def _startSSL(self):
''' Immidiatedly switch socket to TLS mode. Used internally.'''
tcpsock=self._owner.Connection
tcpsock._sock.setblocking(True)
tcpsock._sslObj = socket.ssl(tcpsock._sock, None, None)
tcpsock._sock.setblocking(False)
tcpsock._sslIssuer = tcpsock._sslObj.issuer()
tcpsock._sslServer = tcpsock._sslObj.server()
tcpsock._recv = tcpsock._sslObj.read
tcpsock._send = tcpsock._sslObj.write
self.starttls='success'
def StartTLSHandler(self, conn, starttls):
''' Handle server reply if TLS is allowed to process. Behaves accordingly.
Used internally.'''
if starttls.getNamespace() <> NS_TLS:
return
self.starttls = starttls.getName()
if self.starttls == 'failure':
self.DEBUG('Got starttls response: ' + self.starttls,'error')
return
self.DEBUG('Got starttls proceed response. Switching to TLS/SSL...','ok')
self._startSSL()
self._owner.Dispatcher.PlugOut()
dispatcher_nb.Dispatcher().PlugIn(self._owner)
class NBHTTPPROXYsocket(NonBlockingTcp):
''' This class can be used instead of transports.HTTPPROXYsocket
HTTP (CONNECT) proxy connection class. Uses TCPsocket as the base class
redefines only connect method. Allows to use HTTP proxies like squid with
(optionally) simple authentication (using login and password).
'''
def __init__(self, on_connect =None, on_connect_failure = None,proxy = None,server = None,use_srv=True):
''' Caches proxy and target addresses.
'proxy' argument is a dictionary with mandatory keys 'host' and 'port' (proxy address)
and optional keys 'user' and 'password' to use for authentication.
'server' argument is a tuple of host and port - just like TCPsocket uses. '''
self.on_connect_proxy = on_connect
self.on_connect_failure = on_connect_failure
NonBlockingTcp.__init__(self, self._on_tcp_connect, on_connect_failure, server, use_srv)
self.DBG_LINE=DBG_CONNECT_PROXY
self.server = server
self.proxy=proxy
def plugin(self, owner):
''' Starts connection. Used interally. Returns non-empty string on success.'''
owner.debug_flags.append(DBG_CONNECT_PROXY)
NonBlockingTcp.plugin(self,owner)
def connect(self,dupe=None):
''' Starts connection. Connects to proxy, supplies login and password to it
(if were specified while creating instance). Instructs proxy to make
connection to the target server. Returns non-empty sting on success. '''
NonBlockingTcp.connect(self, (self.proxy['host'], self.proxy['port']))
def _on_tcp_connect(self):
self.DEBUG('Proxy server contacted, performing authentification','start')
connector = ['CONNECT %s:%s HTTP/1.0'%self.server,
'Proxy-Connection: Keep-Alive',
'Pragma: no-cache',
'Host: %s:%s'%self.server,
'User-Agent: HTTPPROXYsocket/v0.1']
if self.proxy.has_key('user') and self.proxy.has_key('password'):
credentials = '%s:%s' % ( self.proxy['user'], self.proxy['password'])
credentials = base64.encodestring(credentials).strip()
connector.append('Proxy-Authorization: Basic '+credentials)
connector.append('\r\n')
self.onreceive(self._on_headers_sent)
self.send('\r\n'.join(connector))
def _on_headers_sent(self, reply):
if reply is None:
return
self.reply = reply.replace('\r', '')
try:
proto, code, desc = reply.split('\n')[0].split(' ', 2)
except:
raise error('Invalid proxy reply')
if code <> '200':
self.DEBUG('Invalid proxy reply: %s %s %s' % (proto, code, desc),'error')
self._owner.disconnected()
return
self.onreceive(self._on_proxy_auth)
def _on_proxy_auth(self, reply):
if self.reply.find('\n\n') == -1:
if reply is None:
return
if reply.find('\n\n') == -1:
self.reply += reply.replace('\r', '')
return
self.DEBUG('Authentification successfull. Jabber server contacted.','ok')
if self.on_connect_proxy:
self.on_connect_proxy()
def DEBUG(self, text, severity):
''' Overwrites DEBUG tag to allow debug output be presented as "CONNECTproxy".'''
return self._owner.DEBUG(DBG_CONNECT_PROXY, text, severity)

View File

@ -4,6 +4,7 @@
## - Yann Le Boulanger <asterix@lagaule.org>
## - Nikos Kouremenos <kourem@gmail.com>
## - Vincent Hanquez <tab@snarc.org>
## - Dimitur Kirov <dkirov@gmail.com>
##
## Copyright (C) 2003-2004 Yann Le Boulanger <asterix@lagaule.org>
## Vincent Hanquez <tab@snarc.org>
@ -1229,7 +1230,6 @@ class AccountModificationWindow:
gajim.last_message_time[self.account]
gajim.status_before_autoaway[name] = \
gajim.status_before_autoaway[self.account]
gajim.events_for_ui[name] = gajim.events_for_ui[self.account]
gajim.contacts.change_account_name(self.account, name)
@ -1256,7 +1256,6 @@ class AccountModificationWindow:
del gajim.encrypted_chats[self.account]
del gajim.last_message_time[self.account]
del gajim.status_before_autoaway[self.account]
del gajim.events_for_ui[self.account]
gajim.connections[self.account].name = name
gajim.connections[name] = gajim.connections[self.account]
del gajim.connections[self.account]
@ -2266,7 +2265,9 @@ class RemoveAccountWindow:
_('If you remove it, the connection will be lost.'))
if dialog.get_response() != gtk.RESPONSE_OK:
return
gajim.connections[self.account].change_status('offline', 'offline')
# change status to offline only if we will not remove this JID from server
if not self.remove_and_unregister_radiobutton.get_active():
gajim.connections[self.account].change_status('offline', 'offline')
if self.remove_and_unregister_radiobutton.get_active():
if not gajim.connections[self.account].password:
@ -2280,10 +2281,15 @@ class RemoveAccountWindow:
# We don't remove account cause we canceled pw window
return
gajim.connections[self.account].password = passphrase
if not gajim.connections[self.account].unregister_account():
# unregistration failed, we don't remove the account
# Error message is send by connect_and_auth()
return
gajim.connections[self.account].unregister_account(self._on_remove_success)
else:
self._on_remove_success(True)
def _on_remove_success(self, res):
# action of unregistration has failed, we don't remove the account
# Error message is send by connect_and_auth()
if not res:
return
# Close all opened windows
gajim.interface.roster.close_all(gajim.interface.instances[self.account])
del gajim.connections[self.account]
@ -2302,7 +2308,6 @@ class RemoveAccountWindow:
del gajim.encrypted_chats[self.account]
del gajim.last_message_time[self.account]
del gajim.status_before_autoaway[self.account]
del gajim.events_for_ui[self.account]
if len(gajim.connections) >= 2: # Do not merge accounts if only one exists
gajim.interface.roster.regroup = gajim.config.get('mergeaccounts')
else:
@ -2828,7 +2833,6 @@ _('You can set advanced account options by pressing Advanced button, or later by
con = connection.Connection(self.account)
if savepass:
con.password = password
gajim.events_for_ui[self.account] = []
if not self.modify:
con.new_account(self.account, config)
return
@ -2858,7 +2862,6 @@ _('You can set advanced account options by pressing Advanced button, or later by
gajim.encrypted_chats[self.account] = []
gajim.last_message_time[self.account] = {}
gajim.status_before_autoaway[self.account] = ''
gajim.events_for_ui[self.account] = []
# refresh accounts window
if gajim.interface.instances.has_key('accounts'):
gajim.interface.instances['accounts'].init_accounts()

View File

@ -105,6 +105,8 @@ import notify
import common.sleepy
from common.xmpp import idlequeue
from common import nslookup
from common import socks5
from common import gajim
from common import connection
@ -152,6 +154,39 @@ import disco
GTKGUI_GLADE = 'gtkgui.glade'
class GlibIdleQueue(idlequeue.IdleQueue):
'''
Extends IdleQueue to use glib io_add_wath, instead of select/poll
In another, `non gui' implementation of Gajim IdleQueue can be used safetly.
'''
def init_idle(self):
''' this method is called at the end of class constructor.
Creates a dict, which maps file/pipe/sock descriptor to glib event id'''
self.events = {}
if gtk.pygtk_version >= (2, 8, 0):
# time() is already called in glib, we just get the last value
# overrides IdleQueue.current_time()
self.current_time = lambda: gobject.get_current_time()
def add_idle(self, fd, flags):
''' this method is called when we plug a new idle object.
Start listening for events from fd
'''
res = gobject.io_add_watch(fd, flags, self.process_events,
priority=gobject.PRIORITY_LOW)
# store the id of the watch, so that we can remove it on unplug
self.events[fd] = res
def remove_idle(self, fd):
''' this method is called when we unplug a new idle object.
Stop listening for events from fd
'''
gobject.source_remove(self.events[fd])
del(self.events[fd])
def process(self):
self.check_time_events()
class Interface:
def handle_event_roster(self, account, data):
#('ROSTER', account, array)
@ -1333,27 +1368,17 @@ class Interface:
'SIGNED_IN': self.handle_event_signed_in,
'META_CONTACTS': self.handle_event_meta_contacts,
}
def exec_event(self, account):
ev = gajim.events_for_ui[account].pop(0)
self.handlers[ev[0]](account, ev[1])
gajim.handlers = self.handlers
def process_connections(self):
# We copy the list of connections because one can disappear while we
# process()
accounts = []
for account in gajim.connections:
accounts.append(account)
for account in accounts:
if gajim.connections[account].connected:
gajim.connections[account].process(0.01)
if gajim.socks5queue.connected:
gajim.socks5queue.process(0)
for account in gajim.events_for_ui: #when we create a new account we don't have gajim.connection
while len(gajim.events_for_ui[account]):
gajim.mutex_events_for_ui.lock(self.exec_event, account)
gajim.mutex_events_for_ui.unlock()
time.sleep(0.01) # so threads in connection.py have time to run
''' called each XXX (200) miliseconds. For now it checks for idlequeue timeouts
and FT events.
'''
gajim.idlequeue.process()
# TODO: rewrite socks5 classes to work with idlequeue and remove these lines
if gajim.socks5queue.connected:
gajim.socks5queue.process(0)
return True # renew timeout (loop for ever)
def save_config(self):
@ -1471,6 +1496,12 @@ class Interface:
gajim.socks5queue = socks5.SocksQueue(
self.handle_event_file_rcv_completed,
self.handle_event_file_progress)
# in a nongui implementation, just call:
# gajim.idlequeue = IdleQueue() , and
# gajim.idlequeue.process() each foo miliseconds
gajim.idlequeue = GlibIdleQueue()
# resolve and keep current record of resolved hosts
gajim.resolver = nslookup.Resolver(gajim.idlequeue)
self.register_handlers()
for account in gajim.config.get_per('accounts'):
gajim.connections[account] = common.connection.Connection(account)
@ -1495,7 +1526,6 @@ class Interface:
gajim.encrypted_chats[a] = []
gajim.last_message_time[a] = {}
gajim.status_before_autoaway[a] = ''
gajim.events_for_ui[a] = []
self.roster = roster_window.RosterWindow()

View File

@ -723,7 +723,9 @@ class RosterWindow:
def draw_roster(self):
'''Clear and draw roster'''
self.tree.get_model().clear()
# clear the model, only if it is not empty
if self.tree.get_model():
self.tree.get_model().clear()
for acct in gajim.connections:
self.add_account_to_roster(acct)
for jid in gajim.contacts.get_jid_list(acct):
@ -899,7 +901,8 @@ class RosterWindow:
# if we're online ...
if connection.connection:
roster = connection.connection.getRoster()
if roster.getItem(jid):
# in threadless connection when no roster stanza is sent, 'roster' is None
if roster and roster.getItem(jid):
resources = roster.getResources(jid)
# ...get the contact info for our other online resources
for resource in resources: