BOSHClient transformed to NonBlockingBOSH transport - it's easier to maintain more connections from below, implemented handling of non-persistent HTTP connections - it runs with ejabberd, improved NonBlockingTransport interface, minor changes in BOSHDispatcher

This commit is contained in:
tomk 2008-07-13 22:22:58 +00:00
parent e1899f34dc
commit 3d860f40a6
9 changed files with 451 additions and 353 deletions

View File

@ -521,24 +521,19 @@ class Connection(ConnectionHandlers):
self.connection = None
if self._current_type == 'ssl':
# SSL (force TLS on different port than plain)
port = self._current_host['ssl_port']
secur = 1
secure = 'force'
else:
port = self._current_host['port']
if self._current_type == 'plain':
secur = 0
# plain connection
secure = None
else:
secur = None
# TLS (on the same port as plain)
secure = 'negotiate'
if self._proxy and self._proxy['type'] == 'bosh':
clientClass = common.xmpp.bosh.BOSHClient
else:
clientClass = common.xmpp.NonBlockingClient
# there was:
# "if gajim.verbose:"
# here
con = clientClass(
con = common.xmpp.NonBlockingClient(
domain=self._hostname,
caller=self,
idlequeue=gajim.idlequeue)
@ -550,11 +545,11 @@ class Connection(ConnectionHandlers):
if self.on_connect_success == self._on_new_account:
con.RegisterDisconnectHandler(self._on_new_account)
# FIXME: BOSH properties should be in proxy dictionary - loaded from
# config
if self._proxy and self._proxy['type'] == 'bosh':
# FIXME: BOSH properties should be loaded from config
if self._proxy and self._proxy['type'] == 'bosh':
self._proxy['bosh_hold'] = '1'
self._proxy['bosh_wait'] = '60'
self._proxy['bosh_content'] = 'text/xml; charset=utf-8'
log.info('Connecting to %s: [%s:%d]', self.name,
@ -566,7 +561,7 @@ class Connection(ConnectionHandlers):
on_proxy_failure=self.on_proxy_failure,
on_connect_failure=self.connect_to_next_type,
proxy=self._proxy,
secure = secur)
secure = secure)
else:
self.connect_to_next_host(retry)
@ -578,9 +573,11 @@ class Connection(ConnectionHandlers):
'connection_types').split()
else:
self._connection_types = ['tls', 'ssl', 'plain']
# FIXME: remove after tls and ssl will be degubbed
#self._connection_types = ['plain']
self._connection_types = ['plain']
host = self.select_next_host(self._hosts)
self._current_host = host
self._hosts.remove(host)
@ -619,6 +616,8 @@ class Connection(ConnectionHandlers):
if _con_type == 'tcp':
_con_type = 'plain'
if _con_type != self._current_type:
log.info('Connecting to next type beacuse desired is %s and returned is %s'
% (self._current_type, _con_type))
self.connect_to_next_type()
return
if _con_type == 'plain' and gajim.config.get_per('accounts', self.name,
@ -662,7 +661,12 @@ class Connection(ConnectionHandlers):
(con.Connection.ssl_fingerprint_sha1,))
return True
self._register_handlers(con, con_type)
con.auth(name, self.password, self.server_resource, 1, self.__on_auth)
con.auth(
user=name,
password=self.password,
resource=self.server_resource,
sasl=1,
on_auth=self.__on_auth)
def ssl_certificate_accepted(self):
name = gajim.config.get_per('accounts', self.name, 'name')
@ -997,7 +1001,7 @@ class Connection(ConnectionHandlers):
self.time_to_reconnect = None
self.connection.RegisterDisconnectHandler(self._on_disconnected)
self.connection.send(p)
self.connection.send(p, now=True)
self.connection.StreamTerminate()
#self.connection.start_disconnect(p, self._on_disconnected)
else:
@ -1554,7 +1558,7 @@ class Connection(ConnectionHandlers):
def gc_got_disconnected(self, room_jid):
''' A groupchat got disconnected. This can be or purpose or not.
Save the time we quit to avoid duplicate logs AND be faster than get that
date from DB. Save it in mem AND in a small table (with fast access)
date from DB. Save it in mem AND in a small table (with fast access)
'''
log_time = time_time()
self.last_history_time[room_jid] = log_time

View File

