new BOSHDispatcher (in dispatcher_nb), improved BOSHClient class, minor changes in other xmpp modules

This commit is contained in:
tomk 2008-07-07 23:04:10 +00:00
parent f379d06d2c
commit e1899f34dc
7 changed files with 553 additions and 319 deletions

View File

@ -173,11 +173,8 @@ class SASL(PlugIn):
self.startsasl='success'
log.info('Successfully authenticated with remote server.')
handlers=self._owner.Dispatcher.dumpHandlers()
print '6' * 79
print handlers
print '6' * 79
self._owner.Dispatcher.PlugOut()
dispatcher_nb.Dispatcher().PlugIn(self._owner)
dispatcher_nb.Dispatcher().PlugIn(self._owner, after_SASL=True)
self._owner.Dispatcher.restoreHandlers(handlers)
self._owner.User = self.username
if self.on_sasl :

View File

@ -1,80 +1,208 @@
import protocol, simplexml, locale, random, dispatcher_nb
import protocol, locale, random, dispatcher_nb
from client_nb import NBCommonClient
import transports_nb
import logging
from simplexml import Node
log = logging.getLogger('gajim.c.x.bosh')
class BOSHClient(NBCommonClient):
'''
Client class implementing BOSH.
Client class implementing BOSH. Extends common XMPP
'''
def __init__(self, *args, **kw):
def __init__(self, domain, idlequeue, caller=None):
'''Preceeds constructor of NBCommonClient and sets some of values that will
be used as attributes in <body> tag'''
self.Namespace = protocol.NS_HTTP_BIND
# BOSH parameters should be given via Advanced Configuration Editor
self.bosh_xml_lang = None
self.bosh_hold = 1
self.bosh_wait=60
self.bosh_rid=None
self.bosh_sid=None
self.bosh_httpversion = 'HTTP/1.1'
NBCommonClient.__init__(self, *args, **kw)
def connect(self, *args, **kw):
if locale.getdefaultlocale()[0]:
self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0]
# with 50-bit random initial rid, session would have to go up
# to 7881299347898368 messages to raise rid over 2**53
# (see http://www.xmpp.org/extensions/xep-0124.html#rids)
r = random.Random()
r.seed()
self.bosh_rid = r.getrandbits(50)
self.bosh_sid = None
if locale.getdefaultlocale()[0]:
self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0]
else:
self.bosh_xml_lang = 'en'
self.http_version = 'HTTP/1.1'
self.bosh_to = domain
#self.Namespace = protocol.NS_HTTP_BIND
#self.defaultNamespace = self.Namespace
self.bosh_session_on = False
NBCommonClient.__init__(self, domain, idlequeue, caller)
def connect(self, on_connect, on_connect_failure, proxy, hostname=None, port=5222,
on_proxy_failure=None, secure=None):
'''
Open XMPP connection (open XML streams in both directions).
:param hostname: hostname of XMPP server from SRV request
:param port: port number of XMPP server
:param on_connect: called after stream is successfully opened
:param on_connect_failure: called when error occures during connection
:param on_proxy_failure: called if error occurres during TCP connection to
proxy server or during connection to the proxy
:param proxy: dictionary with bosh-related paramters. It should contain at
least values for keys 'host' and 'port' - connection details for proxy
server and optionally keys 'user' and 'pass' as proxy credentials
:param secure: if
'''
NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port,
on_proxy_failure, proxy, secure)
if hostname:
self.route_host = hostname
else:
self.route_host = self.Server
assert(proxy.has_key('type'))
assert(proxy['type']=='bosh')
proxy = kw['proxy']
#self.bosh_protocol, self.bosh_host, self.bosh_uri = transports_nb.urisplit(proxy['host'])
self.bosh_port = proxy['port']
self.bosh_wait = proxy['bosh_wait']
self.bosh_hold = proxy['bosh_hold']
self.bosh_to = proxy['to']
#self.bosh_ack = proxy['bosh_ack']
#self.bosh_secure = proxy['bosh_secure']
NBCommonClient.connect(self, *args, **kw)
self.bosh_host = proxy['host']
self.bosh_port = proxy['port']
self.bosh_content = proxy['bosh_content']
# _on_tcp_failure is callback for errors which occur during name resolving or
# TCP connecting.
self._on_tcp_failure = self.on_proxy_failure
# in BOSH, client connects to Connection Manager instead of directly to
# XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects
# to HTTP proxy and Connection Manager is specified at URI and Host header
# in HTTP message
# tcp_host, tcp_port is hostname and port for socket connection - Connection
# Manager or HTTP proxy
if proxy.has_key('proxy_host') and proxy['proxy_host'] and \
proxy.has_key('proxy_port') and proxy['proxy_port']:
tcp_host=proxy['proxy_host']
tcp_port=proxy['proxy_port']
# user and password for HTTP proxy
if proxy.has_key('user') and proxy['user'] and \
proxy.has_key('pass') and proxy['pass']:
proxy_creds=(proxy['user'],proxy['pass'])
else:
proxy_creds=(None, None)
else:
tcp_host = transports_nb.urisplit(proxy['host'])[1]
tcp_port=proxy['port']
if tcp_host is None:
self._on_connect_failure("Invalid BOSH URI")
return
self.socket = self.get_socket()
self._resolve_hostname(
hostname=tcp_host,
port=tcp_port,
on_success=self._try_next_ip,
on_failure=self._on_tcp_failure)
def _on_stream_start(self):
'''
Called after XMPP stream is opened. In BOSH, TLS is negotiated on socket
connect so success callback can be invoked after TCP connect.
(authentication is started from auth() method)
'''
self.onreceive(None)
if self.connected == 'tcp':
self._on_connect()
def get_socket(self):
tmp = transports_nb.NonBlockingHTTP(
raise_event=self.raise_event,
on_disconnect=self.on_http_disconnect,
http_uri = self.bosh_host,
http_port = self.bosh_port,
http_version = self.http_version
)
tmp.PlugIn(self)
return tmp
def on_http_disconnect(self):
log.info('HTTP socket disconnected')
#import traceback
#traceback.print_stack()
if self.bosh_session_on:
self.socket.connect(
conn_5tuple=self.current_ip,
on_connect=self.on_http_reconnect,
on_connect_failure=self.on_disconnect)
else:
self.on_disconnect()
def on_http_reconnect(self):
self.socket._plug_idle()
log.info('Connected to BOSH CM again')
pass
def on_http_reconnect_fail(self):
log.error('Error when reconnecting to BOSH CM')
self.on_disconnect()
def send(self, stanza, now = False):
(id, stanza_to_send) = self.Dispatcher.assign_id(stanza)
self.Connection.send(
self.socket.send(
self.boshify_stanza(stanza_to_send),
now = now)
return id
def get_rid(self):
# does this need a lock??"
self.bosh_rid = self.bosh_rid + 1
return str(self.bosh_rid)
def get_bodytag(self):
# this should be called not until after session creation response so sid has
# to be initialized.
assert(self.sid is not None)
self.rid = self.rid+1
assert(hasattr(self, 'bosh_sid'))
return protocol.BOSHBody(
attrs={ 'rid': str(self.bosh_rid),
attrs={ 'rid': self.get_rid(),
'sid': self.bosh_sid})
def get_initial_bodytag(self):
return protocol.BOSHBody(
attrs={'content': 'text/xml; charset=utf-8',
def get_initial_bodytag(self, after_SASL=False):
tag = protocol.BOSHBody(
attrs={'content': self.bosh_content,
'hold': str(self.bosh_hold),
'route': '%s:%s' % (self.route_host, self.Port),
'to': self.bosh_to,
'wait': str(self.bosh_wait),
'rid': str(self.bosh_rid),
'rid': self.get_rid(),
'xml:lang': self.bosh_xml_lang,
'xmpp:version': '1.0',
'xmlns:xmpp': 'urn:xmpp:xbosh'}
)
'ver': '1.6',
'xmlns:xmpp': 'urn:xmpp:xbosh'})
if after_SASL:
tag.delAttr('content')
tag.delAttr('hold')
tag.delAttr('route')
tag.delAttr('wait')
tag.delAttr('ver')
# xmpp:restart attribute is essential for stream restart request
tag.setAttr('xmpp:restart','true')
tag.setAttr('sid',self.bosh_sid)
return tag
def get_closing_bodytag(self):
closing_bodytag = self.get_bodytag()
@ -82,31 +210,26 @@ class BOSHClient(NBCommonClient):
return closing_bodytag
def boshify_stanza(self, stanza):
''' wraps stanza by body tag or modifies message entirely (in case of stream
opening and closing'''
log.info('boshify_staza - type is: %s' % type(stanza))
if isinstance(stanza, simplexml.Node):
tag = self.get_bodytag()
return tag.setPayload(stanza)
else:
# only stream initialization and stream terminatoion are not Nodes
if stanza.startswith(dispatcher_nb.XML_DECLARATION):
# stream init
return self.get_initial_bodytag()
else:
# should be stream closing
assert(stanza == dispatcher_nb.STREAM_TERMINATOR)
return self.get_closing_bodytag()
def boshify_stanza(self, stanza=None, body_attrs=None):
''' wraps stanza by body tag with rid and sid '''
#log.info('boshify_staza - type is: %s, stanza is %s' % (type(stanza), stanza))
tag = self.get_bodytag()
tag.setPayload([stanza])
return tag
def on_bodytag_attrs(self, body_attrs):
#log.info('on_bodytag_attrs: %s' % body_attrs)
if body_attrs.has_key('type'):
if body_attrs['type']=='terminated':
# BOSH session terminated
self.bosh_session_on = False
elif body_attrs['type']=='error':
# recoverable error
pass
if not self.bosh_sid:
# initial response - when bosh_sid is set
self.bosh_session_on = True
self.bosh_sid = body_attrs['sid']
self.Dispatcher.Stream._document_attrs['id']=body_attrs['authid']
def _on_stream_start(self):
'''
Called after XMPP stream is opened. In BOSH, TLS is negotiated elsewhere
so success callback can be invoked.
(authentication is started from auth() method)
'''
self.onreceive(None)
if self.connected == 'tcp':
self._on_connect()

View File

@ -32,7 +32,7 @@ class PlugIn:
def PlugIn(self,owner):
""" Attach to main instance and register ourself and all our staff in it. """
self._owner=owner
log.debug('Plugging %s into %s'%(self,self._owner))
log.info('Plugging %s __INTO__ %s' % (self,self._owner))
if owner.__dict__.has_key(self.__class__.__name__):
log.debug('Plugging ignored: another instance already plugged.')
return
@ -41,17 +41,27 @@ class PlugIn:
if owner.__dict__.has_key(method.__name__):
self._old_owners_methods.append(owner.__dict__[method.__name__])
owner.__dict__[method.__name__]=method
owner.__dict__[self.__class__.__name__]=self
if self.__class__.__name__.endswith('Dispatcher'):
# FIXME: I need BOSHDispatcher or XMPPDispatcher on .Dispatcher
# there must be a better way..
owner.__dict__['Dispatcher']=self
else:
owner.__dict__[self.__class__.__name__]=self
# following will not work for classes inheriting plugin()
#if self.__class__.__dict__.has_key('plugin'): return self.plugin(owner)
if hasattr(self,'plugin'): return self.plugin(owner)
def PlugOut(self):
""" Unregister all our staff from main instance and detach from it. """
log.debug('Plugging %s out of %s.'%(self,self._owner))
log.info('Plugging %s __OUT__ of %s.' % (self,self._owner))
for method in self._exported_methods: del self._owner.__dict__[method.__name__]
for method in self._old_owners_methods: self._owner.__dict__[method.__name__]=method
del self._owner.__dict__[self.__class__.__name__]
if self.__class__.__dict__.has_key('plugout'): return self.plugout()
if self.__class__.__name__.endswith('Dispatcher'):
del self._owner.__dict__['Dispatcher']
else:
del self._owner.__dict__[self.__class__.__name__]
#if self.__class__.__dict__.has_key('plugout'): return self.plugout()
if hasattr(self,'plugout'): return self.plugout()
del self._owner

View File

@ -41,11 +41,10 @@ class NBCommonClient:
:param caller: calling object - it has to implement certain methods (necessary?)
'''
self.Namespace = protocol.NS_CLIENT
self.defaultNamespace = self.Namespace
self.idlequeue = idlequeue
self.defaultNamespace = self.Namespace
self.disconnect_handlers = []
self.Server = domain
@ -85,12 +84,14 @@ class NBCommonClient:
self.SASL.PlugOut()
if self.__dict__.has_key('NonBlockingTLS'):
self.NonBlockingTLS.PlugOut()
if self.__dict__.has_key('NBHTTPPROXYsocket'):
if self.__dict__.has_key('NBHTTPProxySocket'):
self.NBHTTPPROXYsocket.PlugOut()
if self.__dict__.has_key('NBSOCKS5PROXYsocket'):
if self.__dict__.has_key('NBSOCKS5ProxySocket'):
self.NBSOCKS5PROXYsocket.PlugOut()
if self.__dict__.has_key('NonBlockingTcp'):
self.NonBlockingTcp.PlugOut()
if self.__dict__.has_key('NonBlockingTCP'):
self.NonBlockingTCP.PlugOut()
if self.__dict__.has_key('NonBlockingHTTP'):
self.NonBlockingHTTP.PlugOut()
def send(self, stanza, now = False):
@ -106,7 +107,7 @@ class NBCommonClient:
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
on_proxy_failure=None, proxy=None, secure=None):
'''
Open XMPP connection (open streams in both directions).
Open XMPP connection (open XML streams in both directions).
:param hostname: hostname of XMPP server from SRV request
:param port: port number of XMPP server
:param on_connect: called after stream is successfully opened
@ -118,70 +119,14 @@ class NBCommonClient:
optionally keys 'user' and 'pass' as proxy credentials
:param secure:
'''
self.Port = port
if hostname:
xmpp_hostname = hostname
else:
xmpp_hostname = self.Server
self.on_connect = on_connect
self.on_connect_failure=on_connect_failure
self.on_proxy_failure = on_proxy_failure
self._secure = secure
self.Connection = None
self.Port = port
if proxy:
# with proxies, client connects to proxy instead of directly to
# XMPP server ((hostname, port))
# tcp_server is machine used for socket connection
tcp_server=proxy['host']
tcp_port=proxy['port']
self._on_tcp_failure = self.on_proxy_failure
if proxy.has_key('type'):
if proxy.has_key('user') and proxy.has_key('pass'):
proxy_creds=(proxy['user'],proxy['pass'])
else:
proxy_creds=(None, None)
type_ = proxy['type']
if type_ == 'socks5':
# SOCKS5 proxy
self.socket = transports_nb.NBSOCKS5ProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(xmpp_hostname, self.Port))
elif type_ == 'http':
# HTTP CONNECT to proxy
self.socket = transports_nb.NBHTTPProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(xmpp_hostname, self.Port))
elif type_ == 'bosh':
# BOSH - XMPP over HTTP
tcp_server = transports_nb.urisplit(tcp_server)[1]
self.socket = transports_nb.NonBlockingHTTP(
on_disconnect=self.on_disconnect,
http_uri = proxy['host'],
http_port = tcp_port)
else:
# HTTP CONNECT to proxy from environment variables
self.socket = transports_nb.NBHTTPProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=(None, None),
xmpp_server=(xmpp_hostname, self.Port))
else:
self._on_tcp_failure = self._on_connect_failure
tcp_server=xmpp_hostname
tcp_port=self.Port
self.socket = transports_nb.NonBlockingTcp(on_disconnect = self.on_disconnect)
self.socket.PlugIn(self)
self._resolve_hostname(
hostname=tcp_server,
port=tcp_port,
on_success=self._try_next_ip,
on_failure=self._on_tcp_failure)
@ -232,13 +177,14 @@ class NBCommonClient:
started, and _on_connect_failure on failure.
'''
#FIXME: use RegisterHandlerOnce instead of onreceive
log.info('=============xmpp_connect_machine() >> mode: %s, data: %s' % (mode,data))
log.info('========xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] ))
def on_next_receive(mode):
log.info('setting %s on next receive' % mode)
if mode is None:
self.onreceive(None)
else:
self.onreceive(lambda data:self._xmpp_connect_machine(mode, data))
self.onreceive(lambda _data:self._xmpp_connect_machine(mode, _data))
if not mode:
dispatcher_nb.Dispatcher().PlugIn(self)
@ -259,9 +205,11 @@ class NBCommonClient:
if not self.Dispatcher.Stream.features:
on_next_receive('RECEIVE_STREAM_FEATURES')
else:
log.info('got STREAM FEATURES in first read')
self._xmpp_connect_machine(mode='STREAM_STARTED')
else:
log.info('incoming stream version less than 1.0')
self._xmpp_connect_machine(mode='STREAM_STARTED')
elif mode == 'RECEIVE_STREAM_FEATURES':
@ -274,6 +222,7 @@ class NBCommonClient:
mode='FAILURE',
data='Missing <features> in 1.0 stream')
else:
log.info('got STREAM FEATURES in second read')
self._xmpp_connect_machine(mode='STREAM_STARTED')
elif mode == 'STREAM_STARTED':
@ -294,6 +243,10 @@ class NBCommonClient:
self.onreceive(None)
self.on_connect(self, self.connected)
def raise_event(self, event_type, data):
log.info('raising event from transport: %s %s' % (event_type,data))
if hasattr(self, 'Dispatcher'):
self.Dispatcher.Event('', event_type, data)
# moved from client.CommonClient:
@ -324,8 +277,6 @@ class NBCommonClient:
def auth(self, user, password, resource = '', sasl = 1, on_auth = None):
print 'auth called'
''' Authenticate connnection and bind resource. If resource is not provided
random one or library name used. '''
self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl
@ -388,39 +339,6 @@ class NBCommonClient:
self.on_auth(self, None)
class NonBlockingClient(NBCommonClient):
''' Example client class, based on CommonClient. '''
def _on_stream_start(self):
'''
Called after XMPP stream is opened.
In pure XMPP client, TLS negotiation may follow after esabilishing a stream.
'''
self.onreceive(None)
if self.connected == 'tcp':
if not self.connected or self._secure is not None and not self._secure:
# if we are disconnected or TLS/SSL is not desired, return
self._on_connect()
return
if not self.Dispatcher.Stream.features.getTag('starttls'):
# if server doesn't advertise TLS in init response
self._on_connect()
return
if self.incoming_stream_version() != '1.0':
self._on_connect()
return
# otherwise start TLS
tls_nb.NonBlockingTLS().PlugIn(
self,
on_tls_success=lambda: self._xmpp_connect(socket_type='tls'),
on_tls_failure=self._on_connect_failure)
elif self.connected == 'tls':
self._on_connect()
def initRoster(self):
''' Plug in the roster. '''
if not self.__dict__.has_key('NonBlockingRoster'):
@ -440,3 +358,101 @@ class NonBlockingClient(NBCommonClient):
self.send(dispatcher_nb.Presence(to=jid, typ=typ))
class NonBlockingClient(NBCommonClient):
''' Example client class, based on CommonClient. '''
def __init__(self, domain, idlequeue, caller=None):
NBCommonClient.__init__(self, domain, idlequeue, caller)
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
on_proxy_failure=None, proxy=None, secure=None):
NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port,
on_proxy_failure, proxy, secure)
if hostname:
xmpp_hostname = hostname
else:
xmpp_hostname = self.Server
if proxy:
# with proxies, client connects to proxy instead of directly to
# XMPP server ((hostname, port))
# tcp_host is machine used for socket connection
tcp_host=proxy['host']
tcp_port=proxy['port']
self._on_tcp_failure = self.on_proxy_failure
if proxy.has_key('type'):
assert(proxy['type']!='bosh')
if proxy.has_key('user') and proxy.has_key('pass'):
proxy_creds=(proxy['user'],proxy['pass'])
else:
proxy_creds=(None, None)
type_ = proxy['type']
if type_ == 'socks5':
# SOCKS5 proxy
self.socket = transports_nb.NBSOCKS5ProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(xmpp_hostname, self.Port))
elif type_ == 'http':
# HTTP CONNECT to proxy
self.socket = transports_nb.NBHTTPProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(xmpp_hostname, self.Port))
else:
# HTTP CONNECT to proxy from environment variables
self.socket = transports_nb.NBHTTPProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=(None, None),
xmpp_server=(xmpp_hostname, self.Port))
else:
self._on_tcp_failure = self._on_connect_failure
tcp_host=xmpp_hostname
tcp_port=self.Port
self.socket = transports_nb.NonBlockingTCP(
raise_event = self.raise_event,
on_disconnect = self.on_disconnect)
self.socket.PlugIn(self)
self._resolve_hostname(
hostname=tcp_host,
port=tcp_port,
on_success=self._try_next_ip,
on_failure=self._on_tcp_failure)
def _on_stream_start(self):
'''
Called after XMPP stream is opened.
In pure XMPP client, TLS negotiation may follow after esabilishing a stream.
'''
self.onreceive(None)
if self.connected == 'tcp':
if not self.connected or not self._secure:
# if we are disconnected or TLS/SSL is not desired, return
self._on_connect()
return
if not self.Dispatcher.Stream.features.getTag('starttls'):
# if server doesn't advertise TLS in init response
self._on_connect()
return
if self.incoming_stream_version() != '1.0':
self._on_connect()
return
# otherwise start TLS
tls_nb.NonBlockingTLS().PlugIn(
self,
on_tls_success=lambda: self._xmpp_connect(socket_type='tls'),
on_tls_failure=self._on_connect_failure)
elif self.connected == 'tls':
self._on_connect()