@ -1,235 +1,260 @@
import protocol, locale, random, dispatcher_nb
from client_nb import NBCommonClient
import transports_nb
import logging
import locale, random
from transports_nb import NonBlockingTransport, NonBlockingHTTP, CONNECTED, CONNECTING, DISCONNECTED
from protocol import BOSHBody
from simplexml import Node
import logging
log = logging.getLogger('gajim.c.x.bosh')
class BOSHClient(NBCommonClient):
'''
Client class implementing BOSH. Extends common XMPP
'''
def __init__(self, domain, idlequeue, caller=None):
'''Preceeds constructor of NBCommonClient and sets some of values that will
be used as attributes in <body> tag'''
self.bosh_sid=None
FAKE_DESCRIPTOR = -1337
'''Fake file descriptor - it's used for setting read_timeout in idlequeue for
BOSH Transport. Timeouts in queue are saved by socket descriptor.
In TCP-derived transports it is file descriptor of socket'''
class NonBlockingBOSH(NonBlockingTransport):
def __init__(self, raise_event, on_disconnect, idlequeue, xmpp_server, domain,
bosh_dict):
NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue)
# with 50-bit random initial rid, session would have to go up
# to 7881299347898368 messages to raise rid over 2**53
# (see http://www.xmpp.org/extensions/xep-0124.html#rids)
r = random.Random()
r.seed()
global FAKE_DESCRIPTOR
FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1
self.fake_fd = FAKE_DESCRIPTOR
self.bosh_rid = r.getrandbits(50)
self.bosh_sid = None
if locale.getdefaultlocale()[0]:
self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0]
else:
self.bosh_xml_lang = 'en'
self.http_version = 'HTTP/1.1'
self.http_persistent = False
self.http_pipelining = False
self.bosh_to = domain
#self.Namespace = protocol.NS_HTTP_BIND
#self.defaultNamespace = self.Namespace
self.bosh_session_on = False
self.route_host, self.route_port = xmpp_server
NBCommonClient.__init__(self, domain, idlequeue, caller)
self.bosh_wait = bosh_dict['bosh_wait']
self.bosh_hold = bosh_dict['bosh_hold']
self.bosh_host = bosh_dict['host']
self.bosh_port = bosh_dict['port']
self.bosh_content = bosh_dict['bosh_content']
self.http_socks = []
self.stanzas_to_send = []
self.prio_bosh_stanza = None
self.current_recv_handler = None
# if proxy_host .. do sth about HTTP proxy etc.
def connect(self, conn_5tuple, on_connect, on_connect_failure):
NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure)
self.http_socks.append(self.get_http_socket())
self.tcp_connection_started()
# this connect() is not needed because sockets can be connected on send but
# we need to know if host is reachable in order to invoke callback for
# failure when connecting (it's different than callback for errors
# occurring after connection is etabilished)
self.http_socks[0].connect(
conn_5tuple = conn_5tuple,
on_connect = lambda: self._on_connect(self.http_socks[0]),
on_connect_failure = self._on_connect_failure)
def connect(self, on_connect, on_connect_failure, proxy, hostname=None, port=5222,
on_proxy_failure=None, secure=None):
'''
Open XMPP connection (open XML streams in both directions).
:param hostname: hostname of XMPP server from SRV request
:param port: port number of XMPP server
:param on_connect: called after stream is successfully opened
:param on_connect_failure: called when error occures during connection
:param on_proxy_failure: called if error occurres during TCP connection to
proxy server or during connection to the proxy
:param proxy: dictionary with bosh-related paramters. It should contain at
least values for keys 'host' and 'port' - connection details for proxy
server and optionally keys 'user' and 'pass' as proxy credentials
:param secure: if
def get_fd(self):
return self.fake_fd
def on_http_request_possible(self):
'''
NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port,
on_proxy_failure, proxy, secure)
Called after HTTP response is received - another request is possible.
There should be always one pending request on BOSH CM.
'''
log.info('on_http_req possible, stanzas in list: %s, state:\n%s' %
(self.stanzas_to_send, self.get_current_state()))
# if one of sockets is connecting, sth is about to be sent - we don't have to
# send request from here
for s in self.http_socks:
if s.state==CONNECTING or s.pending_requests>0: return
self.flush_stanzas()
if hostname:
self.route_host = hostname
def flush_stanzas(self):
# another to-be-locked candidate
log.info('flushing stanzas')
if self.prio_bosh_stanza:
tmp = self.prio_bosh_stanza
self.prio_bosh_stanza = None
else:
self.route_host = self.Server
assert(proxy.has_key('type'))
assert(proxy['type']=='bosh')
self.bosh_wait = proxy['bosh_wait']
self.bosh_hold = proxy['bosh_hold']
self.bosh_host = proxy['host']
self.bosh_port = proxy['port']
self.bosh_content = proxy['bosh_content']
# _on_tcp_failure is callback for errors which occur during name resolving or
# TCP connecting.
self._on_tcp_failure = self.on_proxy_failure
tmp = self.stanzas_to_send
self.stanzas_to_send = []
self.send_http(tmp)
# in BOSH, client connects to Connection Manager instead of directly to
# XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects
# to HTTP proxy and Connection Manager is specified at URI and Host header
# in HTTP message
# tcp_host, tcp_port is hostname and port for socket connection - Connection
# Manager or HTTP proxy
if proxy.has_key('proxy_host') and proxy['proxy_host'] and \
proxy.has_key('proxy_port') and proxy['proxy_port']:
tcp_host=proxy['proxy_host']
tcp_port=proxy['proxy_port']
def send(self, stanza, now=False):
# body tags should be send only via send_http()
assert(not isinstance(stanza, BOSHBody))
now = True
if now:
self.send_http([stanza])
else:
self.stanzas_to_send.append(stanza)
# user and password for HTTP proxy
if proxy.has_key('user') and proxy['user'] and \
proxy.has_key('pass') and proxy['pass']:
proxy_creds=(proxy['user'],proxy['pass'])
def send_http(self, payload):
# "Protocol" and string/unicode stanzas should be sent via send()
# (only initiating and terminating BOSH stanzas should be send via send_http)
assert(isinstance(payload, list) or isinstance(payload, BOSHBody))
log.warn('send_http: stanzas: %s\n%s' % (payload, self.get_current_state()))
if isinstance(payload, list):
bosh_stanza = self.boshify_stanzas(payload)
else:
# bodytag_payload is <body ...>, we don't boshify, only add the rid
bosh_stanza = payload
picked_sock = self.pick_socket()
if picked_sock:
log.info('sending to socket %s' % id(picked_sock))
bosh_stanza.setAttr('rid', self.get_rid())
picked_sock.send(bosh_stanza)
else:
# no socket was picked but one is about to connect - save the stanza and
# return
if self.prio_bosh_stanza:
payload = self.merge_stanzas(payload, self.prio_bosh_stanza)
if payload is None:
log.error('Error in BOSH socket handling - unable to send %s because %s\
is already about to be sent' % (payload, self.prio_bosh_stanza))
self.disconnect()
self.prio_bosh_stanza = payload
def merge_stanzas(self, s1, s2):
if isinstance(s1, BOSHBody):
if isinstance(s2, BOSHBody):
# both are boshbodies
return
else:
proxy_creds=(None, None)
s1.setPayload(s2, add=True)
return s1
elif isinstance(s2, BOSHBody):
s2.setPayload(s1, add=True)
return s2
else:
tcp_host = transports_nb.urisplit(proxy['host'])[1]
tcp_port=proxy['port']
#both are lists
s1.extend(s2)
return s1
if tcp_host is None:
self._on_connect_failure("Invalid BOSH URI")
def get_current_state(self):
t = '\t\tSOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n'
for s in self.http_socks:
t = '%s\t\t%s\t%s\t%s\n' % (t,id(s), s.state, s.pending_requests)
return t
def pick_socket(self):
# try to pick connected socket with no pending reqs
for s in self.http_socks:
if s.state == CONNECTED and s.pending_requests == 0:
return s
# try to connect some disconnected socket
for s in self.http_socks:
if s.state==DISCONNECTED:
self.connect_and_flush(s)
return
self.socket = self.get_socket()
self._resolve_hostname(
hostname=tcp_host,
port=tcp_port,
on_success=self._try_next_ip,
on_failure=self._on_tcp_failure)
def _on_stream_start(self):
'''
Called after XMPP stream is opened. In BOSH, TLS is negotiated on socket
connect so success callback can be invoked after TCP connect.
(authentication is started from auth() method)
'''
self.onreceive(None)
if self.connected == 'tcp':
self._on_connect()
def get_socket(self):
tmp = transports_nb.NonBlockingHTTP(
raise_event=self.raise_event,
on_disconnect=self.on_http_disconnect,
http_uri = self.bosh_host,
http_port = self.bosh_port,
http_version = self.http_version
)
tmp.PlugIn(self)
return tmp
def on_http_disconnect(self):
log.info('HTTP socket disconnected')
#import traceback
#traceback.print_stack()
if self.bosh_session_on:
self.socket.connect(
conn_5tuple=self.current_ip,
on_connect=self.on_http_reconnect,
on_connect_failure=self.on_disconnect)
else:
self.on_disconnect()
def on_http_reconnect(self):
self.socket._plug_idle()
log.info('Connected to BOSH CM again')
pass
# if there is any just-connecting socket, it will send the data in its
# connect callback
for s in self.http_socks:
if s.state==CONNECTING:
return
# being here means there are only CONNECTED scokets with pending requests.
# Lets create and connect another one
s = self.get_http_socket()
self.http_socks.append(s)
self.connect_and_flush(s)
return
def on_http_reconnect_fail(self):
log.error('Error when reconnecting to BOSH CM')
self.on_disconnect()
def send(self, stanza, now = False):
(id, stanza_to_send) = self.Dispatcher.assign_id(stanza)
def connect_and_flush(self, socket):
socket.connect(
conn_5tuple = self.conn_5tuple,
on_connect = self.flush_stanzas,
on_connect_failure = self.disconnect)
self.socket.send(
self.boshify_stanza(stanza_to_send),
now = now)
return id
def get_rid(self):
# does this need a lock??"
self.bosh_rid = self.bosh_rid + 1
return str(self.bosh_rid)
def boshify_stanzas(self, stanzas=[], body_attrs=None):
''' wraps zero to many stanzas by body tag with xmlns and sid '''
log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas))
tag = BOSHBody(attrs={'sid': self.bosh_sid})
tag.setPayload(stanzas)
return tag
def get_bodytag(self):
# this should be called not until after session creation response so sid has
# to be initialized.
assert(hasattr(self, 'bosh_sid'))
return protocol.BOSHBody(
attrs={ 'rid': self.get_rid(),
'sid': self.bosh_sid})
def get_initial_bodytag(self, after_SASL=False):
tag = protocol.BOSHBody(
return BOSHBody(
attrs={'content': self.bosh_content,
'hold': str(self.bosh_hold),
'route': '%s:%s' % (self.route_host, self.Port),
'route': '%s:%s' % (self.route_host, self.route_port),
'to': self.bosh_to,
'wait': str(self.bosh_wait),
'rid': self.get_rid(),
'xml:lang': self.bosh_xml_lang,
'xmpp:version': '1.0',
'ver': '1.6',
'xmlns:xmpp': 'urn:xmpp:xbosh'})
if after_SASL:
tag.delAttr('content')
tag.delAttr('hold')
tag.delAttr('route')
tag.delAttr('wait')
tag.delAttr('ver')
# xmpp:restart attribute is essential for stream restart request
tag.setAttr('xmpp:restart','true')
tag.setAttr('sid',self.bosh_sid)
return tag
def get_after_SASL_bodytag(self):
return BOSHBody(
attrs={ 'to': self.bosh_to,
'sid': self.bosh_sid,
'xml:lang': self.bosh_xml_lang,
'xmpp:version': '1.0',
'xmpp:restart': 'true',
'xmlns:xmpp': 'urn:xmpp:xbosh'})
def get_closing_bodytag(self):
closing_bodytag = self.get_bodytag()
closing_bodytag.setAttr('type', 'terminate')
return closing_bodytag
return BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'})
def get_rid(self):
self.bosh_rid = self.bosh_rid + 1
return str(self.bosh_rid)
def boshify_stanza(self, stanza=None, body_attrs=None):
''' wraps stanza by body tag with rid and sid '''
#log.info('boshify_staza - type is: %s, stanza is %s' % (type(stanza), stanza))
tag = self.get_bodytag()
tag.setPayload([stanza])
return tag
def get_http_socket(self):
s = NonBlockingHTTP(
raise_event=self.raise_event,
on_disconnect=self.disconnect,
idlequeue = self.idlequeue,
on_http_request_possible = self.on_http_request_possible,
http_uri = self.bosh_host,
http_port = self.bosh_port,
http_version = self.http_version)
if self.current_recv_handler:
s.onreceive(self.current_recv_handler)
return s
def onreceive(self, recv_handler):
if recv_handler is None:
recv_handler = self._owner.Dispatcher.ProcessNonBlocking
self.current_recv_handler = recv_handler
for s in self.http_socks:
s.onreceive(recv_handler)
def on_bodytag_attrs(self, body_attrs):
#log.info('on_bodytag_attrs: %s' % body_attrs)
if body_attrs.has_key('type'):
if body_attrs['type']=='terminated':
# BOSH session terminated
self.bosh_session_on = False
elif body_attrs['type']=='error':
# recoverable error
pass
if not self.bosh_sid:
# initial response - when bosh_sid is set
self.bosh_session_on = True
self.bosh_sid = body_attrs['sid']
self.Dispatcher.Stream._document_attrs['id']=body_attrs['authid']
def disconnect(self, do_callback=True):
if self.state == DISCONNECTED: return
for s in self.http_socks:
s.disconnect(do_callback=False)
NonBlockingTransport.disconnect(self, do_callback)

View File

@ -48,7 +48,7 @@ class PlugIn:
else:
owner.__dict__[self.__class__.__name__]=self
# following will not work for classes inheriting plugin()
# following commented line will not work for classes inheriting plugin()
#if self.__class__.__dict__.has_key('plugin'): return self.plugin(owner)
if hasattr(self,'plugin'): return self.plugin(owner)

View File

@ -23,7 +23,7 @@ These classes can be used for simple applications "AS IS" though.
import socket
import transports_nb, tls_nb, dispatcher_nb, auth_nb, roster_nb, protocol
import transports_nb, tls_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh
from client import *
import logging
@ -49,7 +49,7 @@ class NBCommonClient:
self.Server = domain
# caller is who initiated this client, it is sed to register the EventDispatcher
# caller is who initiated this client, it is needed to register the EventDispatcher
self._caller = caller
self._owner = self
self._registered_name = None
@ -92,16 +92,8 @@ class NBCommonClient:
self.NonBlockingTCP.PlugOut()
if self.__dict__.has_key('NonBlockingHTTP'):
self.NonBlockingHTTP.PlugOut()
def send(self, stanza, now = False):
''' interface for putting stanzas on wire. Puts ID to stanza if needed and
sends it via socket wrapper'''
(id, stanza_to_send) = self.Dispatcher.assign_id(stanza)
self.Connection.send(stanza_to_send, now = now)
return id
if self.__dict__.has_key('NonBlockingBOSH'):
self.NonBlockingBOSH.PlugOut()
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
@ -177,7 +169,7 @@ class NBCommonClient:
started, and _on_connect_failure on failure.
'''
#FIXME: use RegisterHandlerOnce instead of onreceive
log.info('========xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] ))
log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] ))
def on_next_receive(mode):
log.info('setting %s on next receive' % mode)
@ -187,7 +179,8 @@ class NBCommonClient:
self.onreceive(lambda _data:self._xmpp_connect_machine(mode, _data))
if not mode:
dispatcher_nb.Dispatcher().PlugIn(self)
# starting state
d=dispatcher_nb.Dispatcher().PlugIn(self)
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
elif mode == 'FAILURE':
@ -205,7 +198,7 @@ class NBCommonClient:
if not self.Dispatcher.Stream.features:
on_next_receive('RECEIVE_STREAM_FEATURES')
else:
log.info('got STREAM FEATURES in first read')
log.info('got STREAM FEATURES in first recv')
self._xmpp_connect_machine(mode='STREAM_STARTED')
else:
@ -222,7 +215,7 @@ class NBCommonClient:
mode='FAILURE',
data='Missing <features> in 1.0 stream')
else:
log.info('got STREAM FEATURES in second read')
log.info('got STREAM FEATURES in second recv')
self._xmpp_connect_machine(mode='STREAM_STARTED')
elif mode == 'STREAM_STARTED':
@ -244,7 +237,7 @@ class NBCommonClient:
self.on_connect(self, self.connected)
def raise_event(self, event_type, data):
log.info('raising event from transport: %s %s' % (event_type,data))
log.info('raising event from transport: :::::%s::::\n_____________\n%s\n_____________\n' % (event_type,data))
if hasattr(self, 'Dispatcher'):
self.Dispatcher.Event('', event_type, data)
@ -272,8 +265,9 @@ class NBCommonClient:
''' get the ip address of the account, from which is made connection
to the server , (e.g. me).
We will create listening socket on the same ip '''
if hasattr(self, 'Connection'):
return self.Connection._sock.getsockname()
# FIXME: tuple (ip, port) is expected (and checked for) but port num is useless
if hasattr(self, 'socket'):
return self.socket.peerhost
def auth(self, user, password, resource = '', sasl = 1, on_auth = None):
@ -364,6 +358,7 @@ class NonBlockingClient(NBCommonClient):
def __init__(self, domain, idlequeue, caller=None):
NBCommonClient.__init__(self, domain, idlequeue, caller)
self.protocol_type = 'XMPP'
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
on_proxy_failure=None, proxy=None, secure=None):
@ -379,35 +374,33 @@ class NonBlockingClient(NBCommonClient):
if proxy:
# with proxies, client connects to proxy instead of directly to
# XMPP server ((hostname, port))
# tcp_host is machine used for socket connection
tcp_host=proxy['host']
tcp_port=proxy['port']
# tcp_host is hostname of machine used for socket connection
# (DNS request will be done for this hostname)
tcp_host, tcp_port, proxy_user, proxy_pass = \
transports_nb.get_proxy_data_from_dict(proxy)
self._on_tcp_failure = self.on_proxy_failure
if proxy.has_key('type'):
assert(proxy['type']!='bosh')
if proxy.has_key('user') and proxy.has_key('pass'):
proxy_creds=(proxy['user'],proxy['pass'])
else:
proxy_creds=(None, None)
type_ = proxy['type']
if type_ == 'socks5':
# SOCKS5 proxy
self.socket = transports_nb.NBSOCKS5ProxySocket(
if proxy['type'] == 'bosh':
self.socket = bosh.NonBlockingBOSH(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(xmpp_hostname, self.Port))
elif type_ == 'http':
# HTTP CONNECT to proxy
self.socket = transports_nb.NBHTTPProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(xmpp_hostname, self.Port))
raise_event = self.raise_event,
idlequeue = self.idlequeue,
xmpp_server=(xmpp_hostname, self.Port),
domain = self.Server,
bosh_dict = proxy)
self.protocol_type = 'BOSH'
else:
# HTTP CONNECT to proxy from environment variables
self.socket = transports_nb.NBHTTPProxySocket(
if proxy['type'] == 'socks5':
proxy_class = transports_nb.NBSOCKS5ProxySocket
elif proxy['type'] == 'http':
proxy_class = transports_nb.NBHTTPProxySocket
self.socket = proxy_class(
on_disconnect=self.on_disconnect,
proxy_creds=(None, None),
raise_event = self.raise_event,
idlequeue = self.idlequeue,
proxy_creds=(proxy_user, proxy_pass),
xmpp_server=(xmpp_hostname, self.Port))
else:
self._on_tcp_failure = self._on_connect_failure
@ -415,6 +408,7 @@ class NonBlockingClient(NBCommonClient):
tcp_port=self.Port
self.socket = transports_nb.NonBlockingTCP(
raise_event = self.raise_event,
idlequeue = self.idlequeue,
on_disconnect = self.on_disconnect)
self.socket.PlugIn(self)

View File

@ -42,8 +42,6 @@ XML_DECLARATION = '<?xml version=\'1.0\'?>'
# FIXME: ugly
from client_nb import NonBlockingClient
from bosh import BOSHClient
class Dispatcher():
# Why is this here - I needed to redefine Dispatcher for BOSH and easiest way
# was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble
@ -53,9 +51,9 @@ class Dispatcher():
# If having two kinds of dispatcher will go well, I will rewrite the
def PlugIn(self, client_obj, after_SASL=False):
if isinstance(client_obj, NonBlockingClient):
if client_obj.protocol_type == 'XMPP':
XMPPDispatcher().PlugIn(client_obj)
elif isinstance(client_obj, BOSHClient):
elif client_obj.protocol_type == 'BOSH':
BOSHDispatcher().PlugIn(client_obj, after_SASL)
@ -76,8 +74,8 @@ class XMPPDispatcher(PlugIn):
self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \
self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \
self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \
self.SendAndWaitForResponse, self.assign_id, self.StreamTerminate, \
self.SendAndCallForResponse, self.getAnID, self.Event]
self.SendAndWaitForResponse, self.StreamTerminate, \
self.SendAndCallForResponse, self.getAnID, self.Event, self.send]
def getAnID(self):
global ID
@ -112,10 +110,7 @@ class XMPPDispatcher(PlugIn):
self._owner.lastErrNode = None
self._owner.lastErr = None
self._owner.lastErrCode = None
if hasattr(self._owner, 'StreamInit'):
self._owner.StreamInit()
else:
self.StreamInit()
self.StreamInit()
def plugout(self):
''' Prepares instance to be destructed. '''
@ -165,6 +160,7 @@ class XMPPDispatcher(PlugIn):
self.Stream.Parse(data)
# end stream:stream tag received
if self.Stream and self.Stream.has_received_endtag():
# FIXME call client method
self._owner.Connection.disconnect()
return 0
except ExpatError:
@ -414,25 +410,19 @@ class XMPPDispatcher(PlugIn):
''' Put stanza on the wire and call back when recipient replies.
Additional callback arguments can be specified in args. '''
self.SendAndWaitForResponse(stanza, 0, func, args)
def assign_id(self, stanza):
''' Assign an unique ID to stanza and return assigned ID.'''
if type(stanza) in [type(''), type(u'')]:
return (None, stanza)
if not isinstance(stanza, Protocol):
_ID=None
elif not stanza.getID():
global ID
ID+=1
_ID=`ID`
stanza.setID(_ID)
else:
_ID=stanza.getID()
if self._owner._registered_name and not stanza.getAttr('from'):
stanza.setAttr('from', self._owner._registered_name)
stanza.setNamespace(self._owner.Namespace)
stanza.setParent(self._metastream)
return (_ID, stanza)
def send(self, stanza, now=False):
id = None
if type(stanza) not in [type(''), type(u'')]:
if isinstance(stanza, Protocol):
id = stanza.getID()
if id is None:
stanza.setID(self.getAnID())
id = stanza.getID()
if self._owner._registered_name and not stanza.getAttr('from'):
stanza.setAttr('from', self._owner._registered_name)
self._owner.Connection.send(stanza, now)
return id
class BOSHDispatcher(XMPPDispatcher):
@ -458,12 +448,16 @@ class BOSHDispatcher(XMPPDispatcher):
locale.getdefaultlocale()[0].split('_')[0])
self.restart = True
self._owner.Connection.send(self._owner.get_initial_bodytag(self.after_SASL))
if self.after_SASL:
self._owner.Connection.send_http(self._owner.Connection.get_after_SASL_bodytag())
else:
self._owner.Connection.send_http(self._owner.Connection.get_initial_bodytag())
def StreamTerminate(self):
''' Send a stream terminator. '''
self._owner.Connection.send(self._owner.get_closing_bodytag())
self._owner.Connection.send_http(self._owner.Connection.get_closing_bodytag())
def ProcessNonBlocking(self, data=None):
@ -478,10 +472,31 @@ class BOSHDispatcher(XMPPDispatcher):
def dispatch(self, stanza, session=None, direct=0):
if stanza.getName()=='body' and stanza.getNamespace()==NS_HTTP_BIND:
self._owner.on_bodytag_attrs(stanza.getAttrs())
#self._owner.send_empty_bodytag()
for child in stanza.getChildren():
XMPPDispatcher.dispatch(self, child, session, direct)
stanza_attrs = stanza.getAttrs()
if stanza_attrs.has_key('authid'):
# should be only in init response
# auth module expects id of stream in document attributes
self.Stream._document_attrs['id'] = stanza_attrs['authid']
if stanza_attrs.has_key('sid'):
# session ID should be only in init response
self._owner.Connection.bosh_sid = stanza_attrs['sid']
if stanza_attrs.has_key('terminate'):
# staznas under body still should be passed to XMPP dispatcher
self._owner.on_disconnect()
if stanza_attrs.has_key('error'):
# recoverable error
pass
children = stanza.getChildren()
if children:
for child in children:
XMPPDispatcher.dispatch(self, child, session, direct)
else:
XMPPDispatcher.dispatch(self, stanza, session, direct)