View File

@ -14,7 +14,6 @@
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
# $Id: dispatcher.py,v 1.40 2006/01/18 19:26:43 normanr Exp $
'''
Main xmpppy mechanism. Provides library with methods to assign different handlers
@ -30,6 +29,7 @@ from client import PlugIn
import logging
log = logging.getLogger('gajim.c.x.dispatcher_nb')
log.setLevel(logging.INFO)
# default timeout to wait for response for our id
DEFAULT_TIMEOUT_SECONDS = 25
@ -38,9 +38,33 @@ ID = 0
STREAM_TERMINATOR = '</stream:stream>'
XML_DECLARATION = '<?xml version=\'1.0\'?>'
class Dispatcher(PlugIn):
# FIXME: ugly
from client_nb import NonBlockingClient
from bosh import BOSHClient
class Dispatcher():
# Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
# was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
# is that reference used to access dispatcher instance is in Client attribute
# named by __class__.__name__ of the dispatcher instance .. long story short:
# I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp/
# If having two kinds of dispatcher will go well, I will rewrite the
def PlugIn(self, client_obj, after_SASL=False):
if isinstance(client_obj, NonBlockingClient):
XMPPDispatcher().PlugIn(client_obj)
elif isinstance(client_obj, BOSHClient):
BOSHDispatcher().PlugIn(client_obj, after_SASL)
class XMPPDispatcher(PlugIn):
''' Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers.
Can be plugged out/in to restart these headers (used for SASL f.e.). '''
def __init__(self):
PlugIn.__init__(self)
self.handlers={}
@ -84,8 +108,6 @@ class Dispatcher(PlugIn):
def plugin(self, owner):
''' Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally.'''
log.debug('Dispatcher plugin')
self._init()
self._owner.lastErrNode = None
self._owner.lastErr = None
@ -106,8 +128,8 @@ class Dispatcher(PlugIn):
def StreamInit(self):
''' Send an initial stream header. '''
self.Stream = simplexml.NodeBuilder()
self.Stream._dispatch_depth = 2
self.Stream.dispatch = self.dispatch
self.Stream._dispatch_depth = 2
self.Stream.stream_header_received = self._check_stream_start
self.Stream.features = None
self._metastream = Node('stream:stream')
@ -159,7 +181,7 @@ class Dispatcher(PlugIn):
''' Creates internal structures for newly registered namespace.
You can register handlers for this namespace afterwards. By default one namespace
already registered (jabber:client or jabber:component:accept depending on context. '''
log.info('Registering namespace "%s"' % xmlns)
log.debug('Registering namespace "%s"' % xmlns)
self.handlers[xmlns]={}
self.RegisterProtocol('unknown', Protocol, xmlns=xmlns)
self.RegisterProtocol('default', Protocol, xmlns=xmlns)
@ -169,7 +191,7 @@ class Dispatcher(PlugIn):
Needed to start registering handlers for such stanzas.
Iq, message and presence protocols are registered by default. '''
if not xmlns: xmlns=self._owner.defaultNamespace
log.info('Registering protocol "%s" as %s(%s)' %(tag_name, Proto, xmlns))
log.debug('Registering protocol "%s" as %s(%s)' %(tag_name, Proto, xmlns))
self.handlers[xmlns][tag_name]={type:Proto, 'default':[]}
def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', makefirst=0, system=0):
@ -196,7 +218,7 @@ class Dispatcher(PlugIn):
'''
if not xmlns:
xmlns=self._owner.defaultNamespace
log.info('Registering handler %s for "%s" type->%s ns->%s(%s)' %
log.debug('Registering handler %s for "%s" type->%s ns->%s(%s)' %
(handler, name, typ, ns, xmlns))
if not typ and not ns:
typ='default'
@ -287,32 +309,18 @@ class Dispatcher(PlugIn):
def dispatch(self, stanza, session=None, direct=0):
''' Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it.
Called internally. '''
#log.info('dispatch called: stanza = %s, session = %s, direct= %s' % (stanza, session, direct))
if not session:
session = self
session.Stream._mini_dom = None
name = stanza.getName()
if not direct and self._owner._component:
if name == 'route':
if stanza.getAttr('error') is None:
if len(stanza.getChildren()) == 1:
stanza = stanza.getChildren()[0]
name=stanza.getName()
else:
for each in stanza.getChildren():
self.dispatch(each,session,direct=1)
return
elif name == 'presence':
return
elif name in ('features','bind'):
pass
else:
raise UnsupportedStanzaType(name)
if name=='features':
session.Stream.features=stanza
xmlns=stanza.getNamespace()
#log.info('in dispatch, getting ns for %s, and the ns is %s' % (stanza, xmlns))
if not self.handlers.has_key(xmlns):
log.warn("Unknown namespace: " + xmlns)
xmlns='unknown'
@ -330,7 +338,6 @@ class Dispatcher(PlugIn):
stanza.props=stanza.getProperties()
ID=stanza.getID()
log.debug("Dispatching %s stanza with type->%s props->%s id->%s"%(name,typ,stanza.props,ID))
list=['default'] # we will use all handlers:
if self.handlers[xmlns][name].has_key(typ): list.append(typ) # from very common...
for prop in stanza.props:
@ -427,3 +434,56 @@ class Dispatcher(PlugIn):
stanza.setParent(self._metastream)
return (_ID, stanza)
class BOSHDispatcher(XMPPDispatcher):
def PlugIn(self, owner, after_SASL=False):
self.after_SASL = after_SASL
XMPPDispatcher.PlugIn(self, owner)
def StreamInit(self):
''' Send an initial stream header. '''
self.Stream = simplexml.NodeBuilder()
self.Stream.dispatch = self.dispatch
self.Stream._dispatch_depth = 2
self.Stream.stream_header_received = self._check_stream_start
self.Stream.features = None
self._metastream = Node('stream:stream')
self._metastream.setNamespace(self._owner.Namespace)
self._metastream.setAttr('version', '1.0')
self._metastream.setAttr('xmlns:stream', NS_STREAMS)
self._metastream.setAttr('to', self._owner.Server)
if locale.getdefaultlocale()[0]:
self._metastream.setAttr('xml:lang',
locale.getdefaultlocale()[0].split('_')[0])
self.restart = True
self._owner.Connection.send(self._owner.get_initial_bodytag(self.after_SASL))
def StreamTerminate(self):
''' Send a stream terminator. '''
self._owner.Connection.send(self._owner.get_closing_bodytag())
def ProcessNonBlocking(self, data=None):
if self.restart:
fromstream = self._metastream
fromstream.setAttr('from', fromstream.getAttr('to'))
fromstream.delAttr('to')
data = '%s%s>%s' % (XML_DECLARATION,str(fromstream)[:-2] ,data)
self.restart = False
return XMPPDispatcher.ProcessNonBlocking(self, data)
def dispatch(self, stanza, session=None, direct=0):
if stanza.getName()=='body' and stanza.getNamespace()==NS_HTTP_BIND:
self._owner.on_bodytag_attrs(stanza.getAttrs())
#self._owner.send_empty_bodytag()
for child in stanza.getChildren():
XMPPDispatcher.dispatch(self, child, session, direct)
else:
XMPPDispatcher.dispatch(self, stanza, session, direct)

View File

@ -20,7 +20,7 @@ I'm personally using it in many other separate projects. It is designed to be as
import xml.parsers.expat
import logging
log = logging.getLogger('gajim.c.x.simplexml')
#log.setLevel(logging.DEBUG)
def XMLescape(txt):
"""Returns provided string with symbols & < > " replaced by their respective XML entities."""
@ -99,7 +99,10 @@ class Node(object):
for a in self.kids:
if not fancy and (len(self.data)-1)>=cnt: s=s+XMLescape(self.data[cnt])
elif (len(self.data)-1)>=cnt: s=s+XMLescape(self.data[cnt].strip())
s = s + a.__str__(fancy and fancy+1)
if isinstance(a, str) or isinstance(a, unicode):
s = s + a.__str__()
else:
s = s + a.__str__(fancy and fancy+1)
cnt=cnt+1
if not fancy and (len(self.data)-1) >= cnt: s = s + XMLescape(self.data[cnt])
elif (len(self.data)-1) >= cnt: s = s + XMLescape(self.data[cnt].strip())
@ -343,7 +346,7 @@ class NodeBuilder:
attrs[self.namespaces[ns]+attr[sp+1:]]=attrs[attr]
del attrs[attr] #
self._inc_depth()
log.info("DEPTH -> %i , tag -> %s, attrs -> %s" % (self.__depth, tag, `attrs`))
log.info("STARTTAG.. DEPTH -> %i , tag -> %s, attrs -> %s" % (self.__depth, tag, `attrs`))
if self.__depth == self._dispatch_depth:
if not self._mini_dom :
self._mini_dom = Node(tag=tag, attrs=attrs)

View File

@ -3,6 +3,7 @@
##
## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
## modified by Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -65,10 +66,21 @@ CONNECTED ='CONNECTED'
DISCONNECTING ='DISCONNECTING'
class NonBlockingTransport(PlugIn):
def __init__(self, on_disconnect):
def __init__(self, raise_event, on_disconnect):
PlugIn.__init__(self)
self.raise_event = raise_event
self.on_disconnect = on_disconnect
self.on_connect = None
self.on_connect_failure = None
self.idlequeue = None
self.on_receive = None
self.server = None
self.port = None
self.state = DISCONNECTED
self._exported_methods=[self.disconnect, self.onreceive]
def plugin(self, owner):
owner.Connection=self
@ -79,30 +91,72 @@ class NonBlockingTransport(PlugIn):
self._owner.Connection = None
self._owner = None
def connect(self, conn_5tuple, on_connect, on_connect_failure):
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
(self.server, self.port) = conn_5tuple[4][:2]
log.info('NonBlocking Connect :: About tot connect to %s:%s' % (self.server, self.port))
class NonBlockingTcp(PlugIn, IdleObject):
def set_state(self, newstate):
assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING])
if (self.state, newstate) in [(CONNECTING, DISCONNECTING), (DISCONNECTED, DISCONNECTING)]:
log.info('strange move: %s -> %s' % (self.state, newstate))
self.state = newstate
def _on_connect(self, data):
''' preceeds call of on_connect callback '''
self.set_state(CONNECTED)
self.on_connect()
def _on_connect_failure(self,err_message):
''' preceeds call of on_connect_failure callback '''
# In case of error while connecting we need to close socket
# but we don't want to call DisconnectHandlers from client,
# thus the do_callback=False
self.disconnect(do_callback=False)
self.on_connect_failure(err_message=err_message)
def send(self, raw_data, now=False):
if self.state not in [CONNECTED, DISCONNECTING]:
# FIXME better handling needed
log.error('Trying to send %s when transport is %s.' %
(raw_data, self.state))
return
def disconnect(self, do_callback=True):
self.set_state(DISCONNECTED)
if do_callback:
# invoke callback given in __init__
self.on_disconnect()
def onreceive(self, recv_handler):
''' Sets the on_receive callback. Do not confuse it with
on_receive() method, which is the callback itself.'''
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
else:
self.on_receive = None
return
log.info('setting onreceive on %s' % recv_handler)
self.on_receive = recv_handler
def tcp_connection_started(self):
self.set_state(CONNECTING)
# on_connect/on_conn_failure will be called from self.pollin/self.pollout
class NonBlockingTCP(NonBlockingTransport, IdleObject):
'''
Non-blocking TCP socket wrapper
'''
def __init__(self, on_disconnect):
def __init__(self, raise_event, on_disconnect):
'''
Class constructor.
'''
PlugIn.__init__(self)
IdleObject.__init__(self)
self.on_disconnect = on_disconnect
self.on_connect = None
self.on_connect_failure = None
self.sock = None
self.idlequeue = None
self.on_receive = None
self.DBG_LINE='socket'
self.state = DISCONNECTED
NonBlockingTransport.__init__(self, raise_event, on_disconnect)
# writable, readable - keep state of the last pluged flags
# This prevents replug of same object with the same flags
self.writable = True
@ -122,14 +176,6 @@ class NonBlockingTcp(PlugIn, IdleObject):
self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout,
self.set_timeout, self.remove_timeout]
def plugin(self, owner):
owner.Connection=self
self.idlequeue = owner.idlequeue
def plugout(self):
self._owner.Connection = None
self._owner = None
def get_fd(self):
try:
@ -147,14 +193,12 @@ class NonBlockingTcp(PlugIn, IdleObject):
:param on_connect_failure: callback called on failure when estabilishing tcp
connection
'''
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
(self.server, self.port) = conn_5tuple[4][:2]
log.info('NonBlocking Connect :: About tot connect to %s:%s' % conn_5tuple[4][:2])
NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure)
try:
self._sock = socket.socket(*conn_5tuple[:3])
except socket.error, (errnum, errstr):
on_connect_failure('NonBlockingTcp: Error while creating socket: %s %s' % (errnum, errstr))
self._on_connect_failure('NonBlockingTCP: Error while creating socket: %s %s' % (errnum, errstr))
return
self._send = self._sock.send
@ -177,9 +221,8 @@ class NonBlockingTcp(PlugIn, IdleObject):
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
# connecting in progress
self.set_state(CONNECTING)
log.info('After connect. "%s" raised => CONNECTING' % errstr)
# on_connect/failure will be called from self.pollin/self.pollout
self.tcp_connection_started()
return
elif errnum in (0, 10056, errno.EISCONN):
# already connected - this branch is very unlikely, nonblocking connect() will
@ -195,27 +238,9 @@ class NonBlockingTcp(PlugIn, IdleObject):
(self.server, self.port, errnum, errstr))
def _on_connect(self, data):
''' preceeds call of on_connect callback '''
self.set_state(CONNECTED)
''' with TCP socket, we have to remove send-timeout '''
self.idlequeue.remove_timeout(self.get_fd())
self.on_connect()
def set_state(self, newstate):
assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING])
if (self.state, newstate) in [(CONNECTING, DISCONNECTING), (DISCONNECTED, DISCONNECTING)]:
log.info('strange move: %s -> %s' % (self.state, newstate))
self.state = newstate
def _on_connect_failure(self,err_message):
''' preceeds call of on_connect_failure callback '''
# In case of error while connecting we need to close socket
# but we don't want to call DisconnectHandlers from client,
# thus the do_callback=False
self.disconnect(do_callback=False)
self.on_connect_failure(err_message=err_message)
NonBlockingTransport._on_connect(self, data)
def pollin(self):
@ -250,10 +275,7 @@ class NonBlockingTcp(PlugIn, IdleObject):
self._sock.close()
except socket.error, (errnum, errstr):
log.error('Error disconnecting a socket: %s %s' % (errnum,errstr))
self.set_state(DISCONNECTED)
if do_callback:
# invoke callback given in __init__
self.on_disconnect()
NonBlockingTransport.disconnect(self, do_callback)
def read_timeout(self):
'''
@ -295,11 +317,7 @@ class NonBlockingTcp(PlugIn, IdleObject):
'''Append raw_data to the queue of messages to be send.
If supplied data is unicode string, encode it to utf-8.
'''
if self.state not in [CONNECTED, DISCONNECTING]:
log.error('Trying to send %s when transport is %s.' %
(raw_data, self.state))
return
NonBlockingTransport.send(self, raw_data, now)
r = raw_data
if isinstance(r, unicode):
r = r.encode('utf-8')
@ -343,31 +361,13 @@ class NonBlockingTcp(PlugIn, IdleObject):
sent_data = self.sendbuff[:send_count]
self.sendbuff = self.sendbuff[send_count:]
self._plug_idle()
self._raise_event(DATA_SENT, sent_data)
self.raise_event(DATA_SENT, sent_data)
except socket.error, e:
log.error('_do_send:', exc_info=True)
traceback.print_exc()
self.disconnect()
def _raise_event(self, event_type, data):
if data and data.strip():
log.info('raising event from transport: %s %s' % (event_type,data))
if hasattr(self._owner, 'Dispatcher'):
self._owner.Dispatcher.Event('', event_type, data)
def onreceive(self, recv_handler):
''' Sets the on_receive callback. Do not confuse it with
on_receive() method, which is the callback itself.'''
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
else:
self.on_receive = None
return
log.info('setting onreceive on %s' % recv_handler)
self.on_receive = recv_handler
def _do_receive(self):
''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.'''
@ -410,7 +410,7 @@ class NonBlockingTcp(PlugIn, IdleObject):
# pass received data to owner
#self.
if self.on_receive:
self._raise_event(DATA_RECEIVED, received)
self.raise_event(DATA_RECEIVED, received)
self._on_receive(received)
else:
# This should never happen, so we need the debug. (If there is no handler
@ -418,31 +418,37 @@ class NonBlockingTcp(PlugIn, IdleObject):
log.error('SOCKET Unhandled data received: %s' % received)
self.disconnect()
def _on_receive(self, data):
# Overriding this method allows modifying received data before it is passed
# to owner's callback.
log.info('About to call on_receive which is %s' % self.on_receive)
def _on_receive(self,data):
'''Preceeds passing received data to Client class. Gets rid of HTTP headers
and checks them.'''
self.on_receive(data)
class NonBlockingHTTP(NonBlockingTcp):
class NonBlockingHTTP(NonBlockingTCP):
'''
Socket wrapper that cretes HTTP message out of sent data and peels-off
HTTP headers from incoming messages
'''
def __init__(self, http_uri, http_port, on_disconnect):
def __init__(self, raise_event, on_disconnect, http_uri, http_port, http_version=None):
self.http_protocol, self.http_host, self.http_path = urisplit(http_uri)
if self.http_protocol is None:
self.http_protocol = 'http'
if self.http_path == '':
http_path = '/'
self.http_port = http_port
NonBlockingTcp.__init__(self, on_disconnect)
if http_version:
self.http_version = http_version
else:
self.http_version = 'HTTP/1.1'
# buffer for partial responses
self.recvbuff = ''
self.expected_length = 0
NonBlockingTCP.__init__(self, raise_event, on_disconnect)
def send(self, raw_data, now=False):
NonBlockingTcp.send(
NonBlockingTCP.send(
self,
self.build_http_message(raw_data),
now)
@ -450,21 +456,43 @@ class NonBlockingHTTP(NonBlockingTcp):
def _on_receive(self,data):
'''Preceeds passing received data to Client class. Gets rid of HTTP headers
and checks them.'''
statusline, headers, httpbody = self.parse_http_message(data)
if statusline[1] != '200':
log.error('HTTP Error: %s %s' % (statusline[1], statusline[2]))
self.disconnect()
if not self.recvbuff:
# recvbuff empty - fresh HTTP message was received
statusline, headers, self.recvbuff = self.parse_http_message(data)
if statusline[1] != '200':
log.error('HTTP Error: %s %s' % (statusline[1], statusline[2]))
self.disconnect()
return
self.expected_length = int(headers['Content-Length'])
else:
#sth in recvbuff - append currently received data to HTTP mess in buffer
self.recvbuff = '%s%s' % (self.recvbuff, data)
if self.expected_length > len(self.recvbuff):
# If we haven't received the whole HTTP mess yet, let's end the thread.
# It will be finnished from one of following poll calls on plugged socket.
return
# FIXME the reassembling doesn't work - Connection Manager on jabbim.cz
# closes TCP connection before sending <Content-Length> announced bytes.. WTF
# all was received, now call the on_receive callback
httpbody = self.recvbuff
self.recvbuff=''
self.expected_length=0
self.on_receive(httpbody)
def build_http_message(self, httpbody):
def build_http_message(self, httpbody, method='POST'):
'''
Builds http message with given body.
Values for headers and status line fields are taken from class variables.
)
'''
headers = ['POST %s HTTP/1.1' % self.http_path,
absolute_uri = '%s://%s:%s%s' % (self.http_protocol, self.http_host,
self.http_port, self.http_path)
headers = ['%s %s %s' % (method, absolute_uri, self.http_version),
'Host: %s:%s' % (self.http_host, self.http_port),
'Content-Type: text/xml; charset=utf-8',
'Content-Length: %s' % len(str(httpbody)),
@ -482,7 +510,7 @@ class NonBlockingHTTP(NonBlockingTcp):
)
'''
message = message.replace('\r','')
(header, httpbody) = message.split('\n\n')
(header, httpbody) = message.split('\n\n',1)
header = header.split('\n')
statusline = header[0].split(' ')
header = header[1:]
@ -494,15 +522,15 @@ class NonBlockingHTTP(NonBlockingTcp):
class NBProxySocket(NonBlockingTcp):
class NBProxySocket(NonBlockingTCP):
'''
Interface for proxy socket wrappers - when tunnneling XMPP over proxies,
some connecting process usually has to be done before opening stream.
'''
def __init__(self, on_disconnect, xmpp_server, proxy_creds=(None,None)):
def __init__(self, raise_event, on_disconnect, xmpp_server, proxy_creds=(None,None)):
self.proxy_user, self.proxy_pass = proxy_creds
self.xmpp_server = xmpp_server
NonBlockingTcp.__init__(self, on_disconnect)
NonBlockingTCP.__init__(self, raise_event, on_disconnect)
def connect(self, conn_5tuple, on_connect, on_connect_failure):
@ -515,7 +543,7 @@ class NBProxySocket(NonBlockingTcp):
self.after_proxy_connect = on_connect
NonBlockingTcp.connect(self,
NonBlockingTCP.connect(self,
conn_5tuple=conn_5tuple,
on_connect =self._on_tcp_connect,
on_connect_failure =on_connect_failure)
@ -526,7 +554,7 @@ class NBProxySocket(NonBlockingTcp):
class NBHTTPProxySocket(NBProxySocket):
''' This class can be used instead of NonBlockingTcp
''' This class can be used instead of NonBlockingTCP
HTTP (CONNECT) proxy connection class. Allows to use HTTP proxies like squid with
(optionally) simple authentication (using login and password).
'''
@ -588,7 +616,7 @@ class NBSOCKS5ProxySocket(NBProxySocket):
redefines only connect method. Allows to use SOCKS5 proxies with
(optionally) simple authentication (only USERNAME/PASSWORD auth).
'''
# TODO: replace DEBUG with ordinrar logging, replace on_proxy_failure() with
# TODO: replace on_proxy_failure() with
# _on_connect_failure, at the end call _on_connect()
def _on_tcp_connect(self):
@ -620,13 +648,13 @@ class NBSOCKS5ProxySocket(NBProxySocket):
self.send(to_send)
else:
if reply[1] == '\xff':
self.DEBUG('Authentification to proxy impossible: no acceptable '
'auth method', 'error')
log.error('Authentification to proxy impossible: no acceptable '
'auth method')
self._owner.disconnected()
self.on_proxy_failure('Authentification to proxy impossible: no '
'acceptable authentification method')
return
self.DEBUG('Invalid proxy reply', 'error')
log.error('Invalid proxy reply')
self._owner.disconnected()
self.on_proxy_failure('Invalid proxy reply')
return
@ -635,21 +663,21 @@ class NBSOCKS5ProxySocket(NBProxySocket):
if reply is None:
return
if len(reply) != 2:
self.DEBUG('Invalid proxy reply', 'error')
log.error('Invalid proxy reply')
self._owner.disconnected()
self.on_proxy_failure('Invalid proxy reply')
return
if reply[0] != '\x01':
self.DEBUG('Invalid proxy reply', 'error')
log.error('Invalid proxy reply')
self._owner.disconnected()
self.on_proxy_failure('Invalid proxy reply')
return
if reply[1] != '\x00':
self.DEBUG('Authentification to proxy failed', 'error')
log.error('Authentification to proxy failed')
self._owner.disconnected()
self.on_proxy_failure('Authentification to proxy failed')
return
self.DEBUG('Authentification successfull. Jabber server contacted.','ok')
log.info('Authentification successfull. Jabber server contacted.')
# Request connection
req = "\x05\x01\x00"
# If the given destination address is an IP address, we'll
@ -675,12 +703,12 @@ class NBSOCKS5ProxySocket(NBProxySocket):
if reply is None:
return
if len(reply) < 10:
self.DEBUG('Invalid proxy reply', 'error')
log.error('Invalid proxy reply')
self._owner.disconnected()
self.on_proxy_failure('Invalid proxy reply')
return
if reply[0] != '\x05':
self.DEBUG('Invalid proxy reply', 'error')
log.error('Invalid proxy reply')
self._owner.disconnected()
self.on_proxy_failure('Invalid proxy reply')
return
@ -700,7 +728,7 @@ class NBSOCKS5ProxySocket(NBProxySocket):
txt = errors[ord(reply[1])-1]
else:
txt = 'Invalid proxy reply'
self.DEBUG(txt, 'error')
log.error(txt)
self.on_proxy_failure(txt)
return
# Get the bound address/port
@ -709,7 +737,7 @@ class NBSOCKS5ProxySocket(NBProxySocket):
elif reply[3] == "\x03":
begin, end = 4, 4 + reply[4]
else:
self.DEBUG('Invalid proxy reply', 'error')
log.error('Invalid proxy reply')
self._owner.disconnected()
self.on_proxy_failure('Invalid proxy reply')
return
@ -717,9 +745,6 @@ class NBSOCKS5ProxySocket(NBProxySocket):
if self.on_connect_proxy:
self.on_connect_proxy()
def DEBUG(self, text, severity):
''' Overwrites DEBUG tag to allow debug output be presented as "CONNECTproxy".'''
return self._owner.DEBUG(DBG_CONNECT_PROXY, text, severity)