View File

@ -15,6 +15,7 @@
import select
import logging
log = logging.getLogger('gajim.c.x.idlequeue')
log.setLevel(logging.DEBUG)
class IdleObject:
''' base class for all idle listeners, these are the methods, which are called from IdleQueue
@ -36,7 +37,7 @@ class IdleObject:
pass
def read_timeout(self):
''' called when timeout has happend '''
''' called when timeout happened '''
pass
class IdleQueue:
@ -55,7 +56,8 @@ class IdleQueue:
self.selector = select.poll()
def remove_timeout(self, fd):
log.debug('read timeout removed for fd %s' % fd)
#log.debug('read timeout removed for fd %s' % fd)
print 'read timeout removed for fd %s' % fd
if self.read_timeouts.has_key(fd):
del(self.read_timeouts[fd])
@ -71,11 +73,13 @@ class IdleQueue:
def set_read_timeout(self, fd, seconds):
''' set a new timeout, if it is not removed after 'seconds',
then obj.read_timeout() will be called '''
log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds))
#log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds))
print 'read timeout set for fd %s on %s seconds' % (fd, seconds)
timeout = self.current_time() + seconds
self.read_timeouts[fd] = timeout
def check_time_events(self):
print 'check time evs'
current_time = self.current_time()
for fd, timeout in self.read_timeouts.items():
if timeout > current_time:
@ -134,6 +138,7 @@ class IdleQueue:
return False
if flags & 3: # waiting read event
#print 'waiting read on %d, flags are %d' % (fd, flags)
obj.pollin()
return True

View File

@ -300,6 +300,13 @@ class JID:
""" Produce hash of the JID, Allows to use JID objects as keys of the dictionary. """
return hash(self.__str__())
class BOSHBody(Node):
'''
<body> tag that wraps usual XMPP stanzas in XMPP over BOSH
'''
def __init__(self, attrs={}, payload=[], node=None):
Node.__init__(self, tag='body', attrs=attrs, payload=payload, node=node)
self.setNamespace(NS_HTTP_BIND)
class Protocol(Node):
@ -400,13 +407,6 @@ class Protocol(Node):
if item in ['to','from']: val=JID(val)
return self.setAttr(item,val)
class BOSHBody(Protocol):
'''
<body> tag that wraps usual XMPP stanzas in BOSH
'''
def __init__(self, to=None, frm=None, attrs={}, payload=[], node=None):
Protocol.__init__(self, name='body', to=to, frm=frm, attrs=attrs,
payload=payload, xmlns=NS_HTTP_BIND, node=node)
class Message(Protocol):
""" XMPP Message stanza - "push" mechanism."""

View File

@ -3,7 +3,7 @@
##
## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
## modified by Dimitur Kirov <dkirov@gmail.com>
## modified by Tomas Karasek <tom.to.the.k@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -45,6 +45,34 @@ def urisplit(uri):
proto, host, path = grouped[1], grouped[3], grouped[4]
return proto, host, path
def get_proxy_data_from_dict(proxy):
type = proxy['type']
# with http-connect/socks5 proxy, we do tcp connecting to the proxy machine
tcp_host, tcp_port = proxy['host'], proxy['port']
if type == 'bosh':
# in ['host'] is whole URI
tcp_host = urisplit(proxy['host'])[1]
# in BOSH, client connects to Connection Manager instead of directly to
# XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects
# to HTTP proxy and Connection Manager is specified at URI and Host header
# in HTTP message
if proxy.has_key('proxy_host') and proxy.has_key('proxy_port'):
tcp_host, tcp_port = proxy['proxy_host'], proxy['proxy_port']
# user and pass for socks5/http_connect proxy. In case of BOSH, it's user and
# pass for http proxy - If there's no proxy_host they won't be used
if proxy.has_key('user'):
proxy_user = proxy['user']
else:
proxy_user = None
if proxy.has_key('pass'):
proxy_pass = proxy['pass']
else:
proxy_pass = None
return tcp_host, tcp_port, proxy_user, proxy_pass
# timeout to connect to the server socket, it doesn't include auth
CONNECT_TIMEOUT_SECONDS = 30
@ -63,62 +91,72 @@ DATA_SENT='DATA SENT'
DISCONNECTED ='DISCONNECTED'
CONNECTING ='CONNECTING'
CONNECTED ='CONNECTED'
DISCONNECTING ='DISCONNECTING'
# transports have different constructor and same connect
class NonBlockingTransport(PlugIn):
def __init__(self, raise_event, on_disconnect):
def __init__(self, raise_event, on_disconnect, idlequeue):
PlugIn.__init__(self)
self.raise_event = raise_event
self.on_disconnect = on_disconnect
self.on_connect = None
self.on_connect_failure = None
self.idlequeue = None
self.idlequeue = idlequeue
self.on_receive = None
self.server = None
self.port = None
self.state = DISCONNECTED
self._exported_methods=[self.disconnect, self.onreceive]
self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout,
self.set_timeout, self.remove_timeout]
# time to wait for SOME stanza to come and then send keepalive
self.sendtimeout = 0
# in case we want to something different than sending keepalives
self.on_timeout = None
def plugin(self, owner):
owner.Connection=self
self.idlequeue = owner.idlequeue
def plugout(self):
self._owner.Connection = None
self._owner = None
def connect(self, conn_5tuple, on_connect, on_connect_failure):
'''
connect method should have the same declaration in all derived transports
'''
assert(self.state == DISCONNECTED)
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
(self.server, self.port) = conn_5tuple[4][:2]
log.info('NonBlocking Connect :: About tot connect to %s:%s' % (self.server, self.port))
self.conn_5tuple = conn_5tuple
log.info('NonBlocking Connect :: About to connect to %s:%s' % (self.server, self.port))
def set_state(self, newstate):
assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING])
if (self.state, newstate) in [(CONNECTING, DISCONNECTING), (DISCONNECTED, DISCONNECTING)]:
log.info('strange move: %s -> %s' % (self.state, newstate))
assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED])
self.state = newstate
def _on_connect(self, data):
''' preceeds call of on_connect callback '''
# data is reference to socket wrapper instance. We don't need it in client
# because
self.peerhost = data._sock.getsockname()
self.set_state(CONNECTED)
self.on_connect()
def _on_connect_failure(self,err_message):
''' preceeds call of on_connect_failure callback '''
# In case of error while connecting we need to close socket
# In case of error while connecting we need to disconnect transport
# but we don't want to call DisconnectHandlers from client,
# thus the do_callback=False
self.disconnect(do_callback=False)
self.on_connect_failure(err_message=err_message)
def send(self, raw_data, now=False):
if self.state not in [CONNECTED, DISCONNECTING]:
if self.state not in [CONNECTED]:
# FIXME better handling needed
log.error('Trying to send %s when transport is %s.' %
(raw_data, self.state))
@ -139,24 +177,49 @@ class NonBlockingTransport(PlugIn):
else:
self.on_receive = None
return
log.info('setting onreceive on %s' % recv_handler)
self.on_receive = recv_handler
def tcp_connection_started(self):
self.set_state(CONNECTING)
# on_connect/on_conn_failure will be called from self.pollin/self.pollout
def read_timeout(self):
if self.on_timeout:
self.on_timeout()
self.renew_send_timeout()
def renew_send_timeout(self):
if self.on_timeout and self.sendtimeout > 0:
self.set_timeout(self.sendtimeout)
else:
self.remove_timeout()
def set_timeout(self, timeout):
self.idlequeue.set_read_timeout(self.get_fd(), timeout)
def get_fd(self):
pass
def remove_timeout(self):
self.idlequeue.remove_timeout(self.get_fd())
def set_send_timeout(self, timeout, on_timeout):
self.sendtimeout = timeout
if self.sendtimeout > 0:
self.on_timeout = on_timeout
else:
self.on_timeout = None
class NonBlockingTCP(NonBlockingTransport, IdleObject):
'''
Non-blocking TCP socket wrapper
'''
def __init__(self, raise_event, on_disconnect):
def __init__(self, raise_event, on_disconnect, idlequeue):
'''
Class constructor.
'''
NonBlockingTransport.__init__(self, raise_event, on_disconnect)
NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue)
# writable, readable - keep state of the last pluged flags
# This prevents replug of same object with the same flags
self.writable = True
@ -165,23 +228,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
# queue with messages to be send
self.sendqueue = []
# time to wait for SOME stanza to come and then send keepalive
self.sendtimeout = 0
# in case we want to something different than sending keepalives
self.on_timeout = None
# bytes remained from the last send message
self.sendbuff = ''
self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout,
self.set_timeout, self.remove_timeout]
def get_fd(self):
try:
tmp = self._sock.fileno()
return tmp
except:
except socket.error, (errnum, errstr):
log.error('Trying to get file descriptor of not-connected socket: %s' % errstr )
return 0
def connect(self, conn_5tuple, on_connect, on_connect_failure):
@ -205,6 +261,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._recv = self._sock.recv
self.fd = self._sock.fileno()
self.idlequeue.plug_idle(self, True, False)
self.peerhost = None
errnum = 0
''' variable for errno symbol that will be found from exception raised from connect() '''
@ -221,11 +278,11 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
# connecting in progress
log.info('After connect. "%s" raised => CONNECTING' % errstr)
log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr))
self.tcp_connection_started()
return
elif errnum in (0, 10056, errno.EISCONN):
# already connected - this branch is very unlikely, nonblocking connect() will
# already connected - this branch is probably useless, nonblocking connect() will
# return EINPROGRESS exception in most cases. When here, we don't need timeout
# on connected descriptor and success callback can be called.
log.info('After connect. "%s" raised => CONNECTED' % errstr)
@ -240,6 +297,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def _on_connect(self, data):
''' with TCP socket, we have to remove send-timeout '''
self.idlequeue.remove_timeout(self.get_fd())
NonBlockingTransport._on_connect(self, data)
@ -253,6 +311,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
log.info('pollout called, state == %s' % self.state)
if self.state==CONNECTING:
log.info('%s socket wrapper connected' % id(self))
self._on_connect(self)
return
self._do_send()
@ -288,30 +347,17 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._on_connect_failure('Error during connect to %s:%s' %
(self.server, self.port))
else:
if self.on_timeout:
self.on_timeout()
self.renew_send_timeout()
NonBlockingTransport.read_timeout(self)
def renew_send_timeout(self):
if self.on_timeout and self.sendtimeout > 0:
self.set_timeout(self.sendtimeout)
else:
self.remove_timeout()
def set_send_timeout(self, timeout, on_timeout):
self.sendtimeout = timeout
if self.sendtimeout > 0:
self.on_timeout = on_timeout
else:
self.on_timeout = None
def set_timeout(self, timeout):
if self.state in [CONNECTING, CONNECTED] and self.get_fd() > 0:
self.idlequeue.set_read_timeout(self.get_fd(), timeout)
NonBlockingTransport.set_timeout(self, timeout)
def remove_timeout(self):
if self.get_fd():
self.idlequeue.remove_timeout(self.get_fd())
NonBlockingTransport.remove_timeout(self)
def send(self, raw_data, now=False):
'''Append raw_data to the queue of messages to be send.
@ -415,46 +461,50 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
else:
# This should never happen, so we need the debug. (If there is no handler
# on receive spacified, data are passed to Dispatcher.ProcessNonBlocking)
log.error('SOCKET Unhandled data received: %s' % received)
log.error('SOCKET %s Unhandled data received: %s' % (id(self), received))
import traceback
traceback.print_stack()
self.disconnect()
def _on_receive(self,data):
'''Preceeds passing received data to Client class. Gets rid of HTTP headers
and checks them.'''
''' preceeds on_receive callback. It peels off and checks HTTP headers in
class, in here it just calls the callback.'''
self.on_receive(data)
class NonBlockingHTTP(NonBlockingTCP):
'''
Socket wrapper that cretes HTTP message out of sent data and peels-off
HTTP headers from incoming messages
'''
def __init__(self, raise_event, on_disconnect, http_uri, http_port, http_version=None):
def __init__(self, raise_event, on_disconnect, idlequeue, on_http_request_possible,
http_uri, http_port, http_version='HTTP/1.1'):
self.http_protocol, self.http_host, self.http_path = urisplit(http_uri)
if self.http_protocol is None:
self.http_protocol = 'http'
if self.http_path == '':
http_path = '/'
self.http_port = http_port
if http_version:
self.http_version = http_version
else:
self.http_version = 'HTTP/1.1'
self.http_version = http_version
# buffer for partial responses
self.recvbuff = ''
self.expected_length = 0
NonBlockingTCP.__init__(self, raise_event, on_disconnect)
self.pending_requests = 0
self.on_http_request_possible = on_http_request_possible
NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue)
def send(self, raw_data, now=False):
NonBlockingTCP.send(
self,
self.build_http_message(raw_data),
now)
self.pending_requests += 1
def _on_receive(self,data):
'''Preceeds passing received data to Client class. Gets rid of HTTP headers
'''Preceeds passing received data to owner class. Gets rid of HTTP headers
and checks them.'''
if not self.recvbuff:
# recvbuff empty - fresh HTTP message was received
@ -470,7 +520,8 @@ class NonBlockingHTTP(NonBlockingTCP):
if self.expected_length > len(self.recvbuff):
# If we haven't received the whole HTTP mess yet, let's end the thread.
# It will be finnished from one of following poll calls on plugged socket.
# It will be finnished from one of following polls (io_watch) on plugged socket.
log.info('not enough bytes - %d expected, %d got' % (self.expected_length, len(self.recvbuff)))
return
# FIXME the reassembling doesn't work - Connection Manager on jabbim.cz
@ -481,8 +532,13 @@ class NonBlockingHTTP(NonBlockingTCP):
self.recvbuff=''
self.expected_length=0
self.pending_requests -= 1
assert(self.pending_requests >= 0)
# not-persistent connections
self.disconnect(do_callback = False)
self.on_receive(httpbody)
self.on_http_request_possible()
def build_http_message(self, httpbody, method='POST'):
'''
@ -512,7 +568,7 @@ class NonBlockingHTTP(NonBlockingTCP):
message = message.replace('\r','')
(header, httpbody) = message.split('\n\n',1)
header = header.split('\n')
statusline = header[0].split(' ')
statusline = header[0].split(' ',2)
header = header[1:]
headers = {}
for dummy in header:
@ -521,16 +577,16 @@ class NonBlockingHTTP(NonBlockingTCP):
return (statusline, headers, httpbody)
class NBProxySocket(NonBlockingTCP):
'''
Interface for proxy socket wrappers - when tunnneling XMPP over proxies,
some connecting process usually has to be done before opening stream.
'''
def __init__(self, raise_event, on_disconnect, xmpp_server, proxy_creds=(None,None)):
def __init__(self, raise_event, on_disconnect, idlequeue, xmpp_server,
proxy_creds=(None,None)):
self.proxy_user, self.proxy_pass = proxy_creds
self.xmpp_server = xmpp_server
NonBlockingTCP.__init__(self, raise_event, on_disconnect)
NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue)
def connect(self, conn_5tuple, on_connect, on_connect_failure):
@ -552,7 +608,6 @@ class NBProxySocket(NonBlockingTCP):
pass
class NBHTTPProxySocket(NBProxySocket):
''' This class can be used instead of NonBlockingTCP
HTTP (CONNECT) proxy connection class. Allows to use HTTP proxies like squid with

View File

@ -1,5 +1,5 @@
'''
Unit test for NonBlockingTcp tranport.
Unit test for NonBlockingTCP tranport.
'''
import unittest
@ -38,7 +38,7 @@ class MockClient(IdleMock):
IdleMock.__init__(self)
def do_connect(self):
self.socket=transports_nb.NonBlockingTcp(
self.socket=transports_nb.NonBlockingTCP(
on_disconnect=lambda: self.on_success(mode='SocketDisconnect')
)
@ -73,7 +73,7 @@ class MockClient(IdleMock):
class TestNonBlockingTcp(unittest.TestCase):
class TestNonBlockingTCP(unittest.TestCase):
def setUp(self):
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
@ -100,6 +100,6 @@ class TestNonBlockingTcp(unittest.TestCase):
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTcp)
suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTCP)
unittest.TextTestRunner(verbosity=2).run(suite)