From f3820706fb6e0f9057bf35443ff1b8cf12922ab8 Mon Sep 17 00:00:00 2001 From: tomk Date: Mon, 30 Jun 2008 00:02:32 +0000 Subject: [PATCH] - Refactored non-blocking transport and client classes - getaddrinfo is called in Client now - Added NonBlockingHttpBOSH transport (to tranports_nb) and BOSHClient (to client_nb) - Extended possible proxy types in configuration by "BOSH" proxy - Rewrote NonBlockingTLS to invoke success callback only after successful TLS handshake is over (formerly, the TLS Plugin returned right after sending ) --- data/glade/manage_proxies_window.glade | 3 +- src/common/connection.py | 67 +- src/common/xmpp/auth_nb.py | 3 + src/common/xmpp/client_nb.py | 595 +++++++------ src/common/xmpp/debug.py | 1 + src/common/xmpp/dispatcher_nb.py | 39 +- src/common/xmpp/idlequeue.py | 2 +- src/common/xmpp/protocol.py | 1107 ++++++++++++------------ src/common/xmpp/transports_nb.py | 1046 +++++++++++----------- src/common/xmpp/transports_new.py | 270 ------ src/config.py | 4 +- test/test_client_nb.py | 59 +- test/test_nonblockingtcp.py | 84 +- 13 files changed, 1555 insertions(+), 1725 deletions(-) delete mode 100644 src/common/xmpp/transports_new.py diff --git a/data/glade/manage_proxies_window.glade b/data/glade/manage_proxies_window.glade index 758cfcce7..b28beef97 100644 --- a/data/glade/manage_proxies_window.glade +++ b/data/glade/manage_proxies_window.glade @@ -211,7 +211,8 @@ True HTTP Connect -SOCKS5 +SOCKS5 +BOSH False True diff --git a/src/common/connection.py b/src/common/connection.py index 493140fe0..edb788aa3 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -55,6 +55,7 @@ from common.rst_xhtml_generator import create_xhtml from string import Template import logging log = logging.getLogger('gajim.c.connection') +log.setLevel(logging.DEBUG) ssl_error = { 2: _("Unable to get issuer certificate"), @@ -207,7 +208,7 @@ class Connection(ConnectionHandlers): def _disconnectedReconnCB(self): '''Called when we are disconnected''' - log.debug('disconnectedReconnCB') + log.error('disconnectedReconnCB') if gajim.account_is_connected(self.name): # we cannot change our status to offline or connecting # after we auth to server @@ -467,7 +468,6 @@ class Connection(ConnectionHandlers): proxy = None else: proxy = None - h = hostname p = 5222 ssl_p = 5223 @@ -504,7 +504,7 @@ class Connection(ConnectionHandlers): self.connect_to_next_host() def on_proxy_failure(self, reason): - log.debug('Connection to proxy failed') + log.error('Connection to proxy failed: %s' % reason) self.time_to_reconnect = None self.on_connect_failure = None self.disconnect(on_purpose = True) @@ -519,23 +519,6 @@ class Connection(ConnectionHandlers): self.last_connection.socket.disconnect() self.last_connection = None self.connection = None - if gajim.verbose: - con = common.xmpp.NonBlockingClient(self._hostname, caller = self, - on_connect = self.on_connect_success, - on_proxy_failure = self.on_proxy_failure, - on_connect_failure = self.connect_to_next_type) - else: - con = common.xmpp.NonBlockingClient(self._hostname, debug = [], - caller = self, on_connect = self.on_connect_success, - on_proxy_failure = self.on_proxy_failure, - on_connect_failure = self.connect_to_next_type) - self.last_connection = con - # increase default timeout for server responses - common.xmpp.dispatcher_nb.DEFAULT_TIMEOUT_SECONDS = self.try_connecting_for_foo_secs - con.set_idlequeue(gajim.idlequeue) - # FIXME: this is a hack; need a better way - if self.on_connect_success == self._on_new_account: - con.RegisterDisconnectHandler(self._on_new_account) if self._current_type == 'ssl': port = self._current_host['ssl_port'] @@ -546,9 +529,40 @@ class Connection(ConnectionHandlers): secur = 0 else: secur = None + + if self._proxy and self._proxy['type'] == 'bosh': + clientClass = common.xmpp.BOSHClient + else: + clientClass = common.xmpp.NonBlockingClient + + if gajim.verbose: + con = common.xmpp.NonBlockingClient( + hostname=self._current_host['host'], + port=port, + caller=self, + idlequeue=gajim.idlequeue) + else: + con = common.xmpp.NonBlockingClient( + hostname=self._current_host['host'], + debug=[], + port=port, + caller=self, + idlequeue=gajim.idlequeue) + + self.last_connection = con + # increase default timeout for server responses + common.xmpp.dispatcher_nb.DEFAULT_TIMEOUT_SECONDS = self.try_connecting_for_foo_secs + # FIXME: this is a hack; need a better way + if self.on_connect_success == self._on_new_account: + con.RegisterDisconnectHandler(self._on_new_account) + log.info('Connecting to %s: [%s:%d]', self.name, self._current_host['host'], port) - con.connect((self._current_host['host'], port), proxy=self._proxy, + con.connect( + on_connect=self.on_connect_success, + on_proxy_failure=self.on_proxy_failure, + on_connect_failure=self.connect_to_next_type, + proxy=self._proxy, secure = secur) else: self.connect_to_next_host(retry) @@ -561,6 +575,9 @@ class Connection(ConnectionHandlers): 'connection_types').split() else: self._connection_types = ['tls', 'ssl', 'plain'] + + # FIXME: remove after tls and ssl will be degubbed + #self._connection_types = ['plain'] host = self.select_next_host(self._hosts) self._current_host = host self._hosts.remove(host) @@ -975,7 +992,11 @@ class Connection(ConnectionHandlers): p.setStatus(msg) self.remove_all_transfers() self.time_to_reconnect = None - self.connection.start_disconnect(p, self._on_disconnected) + + self.connection.RegisterDisconnectHandler(self._on_disconnected) + self.connection.send(p) + self.connection.StreamTerminate() + #self.connection.start_disconnect(p, self._on_disconnected) else: self.time_to_reconnect = None self._on_disconnected() @@ -1010,7 +1031,7 @@ class Connection(ConnectionHandlers): def _on_disconnected(self): ''' called when a disconnect request has completed successfully''' self.dispatch('STATUS', 'offline') - self.disconnect() + self.disconnect(on_purpose=True) def get_status(self): return STATUS_LIST[self.connected] diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py index b2624043f..662b3f60f 100644 --- a/src/common/xmpp/auth_nb.py +++ b/src/common/xmpp/auth_nb.py @@ -169,6 +169,9 @@ class SASL(PlugIn): self.startsasl='success' self.DEBUG('Successfully authenticated with remote server.', 'ok') handlers=self._owner.Dispatcher.dumpHandlers() + print '6' * 79 + print handlers + print '6' * 79 self._owner.Dispatcher.PlugOut() dispatcher_nb.Dispatcher().PlugIn(self._owner) self._owner.Dispatcher.restoreHandlers(handlers) diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py index d82fb9e94..f362d02a9 100644 --- a/src/common/xmpp/client_nb.py +++ b/src/common/xmpp/client_nb.py @@ -17,40 +17,61 @@ # $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $ ''' -Provides PlugIn class functionality to develop extentions for xmpppy. -Also provides Client and Component classes implementations as the -examples of xmpppy structures usage. +Provides Client classes implementations as examples of xmpppy structures usage. These classes can be used for simple applications "AS IS" though. ''' import socket import debug +import random -import transports_nb, dispatcher_nb, auth_nb, roster_nb +import transports_nb, dispatcher_nb, auth_nb, roster_nb, protocol from client import * +import logging +log = logging.getLogger('gajim.c.x.client_nb') + +consoleloghandler = logging.StreamHandler() +consoleloghandler.setLevel(logging.DEBUG) +consoleloghandler.setFormatter( + logging.Formatter('%(levelname)s: %(message)s') +) +log.setLevel(logging.DEBUG) +log.addHandler(consoleloghandler) +log.propagate = False + + class NBCommonClient: ''' Base for Client and Component classes.''' - def __init__(self, server, port=5222, debug=['always', 'nodebuilder'], caller=None, - on_connect=None, on_proxy_failure=None, on_connect_failure=None): - ''' Caches server name and (optionally) port to connect to. "debug" parameter specifies - the debug IDs that will go into debug output. You can either specifiy an "include" - or "exclude" list. The latter is done via adding "always" pseudo-ID to the list. - Full list: ['nodebuilder', 'dispatcher', 'gen_auth', 'SASL_auth', 'bind', 'socket', - 'CONNECTproxy', 'TLS', 'roster', 'browser', 'ibb'] . ''' + def __init__(self, hostname, idlequeue, port=5222, debug=['always', 'nodebuilder'], caller=None): - if isinstance(self, NonBlockingClient): - self.Namespace, self.DBG = 'jabber:client', DBG_CLIENT - elif isinstance(self, NBCommonClient): - self.Namespace, self.DBG = dispatcher_nb.NS_COMPONENT_ACCEPT, DBG_COMPONENT + ''' Caches connection data: + :param hostname: hostname of machine where the XMPP server is running (from Account + of from SRV request) and port to connect to. + :param idlequeue: processing idlequeue + :param port: port of listening XMPP server + :param debug: specifies the debug IDs that will go into debug output. You can either + specifiy an "include" or "exclude" list. The latter is done via adding "always" + pseudo-ID to the list. Full list: ['nodebuilder', 'dispatcher', 'gen_auth', + 'SASL_auth', 'bind', 'socket', 'CONNECTproxy', 'TLS', 'roster', 'browser', 'ibb']. + TODO: get rid of debug.py using + :param caller: calling object - it has to implement certain methods (necessary?) + + ''' + self.DBG = DBG_CLIENT + + self.Namespace = protocol.NS_CLIENT + + self.idlequeue = idlequeue self.defaultNamespace = self.Namespace self.disconnect_handlers = [] - self.Server = server + + # XMPP server and port from account or SRV + self.Server = hostname self.Port = port - # Who initiated this client - # Used to register the EventDispatcher + # caller is who initiated this client, it is sed to register the EventDispatcher self._caller = caller if debug and type(debug) != list: debug = ['always', 'nodebuilder'] @@ -62,20 +83,24 @@ class NBCommonClient: self._registered_name = None self.connected = '' self._component=0 - self.idlequeue = None self.socket = None - self.on_connect = on_connect - self.on_proxy_failure = on_proxy_failure - self.on_connect_failure = on_connect_failure + self.on_connect = None + self.on_proxy_failure = None + self.on_connect_failure = None + self.proxy = None - def set_idlequeue(self, idlequeue): - self.idlequeue = idlequeue - def disconnected(self): - ''' Called on disconnection. Calls disconnect handlers and cleans things up. ''' + def on_disconnect(self): + ''' + Called on disconnection - when connect failure occurs on running connection + (after stream is successfully opened). + Calls disconnect handlers and cleans things up. + ''' + self.connected='' self.DEBUG(self.DBG,'Disconnect detected','stop') for i in reversed(self.disconnect_handlers): + self.DEBUG(self.DBG, 'Calling disc handler %s' % i, 'stop') i() if self.__dict__.has_key('NonBlockingRoster'): self.NonBlockingRoster.PlugOut() @@ -94,96 +119,201 @@ class NBCommonClient: if self.__dict__.has_key('NonBlockingTcp'): self.NonBlockingTcp.PlugOut() - def reconnectAndReauth(self): - ''' Just disconnect. We do reconnecting in connection.py ''' - self.disconnect() - return '' - def connect(self,server=None,proxy=None, ssl=None, on_stream_start = None): - ''' Make a tcp/ip connection, protect it with tls/ssl if possible and start XMPP stream. ''' - if not server: - server = (self.Server, self.Port) - self._Server, self._Proxy, self._Ssl = server , proxy, ssl - self.on_stream_start = on_stream_start + def send(self, stanza, is_message = False, now = False): + ''' interface for putting stanzas on wire. Puts ID to stanza if needed and + sends it via socket wrapper''' + (id, stanza_to_send) = self.Dispatcher.assign_id(stanza) + + if is_message: + # somehow zeroconf-specific + self.Connection.send(stanza_to_send, True, now = now) + else: + self.Connection.send(stanza_to_send, now = now) + return id + + + + def connect(self, on_connect, on_connect_failure, on_proxy_failure=None, proxy=None, secure=None): + ''' + Open XMPP connection (open streams in both directions). + :param on_connect: called after stream is successfully opened + :param on_connect_failure: called when error occures during connection + :param on_proxy_failure: called if error occurres during TCP connection to + proxy server or during connection to the proxy + :param proxy: dictionary with proxy data. It should contain at least values + for keys 'host' and 'port' - connection details for proxy server and + optionally keys 'user' and 'pass' as proxy credentials + :param secure: + ''' + + self.on_connect = on_connect + self.on_connect_failure=on_connect_failure + self.on_proxy_failure = on_proxy_failure + self._secure = secure + self.Connection = None + if proxy: + # with proxies, client connects to proxy instead of directly to + # XMPP server from __init__. + # tcp_server is hostname used for socket connecting + tcp_server=proxy['host'] + tcp_port=proxy['port'] + self._on_tcp_failure = self.on_proxy_failure if proxy.has_key('type'): + if proxy.has_key('user') and proxy.has_key('pass'): + proxy_creds=(proxy['user'],proxy['pass']) + else: + proxy_creds=(None, None) + type_ = proxy['type'] if type_ == 'socks5': - self.socket = transports_nb.NBSOCKS5PROXYsocket( - self._on_connected, self._on_proxy_failure, - self._on_connected_failure, proxy, server) + self.socket = transports_nb.NBSOCKS5ProxySocket( + on_disconnect=self.on_disconnect, + proxy_creds=proxy_creds, + xmpp_server=(self.Server, self.Port)) elif type_ == 'http': - self.socket = transports_nb.NBHTTPPROXYsocket(self._on_connected, - self._on_proxy_failure, self._on_connected_failure, proxy, - server) + self.socket = transports_nb.NBHTTPProxySocket( + on_disconnect=self.on_disconnect, + proxy_creds=proxy_creds, + xmpp_server=(self.Server, self.Port)) + elif type_ == 'bosh': + tcp_server = transports_nb.urisplit(tcp_server)[1] + self.socket = transports_nb.NonBlockingHttpBOSH( + on_disconnect=self.on_disconnect, + bosh_uri = proxy['host'], + bosh_port = tcp_port) else: - self.socket = transports_nb.NBHTTPPROXYsocket(self._on_connected, - self._on_proxy_failure, self._on_connected_failure, proxy, - server) + self.socket = transports_nb.NBHTTPProxySocket( + on_disconnect=self.on_disconnect, + proxy_creds=(None, None), + xmpp_server=(self.Server, self.Port)) else: - self.connected = 'tcp' - self.socket = transports_nb.NonBlockingTcp(self._on_connected, - self._on_connected_failure, server) + self._on_tcp_failure = self._on_connect_failure + tcp_server=self.Server + tcp_port=self.Port + self.socket = transports_nb.NonBlockingTcp(on_disconnect = self.on_disconnect) + self.socket.PlugIn(self) - return True + + self._resolve_hostname( + hostname=tcp_server, + port=tcp_port, + on_success=self._try_next_ip, + on_failure=self._on_tcp_failure) + + + + def _resolve_hostname(self, hostname, port, on_success, on_failure): + ''' wrapper of getaddinfo call. FIXME: getaddinfo blocks''' + try: + self.ip_addresses = socket.getaddrinfo(hostname,port, + socket.AF_UNSPEC,socket.SOCK_STREAM) + except socket.gaierror, (errnum, errstr): + on_failure(err_message='Lookup failure for %s:%s - %s %s' % + (self.Server, self.Port, errnum, errstr)) + else: + on_success() + + - def get_attrs(self, on_stream_start): - self.on_stream_start = on_stream_start - self.onreceive(self._on_receive_document_attrs) + def _try_next_ip(self, err_message=None): + '''iterates over IP addresses from getaddinfo''' + if err_message: + self.DEBUG(self.DBG,err_message,'connect') + if self.ip_addresses == []: + self._on_tcp_failure(err_message='Run out of hosts for name %s:%s' % + (self.Server, self.Port)) + else: + self.current_ip = self.ip_addresses.pop(0) + self.socket.connect( + conn_5tuple=self.current_ip, + on_connect=lambda: self._xmpp_connect(socket_type='tcp'), + on_connect_failure=self._try_next_ip) - def _on_proxy_failure(self, reason): - if self.on_proxy_failure: - self.on_proxy_failure(reason) - def _on_connected_failure(self, retry = None): + def incoming_stream_version(self): + ''' gets version of xml stream''' + if self.Dispatcher.Stream._document_attrs.has_key('version'): + return self.Dispatcher.Stream._document_attrs['version'] + else: + return None + + def _xmpp_connect(self, socket_type): + self.connected = socket_type + self._xmpp_connect_machine() + + + def _xmpp_connect_machine(self, mode=None, data=None): + ''' + Finite automaton called after TCP connecting. Takes care of stream opening + and features tag handling. Calls _on_stream_start when stream is + started, and _on_connect_failure on failure. + ''' + #FIXME: use RegisterHandlerOnce instead of onreceive + log.info('=============xmpp_connect_machine() >> mode: %s, data: %s' % (mode,data)) + + def on_next_receive(mode): + if mode is None: + self.onreceive(None) + else: + self.onreceive(lambda data:self._xmpp_connect_machine(mode, data)) + + if not mode: + dispatcher_nb.Dispatcher().PlugIn(self) + on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES') + + elif mode == 'FAILURE': + self._on_connect_failure(err_message='During XMPP connect: %s' % data) + + elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES': + if data: + self.Dispatcher.ProcessNonBlocking(data) + if not hasattr(self, 'Dispatcher') or \ + self.Dispatcher.Stream._document_attrs is None: + self._xmpp_connect_machine( + mode='FAILURE', + data='Error on stream open') + if self.incoming_stream_version() == '1.0': + if not self.Dispatcher.Stream.features: + on_next_receive('RECEIVE_STREAM_FEATURES') + else: + self._xmpp_connect_machine(mode='STREAM_STARTED') + + else: + self._xmpp_connect_machine(mode='STREAM_STARTED') + + elif mode == 'RECEIVE_STREAM_FEATURES': + if data: + # sometimes are received together with document + # attributes and sometimes on next receive... + self.Dispatcher.ProcessNonBlocking(data) + if not self.Dispatcher.Stream.features: + self._xmpp_connect_machine( + mode='FAILURE', + data='Missing in 1.0 stream') + else: + self._xmpp_connect_machine(mode='STREAM_STARTED') + + elif mode == 'STREAM_STARTED': + self._on_stream_start() + + def _on_stream_start(self): + '''Called when stream is opened. To be overriden in derived classes.''' + + def _on_connect_failure(self, retry=None, err_message=None): + self.connected = None + if err_message: + self.DEBUG(self.DBG, err_message, 'connecting') if self.socket: self.socket.disconnect() - if self.on_connect_failure: - self.on_connect_failure(retry) + self.on_connect_failure(retry) + + def _on_connect(self): + self.onreceive(None) + self.on_connect(self, self.connected) - def _on_connected(self): - # FIXME: why was this needed? Please note that we're working - # in nonblocking mode, and this handler is actually called - # as soon as connection is initiated, NOT when connection - # succeeds, as the name suggests. - # # connect succeeded, so no need of this callback anymore - # self.on_connect_failure = None - self.connected = 'tcp' - if self._Ssl: - transports_nb.NonBlockingTLS().PlugIn(self, now=1) - if not self.Connection: # ssl error, stream is closed - return - self.connected = 'ssl' - self.onreceive(self._on_receive_document_attrs) - dispatcher_nb.Dispatcher().PlugIn(self) - def _on_receive_document_attrs(self, data): - if data: - self.Dispatcher.ProcessNonBlocking(data) - if not hasattr(self, 'Dispatcher') or \ - self.Dispatcher.Stream._document_attrs is None: - return - self.onreceive(None) - if self.Dispatcher.Stream._document_attrs.has_key('version') and \ - self.Dispatcher.Stream._document_attrs['version'] == '1.0': - self.onreceive(self._on_receive_stream_features) - return - if self.on_stream_start: - self.on_stream_start() - self.on_stream_start = None - return True - - def _on_receive_stream_features(self, data): - if data: - self.Dispatcher.ProcessNonBlocking(data) - if not self.Dispatcher.Stream.features: - return - # pass # If we get version 1.0 stream the features tag MUST BE presented - self.onreceive(None) - if self.on_stream_start: - self.on_stream_start() - self.on_stream_start = None - return True # moved from client.CommonClient: def RegisterDisconnectHandler(self,handler): @@ -200,11 +330,7 @@ class NBCommonClient: override this method or at least unregister it. """ raise IOError('Disconnected from server.') - def event(self,eventName,args={}): - """ Default event handler. To be overriden. """ - print "Event: ",(eventName,args) - - def isConnected(self): + def get_connect_type(self): """ Returns connection state. F.e.: None / 'tls' / 'tcp+non_sasl' . """ return self.connected @@ -212,74 +338,18 @@ class NBCommonClient: ''' get the ip address of the account, from which is made connection to the server , (e.g. me). We will create listening socket on the same ip ''' - # moved from client.CommonClient if hasattr(self, 'Connection'): return self.Connection._sock.getsockname() - -class NonBlockingClient(NBCommonClient): - ''' Example client class, based on CommonClient. ''' - def connect(self,server=None,proxy=None,secure=None,use_srv=True): - ''' Connect to jabber server. If you want to specify different ip/port to connect to you can - pass it as tuple as first parameter. If there is HTTP proxy between you and server - specify it's address and credentials (if needed) in the second argument. - If you want ssl/tls support to be discovered and enable automatically - leave third argument as None. (ssl will be autodetected only if port is 5223 or 443) - If you want to force SSL start (i.e. if port 5223 or 443 is remapped to some non-standard port) then set it to 1. - If you want to disable tls/ssl support completely, set it to 0. - Example: connect(('192.168.5.5',5222),{'host':'proxy.my.net','port':8080,'user':'me','password':'secret'}) - Returns '' or 'tcp' or 'tls', depending on the result.''' - self.__secure = secure - self.Connection = None - NBCommonClient.connect(self, server = server, proxy = proxy, ssl = secure, - on_stream_start = self._on_tcp_stream_start) - return self.connected - - - def _is_connected(self): - self.onreceive(None) - if self.on_connect: - self.on_connect(self, self.connected) - self.on_connect_failure = None - self.on_connect = None - - def _on_tcp_stream_start(self): - if not self.connected or self.__secure is not None and not self.__secure: - self._is_connected() - return True - self.isplugged = True - self.onreceive(None) - transports_nb.NonBlockingTLS().PlugIn(self) - if not self.Connection: # ssl error, stream is closed - return True - if not self.Dispatcher.Stream._document_attrs.has_key('version') or \ - not self.Dispatcher.Stream._document_attrs['version']=='1.0': - self._is_connected() - return - if not self.Dispatcher.Stream.features.getTag('starttls'): - self._is_connected() - return - self.onreceive(self._on_receive_starttls) - def _on_receive_starttls(self, data): - if data: - self.Dispatcher.ProcessNonBlocking(data) - if not self.NonBlockingTLS.starttls: - return - self.onreceive(None) - if not hasattr(self, 'NonBlockingTLS') or self.NonBlockingTLS.starttls != 'success': - self.event('tls_failed') - self._is_connected() - return - self.connected = 'tls' - self.onreceive(None) - self._is_connected() - return True - + def auth(self, user, password, resource = '', sasl = 1, on_auth = None): + + print 'auth called' ''' Authenticate connnection and bind resource. If resource is not provided random one or library name used. ''' self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl self.on_auth = on_auth - self.get_attrs(self._on_doc_attrs) + self._on_doc_attrs() return def _on_old_auth(self, res): @@ -335,6 +405,40 @@ class NonBlockingClient(NBCommonClient): self.on_auth(self, 'sasl') else: self.on_auth(self, None) + + + + +class NonBlockingClient(NBCommonClient): + ''' Example client class, based on CommonClient. ''' + + + def _on_stream_start(self): + ''' + Called after XMPP stream is opened. + In pure XMPP client, TLS negotiation may follow after esabilishing a stream. + ''' + self.onreceive(None) + if self.connected == 'tcp': + if not self.connected or self._secure is not None and not self._secure: + # if we are disconnected or TLS/SSL is not desired, return + self._on_connect() + return + if not self.Dispatcher.Stream.features.getTag('starttls'): + # if server doesn't advertise TLS in init response + self._on_connect() + return + if self.incoming_stream_version() != '1.0': + self._on_connect() + return + # otherwise start TLS + transports_nb.NonBlockingTLS().PlugIn( + self, + on_tls_success=lambda: self._xmpp_connect(socket_type='tls'), + on_tls_failure=self._on_connect_failure) + elif self.connected == 'tls': + self._on_connect() + def initRoster(self): ''' Plug in the roster. ''' @@ -354,87 +458,84 @@ class NonBlockingClient(NBCommonClient): if requestRoster: roster_nb.NonBlockingRoster().PlugIn(self) self.send(dispatcher_nb.Presence(to=jid, typ=typ)) -class Component(NBCommonClient): - ''' Component class. The only difference from CommonClient is ability to perform component authentication. ''' - def __init__(self, server, port=5347, typ=None, debug=['always', 'nodebuilder'], - domains=None, component=0, on_connect = None, on_connect_failure = None): - ''' Init function for Components. - As components use a different auth mechanism which includes the namespace of the component. - Jabberd1.4 and Ejabberd use the default namespace then for all client messages. - Jabberd2 uses jabber:client. - 'server' argument is a server name that you are connecting to (f.e. "localhost"). - 'port' can be specified if 'server' resolves to correct IP. If it is not then you'll need to specify IP - and port while calling "connect()".''' - NBCommonClient.__init__(self, server, port=port, debug=debug) - self.typ = typ - self.component=component - if domains: - self.domains=domains - else: - self.domains=[server] - self.on_connect_component = on_connect - self.on_connect_failure = on_connect_failure - - def connect(self, server=None, proxy=None): - ''' This will connect to the server, and if the features tag is found then set - the namespace to be jabber:client as that is required for jabberd2. - 'server' and 'proxy' arguments have the same meaning as in xmpp.Client.connect() ''' - if self.component: - self.Namespace=auth.NS_COMPONENT_1 - self.Server=server[0] - NBCommonClient.connect(self, server=server, proxy=proxy, - on_connect = self._on_connect, on_connect_failure = self.on_connect_failure) - - def _on_connect(self): - if self.typ=='jabberd2' or not self.typ and self.Dispatcher.Stream.features is not None: - self.defaultNamespace=auth.NS_CLIENT - self.Dispatcher.RegisterNamespace(self.defaultNamespace) - self.Dispatcher.RegisterProtocol('iq',dispatcher.Iq) - self.Dispatcher.RegisterProtocol('message',dispatcher_nb.Message) - self.Dispatcher.RegisterProtocol('presence',dispatcher_nb.Presence) - self.on_connect(self.connected) - def auth(self, name, password, dup=None, sasl=0): - ''' Authenticate component "name" with password "password".''' - self._User, self._Password, self._Resource=name, password,'' - try: - if self.component: - sasl=1 - if sasl: - auth.SASL(name,password).PlugIn(self) - if not sasl or self.SASL.startsasl=='not-supported': - if auth.NonSASL(name,password,'').PlugIn(self): - self.connected+='+old_auth' - return 'old_auth' - return - self.SASL.auth() - self.onreceive(self._on_auth_component) - except: - self.DEBUG(self.DBG,"Failed to authenticate %s" % name,'error') - - def _on_auth_component(self, data): - if data: - self.Dispatcher.ProcessNonBlocking(data) - if self.SASL.startsasl == 'in-process': - return - if self.SASL.startsasl =='success': - if self.component: - self._component = self.component - auth.NBComponentBind().PlugIn(self) - self.onreceive(_on_component_bind) - self.connected += '+sasl' - else: - raise auth.NotAuthorized(self.SASL.startsasl) - - def _on_component_bind(self, data): - if data: - self.Dispatcher.ProcessNonBlocking(data) - if self.NBComponentBind.bound is None: - return - - for domain in self.domains: - self.NBComponentBind.Bind(domain, _on_component_bound) - - def _on_component_bound(self, resp): - self.NBComponentBind.PlugOut() +class BOSHClient(NBCommonClient): + ''' + Client class implementing BOSH. + ''' + def __init__(self, *args, **kw): + '''Preceeds constructor of NBCommonClient and sets some of values that will + be used as attributes in tag''' + self.Namespace = NS_HTTP_BIND + # BOSH parameters should be given via Advanced Configuration Editor + self.bosh_hold = 1 + self.bosh_wait=60 + self.bosh_rid=-1 + self.bosh_httpversion = 'HTTP/1.1' + NBCommonClient.__init__(self, *args, **kw) + + + def connect(self, *args, **kw): + proxy = kw['proxy'] + self.bosh_protocol, self.bosh_host, self.bosh_uri = self.urisplit(proxy['host']) + self.bosh_port = proxy['port'] + NBCommonClient.connect(*args, **kw) + + def _on_stream_start(self): + ''' + Called after XMPP stream is opened. In BOSH TLS is negotiated on tranport layer + so success callback can be invoked. + (authentication is started from auth() method) + ''' + self.onreceive(None) + if self.connected == 'tcp': + self._on_connect() + + + + + + def bosh_raise_event(self, realm, event, data): + # should to extract stanza from body + self.DEBUG(self.DBG,'realm: %s, event: %s, data: %s' % (realm, event, data), + 'BOSH EventHandler') + self._caller._event_dispatcher(realm, event, data) + + + def StreamInit(self): + ''' + Init of BOSH session. Called instead of Dispatcher.StreamInit() + Initial body tag is created and sent. + ''' + #self.Dispatcher.RegisterEventHandler(self.bosh_event_handler) + self.Dispatcher.Stream = simplexml.NodeBuilder() + self.Dispatcher.Stream._dispatch_depth = 2 + self.Dispatcher.Stream.dispatch = self.Dispatcher.dispatch + self.Dispatcher.Stream.stream_header_received = self._check_stream_start + self.Dispatcher.Stream.features = None + + r = random.Random() + r.seed() + # with 50-bit random initial rid, session would have to go up + # to 7881299347898368 messages to raise rid over 2**53 + # (see http://www.xmpp.org/extensions/xep-0124.html#rids) + self.bosh_rid = r.getrandbits(50) + + initial_body_tag = BOSHBody( + attrs={'content': 'text/xml; charset=utf-8', + 'hold': str(self.bosh_hold), + # "to" should be domain, not hostname of machine + 'to': self.Server, + 'wait': str(self.bosh_wait), + 'rid': str(self.bosh_rid), + 'xmpp:version': '1.0', + 'xmlns:xmpp': 'urn:xmpp:xbosh'} + ) + + if locale.getdefaultlocale()[0]: + initial_body_tag.setAttr('xml:lang', + locale.getdefaultlocale()[0].split('_')[0]) + initial_body_tag.setAttr('xmpp:version', '1.0') + initial_body_tag.setAttr('xmlns:xmpp', 'urn:xmpp:xbosh') + self.send(initial_body_tag) diff --git a/src/common/xmpp/debug.py b/src/common/xmpp/debug.py index 60480b66c..80ab6be9a 100644 --- a/src/common/xmpp/debug.py +++ b/src/common/xmpp/debug.py @@ -393,6 +393,7 @@ class Debug: colors={} def Show(self, flag, msg, prefix=''): + msg=str(msg) msg=msg.replace('\r','\\r').replace('\n','\\n').replace('><','>\n <') if not colors_enabled: pass elif self.colors.has_key(prefix): msg=self.colors[prefix]+msg+color_none diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index 515a09d10..4633db28b 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -47,7 +47,7 @@ class Dispatcher(PlugIn): self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \ self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \ self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \ - self.SendAndWaitForResponse, self.send,self.disconnect, \ + self.SendAndWaitForResponse, self.assign_id, self.StreamTerminate, \ self.SendAndCallForResponse, self.getAnID, self.Event] def getAnID(self): @@ -79,6 +79,8 @@ class Dispatcher(PlugIn): def plugin(self, owner): ''' Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally.''' + self.DEBUG('Dispatcher plugin', 'PlugIn') + self._init() self._owner.lastErrNode = None self._owner.lastErr = None @@ -116,6 +118,10 @@ class Dispatcher(PlugIn): locale.getdefaultlocale()[0].split('_')[0]) self._owner.send("%s>" % str(self._metastream)[:-2]) + def StreamTerminate(self): + ''' Send a stream terminator. ''' + self._owner.send('') + def _check_stream_start(self, ns, tag, attrs): if ns<>NS_STREAMS or tag<>'stream': raise ValueError('Incorrect stream start: (%s,%s). Terminating.' % (tag, ns)) @@ -139,7 +145,7 @@ class Dispatcher(PlugIn): return 0 except ExpatError: self.DEBUG('Invalid XML received from server. Forcing disconnect.', 'error') - self._owner.Connection.pollend() + self._owner.Connection.disconnect() return 0 if len(self._pendingExceptions) > 0: _pendingException = self._pendingExceptions.pop() @@ -244,7 +250,7 @@ class Dispatcher(PlugIn): def returnStanzaHandler(self,conn,stanza): ''' Return stanza back to the sender with error set. ''' if stanza.getType() in ['get','set']: - conn.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED)) + conn._owner.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED)) def streamErrorHandler(self,conn,error): name,text='error',error.getData() @@ -387,7 +393,7 @@ class Dispatcher(PlugIn): ''' Put stanza on the wire and wait for recipient's response to it. ''' if timeout is None: timeout = DEFAULT_TIMEOUT_SECONDS - self._witid = self.send(stanza) + self._witid = self._owner.send(stanza) if func: self.on_responses[self._witid] = (func, args) if timeout: @@ -401,11 +407,10 @@ class Dispatcher(PlugIn): Additional callback arguments can be specified in args. ''' self.SendAndWaitForResponse(stanza, 0, func, args) - def send(self, stanza, is_message = False, now = False): - ''' Serialise stanza and put it on the wire. Assign an unique ID to it before send. - Returns assigned ID.''' + def assign_id(self, stanza): + ''' Assign an unique ID to stanza and return assigned ID.''' if type(stanza) in [type(''), type(u'')]: - return self._owner.Connection.send(stanza, now = now) + return (None, stanza) if not isinstance(stanza, Protocol): _ID=None elif not stanza.getID(): @@ -417,23 +422,7 @@ class Dispatcher(PlugIn): _ID=stanza.getID() if self._owner._registered_name and not stanza.getAttr('from'): stanza.setAttr('from', self._owner._registered_name) - if self._owner._component and stanza.getName() != 'bind': - to=self._owner.Server - if stanza.getTo() and stanza.getTo().getDomain(): - to=stanza.getTo().getDomain() - frm=stanza.getFrom() - if frm.getDomain(): - frm=frm.getDomain() - route=Protocol('route', to=to, frm=frm, payload=[stanza]) - stanza=route stanza.setNamespace(self._owner.Namespace) stanza.setParent(self._metastream) - if is_message: - self._owner.Connection.send(stanza, True, now = now) - else: - self._owner.Connection.send(stanza, now = now) - return _ID + return (_ID, stanza) - def disconnect(self): - ''' Send a stream terminator. ''' - self._owner.Connection.send('') diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py index 955af1be1..e9c4f0fde 100644 --- a/src/common/xmpp/idlequeue.py +++ b/src/common/xmpp/idlequeue.py @@ -33,7 +33,7 @@ class IdleObject: ''' called on new write event (connect in sockets is a pollout) ''' pass - def read_timeout(self, fd): + def read_timeout(self): ''' called when timeout has happend ''' pass diff --git a/src/common/xmpp/protocol.py b/src/common/xmpp/protocol.py index a9140c8f9..797a8e9af 100644 --- a/src/common/xmpp/protocol.py +++ b/src/common/xmpp/protocol.py @@ -171,27 +171,27 @@ temporary-auth-failure -- -- -- The authentication failed because of a tempora ERRORS,_errorcodes={},{} for ns,errname,errpool in [(NS_XMPP_STREAMS,'STREAM',xmpp_stream_error_conditions), - (NS_STANZAS ,'ERR' ,xmpp_stanza_error_conditions), - (NS_SASL ,'SASL' ,sasl_error_conditions)]: - for err in errpool.split('\n')[1:]: - cond,code,typ,text=err.split(' -- ') - name=errname+'_'+cond.upper().replace('-','_') - locals()[name]=ns+' '+cond - ERRORS[ns+' '+cond]=[code,typ,text] - if code: _errorcodes[code]=cond + (NS_STANZAS ,'ERR' ,xmpp_stanza_error_conditions), + (NS_SASL ,'SASL' ,sasl_error_conditions)]: + for err in errpool.split('\n')[1:]: + cond,code,typ,text=err.split(' -- ') + name=errname+'_'+cond.upper().replace('-','_') + locals()[name]=ns+' '+cond + ERRORS[ns+' '+cond]=[code,typ,text] + if code: _errorcodes[code]=cond del ns,errname,errpool,err,cond,code,typ,text def isResultNode(node): - """ Returns true if the node is a positive reply. """ - return node and node.getType()=='result' + """ Returns true if the node is a positive reply. """ + return node and node.getType()=='result' def isErrorNode(node): - """ Returns true if the node is a negative reply. """ - return node and node.getType()=='error' + """ Returns true if the node is a negative reply. """ + return node and node.getType()=='error' class NodeProcessed(Exception): - """ Exception that should be raised by handler when the handling should be stopped. """ + """ Exception that should be raised by handler when the handling should be stopped. """ class StreamError(Exception): - """ Base exception class for stream errors.""" + """ Base exception class for stream errors.""" class BadFormat(StreamError): pass class BadNamespacePrefix(StreamError): pass class Conflict(StreamError): pass @@ -218,559 +218,570 @@ class UnsupportedVersion(StreamError): pass class XMLNotWellFormed(StreamError): pass stream_exceptions = {'bad-format': BadFormat, - 'bad-namespace-prefix': BadNamespacePrefix, - 'conflict': Conflict, - 'connection-timeout': ConnectionTimeout, - 'host-gone': HostGone, - 'host-unknown': HostUnknown, - 'improper-addressing': ImproperAddressing, - 'internal-server-error': InternalServerError, - 'invalid-from': InvalidFrom, - 'invalid-id': InvalidID, - 'invalid-namespace': InvalidNamespace, - 'invalid-xml': InvalidXML, - 'not-authorized': NotAuthorized, - 'policy-violation': PolicyViolation, - 'remote-connection-failed': RemoteConnectionFailed, - 'resource-constraint': ResourceConstraint, - 'restricted-xml': RestrictedXML, - 'see-other-host': SeeOtherHost, - 'system-shutdown': SystemShutdown, - 'undefined-condition': UndefinedCondition, - 'unsupported-encoding': UnsupportedEncoding, - 'unsupported-stanza-type': UnsupportedStanzaType, - 'unsupported-version': UnsupportedVersion, - 'xml-not-well-formed': XMLNotWellFormed} + 'bad-namespace-prefix': BadNamespacePrefix, + 'conflict': Conflict, + 'connection-timeout': ConnectionTimeout, + 'host-gone': HostGone, + 'host-unknown': HostUnknown, + 'improper-addressing': ImproperAddressing, + 'internal-server-error': InternalServerError, + 'invalid-from': InvalidFrom, + 'invalid-id': InvalidID, + 'invalid-namespace': InvalidNamespace, + 'invalid-xml': InvalidXML, + 'not-authorized': NotAuthorized, + 'policy-violation': PolicyViolation, + 'remote-connection-failed': RemoteConnectionFailed, + 'resource-constraint': ResourceConstraint, + 'restricted-xml': RestrictedXML, + 'see-other-host': SeeOtherHost, + 'system-shutdown': SystemShutdown, + 'undefined-condition': UndefinedCondition, + 'unsupported-encoding': UnsupportedEncoding, + 'unsupported-stanza-type': UnsupportedStanzaType, + 'unsupported-version': UnsupportedVersion, + 'xml-not-well-formed': XMLNotWellFormed} class JID: - """ JID object. JID can be built from string, modified, compared, serialised into string. """ - def __init__(self, jid=None, node='', domain='', resource=''): - """ Constructor. JID can be specified as string (jid argument) or as separate parts. - Examples: - JID('node@domain/resource') - JID(node='node',domain='domain.org') - """ - if not jid and not domain: raise ValueError('JID must contain at least domain name') - elif type(jid)==type(self): self.node,self.domain,self.resource=jid.node,jid.domain,jid.resource - elif domain: self.node,self.domain,self.resource=node,domain,resource - else: - if jid.find('@')+1: self.node,jid=jid.split('@',1) - else: self.node='' - if jid.find('/')+1: self.domain,self.resource=jid.split('/',1) - else: self.domain,self.resource=jid,'' - def getNode(self): - """ Return the node part of the JID """ - return self.node - def setNode(self,node): - """ Set the node part of the JID to new value. Specify None to remove the node part.""" - self.node=node.lower() - def getDomain(self): - """ Return the domain part of the JID """ - return self.domain - def setDomain(self,domain): - """ Set the domain part of the JID to new value.""" - self.domain=domain.lower() - def getResource(self): - """ Return the resource part of the JID """ - return self.resource - def setResource(self,resource): - """ Set the resource part of the JID to new value. Specify None to remove the resource part.""" - self.resource=resource - def getStripped(self): - """ Return the bare representation of JID. I.e. string value w/o resource. """ - return self.__str__(0) - def __eq__(self, other): - """ Compare the JID to another instance or to string for equality. """ - try: other=JID(other) - except ValueError: return 0 - return self.resource==other.resource and self.__str__(0) == other.__str__(0) - def __ne__(self, other): - """ Compare the JID to another instance or to string for non-equality. """ - return not self.__eq__(other) - def bareMatch(self, other): - """ Compare the node and domain parts of the JID's for equality. """ - return self.__str__(0) == JID(other).__str__(0) - def __str__(self,wresource=1): - """ Serialise JID into string. """ - if self.node: jid=self.node+'@'+self.domain - else: jid=self.domain - if wresource and self.resource: return jid+'/'+self.resource - return jid - def __hash__(self): - """ Produce hash of the JID, Allows to use JID objects as keys of the dictionary. """ - return hash(self.__str__()) + """ JID object. JID can be built from string, modified, compared, serialised into string. """ + def __init__(self, jid=None, node='', domain='', resource=''): + """ Constructor. JID can be specified as string (jid argument) or as separate parts. + Examples: + JID('node@domain/resource') + JID(node='node',domain='domain.org') + """ + if not jid and not domain: raise ValueError('JID must contain at least domain name') + elif type(jid)==type(self): self.node,self.domain,self.resource=jid.node,jid.domain,jid.resource + elif domain: self.node,self.domain,self.resource=node,domain,resource + else: + if jid.find('@')+1: self.node,jid=jid.split('@',1) + else: self.node='' + if jid.find('/')+1: self.domain,self.resource=jid.split('/',1) + else: self.domain,self.resource=jid,'' + def getNode(self): + """ Return the node part of the JID """ + return self.node + def setNode(self,node): + """ Set the node part of the JID to new value. Specify None to remove the node part.""" + self.node=node.lower() + def getDomain(self): + """ Return the domain part of the JID """ + return self.domain + def setDomain(self,domain): + """ Set the domain part of the JID to new value.""" + self.domain=domain.lower() + def getResource(self): + """ Return the resource part of the JID """ + return self.resource + def setResource(self,resource): + """ Set the resource part of the JID to new value. Specify None to remove the resource part.""" + self.resource=resource + def getStripped(self): + """ Return the bare representation of JID. I.e. string value w/o resource. """ + return self.__str__(0) + def __eq__(self, other): + """ Compare the JID to another instance or to string for equality. """ + try: other=JID(other) + except ValueError: return 0 + return self.resource==other.resource and self.__str__(0) == other.__str__(0) + def __ne__(self, other): + """ Compare the JID to another instance or to string for non-equality. """ + return not self.__eq__(other) + def bareMatch(self, other): + """ Compare the node and domain parts of the JID's for equality. """ + return self.__str__(0) == JID(other).__str__(0) + def __str__(self,wresource=1): + """ Serialise JID into string. """ + if self.node: jid=self.node+'@'+self.domain + else: jid=self.domain + if wresource and self.resource: return jid+'/'+self.resource + return jid + def __hash__(self): + """ Produce hash of the JID, Allows to use JID objects as keys of the dictionary. """ + return hash(self.__str__()) + + class Protocol(Node): - """ A "stanza" object class. Contains methods that are common for presences, iqs and messages. """ - def __init__(self, name=None, to=None, typ=None, frm=None, attrs={}, payload=[], timestamp=None, xmlns=None, node=None): - """ Constructor, name is the name of the stanza i.e. 'message' or 'presence' or 'iq'. - to is the value of 'to' attribure, 'typ' - 'type' attribute - frn - from attribure, attrs - other attributes mapping, payload - same meaning as for simplexml payload definition - timestamp - the time value that needs to be stamped over stanza - xmlns - namespace of top stanza node - node - parsed or unparsed stana to be taken as prototype. - """ - if not attrs: attrs={} - if to: attrs['to']=to - if frm: attrs['from']=frm - if typ: attrs['type']=typ - Node.__init__(self, tag=name, attrs=attrs, payload=payload, node=node) - if not node and xmlns: self.setNamespace(xmlns) - if self['to']: self.setTo(self['to']) - if self['from']: self.setFrom(self['from']) - if node and type(self)==type(node) and self.__class__==node.__class__ and self.attrs.has_key('id'): del self.attrs['id'] - self.timestamp=None - for x in self.getTags('x',namespace=NS_DELAY): - try: - if x.getAttr('stamp')'text': return tag.getName() - return errtag.getData() - def getErrorMsg(self): - """ Return the textual description of the error (if present) or the error condition """ - errtag=self.getTag('error') - if errtag: - for tag in errtag.getChildren(): - if tag.getName()=='text': return tag.getData() - return self.getError() - def getErrorCode(self): - """ Return the error code. Obsolete. """ - return self.getTagAttr('error','code') - def setError(self,error,code=None): - """ Set the error code. Obsolete. Use error-conditions instead. """ - if code: - if str(code) in _errorcodes.keys(): error=ErrorNode(_errorcodes[str(code)],text=error) - else: error=ErrorNode(ERR_UNDEFINED_CONDITION,code=code,typ='cancel',text=error) - elif type(error) in [type(''),type(u'')]: error=ErrorNode(error) - self.setType('error') - self.addChild(node=error) - def setTimestamp(self,val=None): - """Set the timestamp. timestamp should be the yyyymmddThhmmss string.""" - if not val: val=time.strftime('%Y%m%dT%H:%M:%S', time.gmtime()) - self.timestamp=val - self.setTag('x',{'stamp':self.timestamp},namespace=NS_DELAY) - def getProperties(self): - """ Return the list of namespaces to which belongs the direct childs of element""" - props=[] - for child in self.getChildren(): - prop=child.getNamespace() - if prop not in props: props.append(prop) - return props - def __setitem__(self,item,val): - """ Set the item 'item' to the value 'val'.""" - if item in ['to','from']: val=JID(val) - return self.setAttr(item,val) + """ A "stanza" object class. Contains methods that are common for presences, iqs and messages. """ + def __init__(self, name=None, to=None, typ=None, frm=None, attrs={}, payload=[], timestamp=None, xmlns=None, node=None): + """ Constructor, name is the name of the stanza i.e. 'message' or 'presence' or 'iq'. + to is the value of 'to' attribure, 'typ' - 'type' attribute + frn - from attribure, attrs - other attributes mapping, + payload - same meaning as for simplexml payload definition + timestamp - the time value that needs to be stamped over stanza + xmlns - namespace of top stanza node + node - parsed or unparsed stana to be taken as prototype. + """ + if not attrs: attrs={} + if to: attrs['to']=to + if frm: attrs['from']=frm + if typ: attrs['type']=typ + Node.__init__(self, tag=name, attrs=attrs, payload=payload, node=node) + if not node and xmlns: self.setNamespace(xmlns) + if self['to']: self.setTo(self['to']) + if self['from']: self.setFrom(self['from']) + if node and type(self)==type(node) and self.__class__==node.__class__ and self.attrs.has_key('id'): del self.attrs['id'] + self.timestamp=None + for x in self.getTags('x',namespace=NS_DELAY): + try: + if x.getAttr('stamp')'text': return tag.getName() + return errtag.getData() + def getErrorMsg(self): + """ Return the textual description of the error (if present) or the error condition """ + errtag=self.getTag('error') + if errtag: + for tag in errtag.getChildren(): + if tag.getName()=='text': return tag.getData() + return self.getError() + def getErrorCode(self): + """ Return the error code. Obsolete. """ + return self.getTagAttr('error','code') + def setError(self,error,code=None): + """ Set the error code. Obsolete. Use error-conditions instead. """ + if code: + if str(code) in _errorcodes.keys(): error=ErrorNode(_errorcodes[str(code)],text=error) + else: error=ErrorNode(ERR_UNDEFINED_CONDITION,code=code,typ='cancel',text=error) + elif type(error) in [type(''),type(u'')]: error=ErrorNode(error) + self.setType('error') + self.addChild(node=error) + def setTimestamp(self,val=None): + """Set the timestamp. timestamp should be the yyyymmddThhmmss string.""" + if not val: val=time.strftime('%Y%m%dT%H:%M:%S', time.gmtime()) + self.timestamp=val + self.setTag('x',{'stamp':self.timestamp},namespace=NS_DELAY) + def getProperties(self): + """ Return the list of namespaces to which belongs the direct childs of element""" + props=[] + for child in self.getChildren(): + prop=child.getNamespace() + if prop not in props: props.append(prop) + return props + def __setitem__(self,item,val): + """ Set the item 'item' to the value 'val'.""" + if item in ['to','from']: val=JID(val) + return self.setAttr(item,val) + +class BOSHBody(Protocol): + ''' + tag that wraps usual XMPP stanzas in BOSH + ''' + def __init__(self, to=None, frm=None, attrs={}, payload=[], node=None): + Protocol.__init__(self, name='body', to=to, frm=frm, attrs=attrs, + payload=payload, xmlns=NS_HTTP_BIND, node=node) class Message(Protocol): - """ XMPP Message stanza - "push" mechanism.""" - def __init__(self, to=None, body=None, xhtml=None, typ=None, subject=None, attrs={}, frm=None, payload=[], timestamp=None, xmlns=NS_CLIENT, node=None): - """ Create message object. You can specify recipient, text of message, type of message - any additional attributes, sender of the message, any additional payload (f.e. jabber:x:delay element) and namespace in one go. - Alternatively you can pass in the other XML object as the 'node' parameted to replicate it as message. """ - Protocol.__init__(self, 'message', to=to, typ=typ, attrs=attrs, frm=frm, payload=payload, timestamp=timestamp, xmlns=xmlns, node=node) - if body: self.setBody(body) - if xhtml: self.setXHTML(xhtml) - if subject is not None: self.setSubject(subject) - def getBody(self): - """ Returns text of the message. """ - return self.getTagData('body') - def getXHTML(self, xmllang=None): - """ Returns serialized xhtml-im element text of the message. + """ XMPP Message stanza - "push" mechanism.""" + def __init__(self, to=None, body=None, xhtml=None, typ=None, subject=None, attrs={}, frm=None, payload=[], timestamp=None, xmlns=NS_CLIENT, node=None): + """ Create message object. You can specify recipient, text of message, type of message + any additional attributes, sender of the message, any additional payload (f.e. jabber:x:delay element) and namespace in one go. + Alternatively you can pass in the other XML object as the 'node' parameted to replicate it as message. """ + Protocol.__init__(self, 'message', to=to, typ=typ, attrs=attrs, frm=frm, payload=payload, timestamp=timestamp, xmlns=xmlns, node=node) + if body: self.setBody(body) + if xhtml: self.setXHTML(xhtml) + if subject is not None: self.setSubject(subject) + def getBody(self): + """ Returns text of the message. """ + return self.getTagData('body') + def getXHTML(self, xmllang=None): + """ Returns serialized xhtml-im element text of the message. - TODO: Returning a DOM could make rendering faster.""" - xhtml = self.getTag('html') - if xhtml: - if xmllang: - body = xhtml.getTag('body', attrs={'xml:lang':xmllang}) - else: - body = xhtml.getTag('body') - return str(body) - return None - def getSubject(self): - """ Returns subject of the message. """ - return self.getTagData('subject') - def getThread(self): - """ Returns thread of the message. """ - return self.getTagData('thread') - def setBody(self,val): - """ Sets the text of the message. """ - self.setTagData('body',val) + TODO: Returning a DOM could make rendering faster.""" + xhtml = self.getTag('html') + if xhtml: + if xmllang: + body = xhtml.getTag('body', attrs={'xml:lang':xmllang}) + else: + body = xhtml.getTag('body') + return str(body) + return None + def getSubject(self): + """ Returns subject of the message. """ + return self.getTagData('subject') + def getThread(self): + """ Returns thread of the message. """ + return self.getTagData('thread') + def setBody(self,val): + """ Sets the text of the message. """ + self.setTagData('body',val) - def setXHTML(self,val,xmllang=None): - """ Sets the xhtml text of the message (XEP-0071). - The parameter is the "inner html" to the body.""" - try: - if xmllang: - dom = NodeBuilder('' + val + '').getDom() - else: - dom = NodeBuilder(''+val+'',0).getDom() - if self.getTag('html'): - self.getTag('html').addChild(node=dom) - else: - self.setTag('html',namespace=NS_XHTML_IM).addChild(node=dom) - except Exception, e: - print "Error", e - pass #FIXME: log. we could not set xhtml (parse error, whatever) - def setSubject(self,val): - """ Sets the subject of the message. """ - self.setTagData('subject',val) - def setThread(self,val): - """ Sets the thread of the message. """ - self.setTagData('thread',val) - def buildReply(self,text=None): - """ Builds and returns another message object with specified text. - The to, from and thread properties of new message are pre-set as reply to this message. """ - m=Message(to=self.getFrom(),frm=self.getTo(),body=text,node=self) - th=self.getThread() - if th: m.setThread(th) - return m - def getStatusCode(self): - """Returns the status code of the message (for groupchat config - change)""" - attrs = [] - for xtag in self.getTags('x'): - for child in xtag.getTags('status'): - attrs.append(child.getAttr('code')) - return attrs + def setXHTML(self,val,xmllang=None): + """ Sets the xhtml text of the message (XEP-0071). + The parameter is the "inner html" to the body.""" + try: + if xmllang: + dom = NodeBuilder('' + val + '').getDom() + else: + dom = NodeBuilder(''+val+'',0).getDom() + if self.getTag('html'): + self.getTag('html').addChild(node=dom) + else: + self.setTag('html',namespace=NS_XHTML_IM).addChild(node=dom) + except Exception, e: + print "Error", e + pass #FIXME: log. we could not set xhtml (parse error, whatever) + def setSubject(self,val): + """ Sets the subject of the message. """ + self.setTagData('subject',val) + def setThread(self,val): + """ Sets the thread of the message. """ + self.setTagData('thread',val) + def buildReply(self,text=None): + """ Builds and returns another message object with specified text. + The to, from and thread properties of new message are pre-set as reply to this message. """ + m=Message(to=self.getFrom(),frm=self.getTo(),body=text,node=self) + th=self.getThread() + if th: m.setThread(th) + return m + def getStatusCode(self): + """Returns the status code of the message (for groupchat config + change)""" + attrs = [] + for xtag in self.getTags('x'): + for child in xtag.getTags('status'): + attrs.append(child.getAttr('code')) + return attrs class Presence(Protocol): - """ XMPP Presence object.""" - def __init__(self, to=None, typ=None, priority=None, show=None, status=None, attrs={}, frm=None, timestamp=None, payload=[], xmlns=NS_CLIENT, node=None): - """ Create presence object. You can specify recipient, type of message, priority, show and status values - any additional attributes, sender of the presence, timestamp, any additional payload (f.e. jabber:x:delay element) and namespace in one go. - Alternatively you can pass in the other XML object as the 'node' parameted to replicate it as presence. """ - Protocol.__init__(self, 'presence', to=to, typ=typ, attrs=attrs, frm=frm, payload=payload, timestamp=timestamp, xmlns=xmlns, node=node) - if priority: self.setPriority(priority) - if show: self.setShow(show) - if status: self.setStatus(status) - def getPriority(self): - """ Returns the priority of the message. """ - return self.getTagData('priority') - def getShow(self): - """ Returns the show value of the message. """ - return self.getTagData('show') - def getStatus(self): - """ Returns the status string of the message. """ - return self.getTagData('status') - def setPriority(self,val): - """ Sets the priority of the message. """ - self.setTagData('priority',val) - def setShow(self,val): - """ Sets the show value of the message. """ - self.setTagData('show',val) - def setStatus(self,val): - """ Sets the status string of the message. """ - self.setTagData('status',val) + """ XMPP Presence object.""" + def __init__(self, to=None, typ=None, priority=None, show=None, status=None, attrs={}, frm=None, timestamp=None, payload=[], xmlns=NS_CLIENT, node=None): + """ Create presence object. You can specify recipient, type of message, priority, show and status values + any additional attributes, sender of the presence, timestamp, any additional payload (f.e. jabber:x:delay element) and namespace in one go. + Alternatively you can pass in the other XML object as the 'node' parameted to replicate it as presence. """ + Protocol.__init__(self, 'presence', to=to, typ=typ, attrs=attrs, frm=frm, payload=payload, timestamp=timestamp, xmlns=xmlns, node=node) + if priority: self.setPriority(priority) + if show: self.setShow(show) + if status: self.setStatus(status) + def getPriority(self): + """ Returns the priority of the message. """ + return self.getTagData('priority') + def getShow(self): + """ Returns the show value of the message. """ + return self.getTagData('show') + def getStatus(self): + """ Returns the status string of the message. """ + return self.getTagData('status') + def setPriority(self,val): + """ Sets the priority of the message. """ + self.setTagData('priority',val) + def setShow(self,val): + """ Sets the show value of the message. """ + self.setTagData('show',val) + def setStatus(self,val): + """ Sets the status string of the message. """ + self.setTagData('status',val) - def _muc_getItemAttr(self,tag,attr): - for xtag in self.getTags('x'): - for child in xtag.getTags(tag): - return child.getAttr(attr) - def _muc_getSubTagDataAttr(self,tag,attr): - for xtag in self.getTags('x'): - for child in xtag.getTags('item'): - for cchild in child.getTags(tag): - return cchild.getData(),cchild.getAttr(attr) - return None,None - def getRole(self): - """Returns the presence role (for groupchat)""" - return self._muc_getItemAttr('item','role') - def getAffiliation(self): - """Returns the presence affiliation (for groupchat)""" - return self._muc_getItemAttr('item','affiliation') - def getNewNick(self): - """Returns the status code of the presence (for groupchat)""" - return self._muc_getItemAttr('item','nick') - def getJid(self): - """Returns the presence jid (for groupchat)""" - return self._muc_getItemAttr('item','jid') - def getReason(self): - """Returns the reason of the presence (for groupchat)""" - return self._muc_getSubTagDataAttr('reason','')[0] - def getActor(self): - """Returns the reason of the presence (for groupchat)""" - return self._muc_getSubTagDataAttr('actor','jid')[1] - def getStatusCode(self): - """Returns the status code of the presence (for groupchat)""" - attrs = [] - for xtag in self.getTags('x'): - for child in xtag.getTags('status'): - attrs.append(child.getAttr('code')) - return attrs + def _muc_getItemAttr(self,tag,attr): + for xtag in self.getTags('x'): + for child in xtag.getTags(tag): + return child.getAttr(attr) + def _muc_getSubTagDataAttr(self,tag,attr): + for xtag in self.getTags('x'): + for child in xtag.getTags('item'): + for cchild in child.getTags(tag): + return cchild.getData(),cchild.getAttr(attr) + return None,None + def getRole(self): + """Returns the presence role (for groupchat)""" + return self._muc_getItemAttr('item','role') + def getAffiliation(self): + """Returns the presence affiliation (for groupchat)""" + return self._muc_getItemAttr('item','affiliation') + def getNewNick(self): + """Returns the status code of the presence (for groupchat)""" + return self._muc_getItemAttr('item','nick') + def getJid(self): + """Returns the presence jid (for groupchat)""" + return self._muc_getItemAttr('item','jid') + def getReason(self): + """Returns the reason of the presence (for groupchat)""" + return self._muc_getSubTagDataAttr('reason','')[0] + def getActor(self): + """Returns the reason of the presence (for groupchat)""" + return self._muc_getSubTagDataAttr('actor','jid')[1] + def getStatusCode(self): + """Returns the status code of the presence (for groupchat)""" + attrs = [] + for xtag in self.getTags('x'): + for child in xtag.getTags('status'): + attrs.append(child.getAttr('code')) + return attrs class Iq(Protocol): - """ XMPP Iq object - get/set dialog mechanism. """ - def __init__(self, typ=None, queryNS=None, attrs={}, to=None, frm=None, payload=[], xmlns=NS_CLIENT, node=None): - """ Create Iq object. You can specify type, query namespace - any additional attributes, recipient of the iq, sender of the iq, any additional payload (f.e. jabber:x:data node) and namespace in one go. - Alternatively you can pass in the other XML object as the 'node' parameted to replicate it as an iq. """ - Protocol.__init__(self, 'iq', to=to, typ=typ, attrs=attrs, frm=frm, xmlns=xmlns, node=node) - if payload: self.setQueryPayload(payload) - if queryNS: self.setQueryNS(queryNS) - def getQueryNS(self): - """ Return the namespace of the 'query' child element.""" - tag=self.getTag('query') - if tag: return tag.getNamespace() - def getQuerynode(self): - """ Return the 'node' attribute value of the 'query' child element.""" - return self.getTagAttr('query','node') - def getQueryPayload(self): - """ Return the 'query' child element payload.""" - tag=self.getTag('query') - if tag: return tag.getPayload() - def getQueryChildren(self): - """ Return the 'query' child element child nodes.""" - tag=self.getTag('query') - if tag: return tag.getChildren() - def setQueryNS(self,namespace): - """ Set the namespace of the 'query' child element.""" - self.setTag('query').setNamespace(namespace) - def setQueryPayload(self,payload): - """ Set the 'query' child element payload.""" - self.setTag('query').setPayload(payload) - def setQuerynode(self,node): - """ Set the 'node' attribute value of the 'query' child element.""" - self.setTagAttr('query','node',node) - def buildReply(self,typ): - """ Builds and returns another Iq object of specified type. - The to, from and query child node of new Iq are pre-set as reply to this Iq. """ - iq=Iq(typ,to=self.getFrom(),frm=self.getTo(),attrs={'id':self.getID()}) - if self.getTag('query'): iq.setQueryNS(self.getQueryNS()) - return iq + """ XMPP Iq object - get/set dialog mechanism. """ + def __init__(self, typ=None, queryNS=None, attrs={}, to=None, frm=None, payload=[], xmlns=NS_CLIENT, node=None): + """ Create Iq object. You can specify type, query namespace + any additional attributes, recipient of the iq, sender of the iq, any additional payload (f.e. jabber:x:data node) and namespace in one go. + Alternatively you can pass in the other XML object as the 'node' parameted to replicate it as an iq. """ + Protocol.__init__(self, 'iq', to=to, typ=typ, attrs=attrs, frm=frm, xmlns=xmlns, node=node) + if payload: self.setQueryPayload(payload) + if queryNS: self.setQueryNS(queryNS) + def getQueryNS(self): + """ Return the namespace of the 'query' child element.""" + tag=self.getTag('query') + if tag: return tag.getNamespace() + def getQuerynode(self): + """ Return the 'node' attribute value of the 'query' child element.""" + return self.getTagAttr('query','node') + def getQueryPayload(self): + """ Return the 'query' child element payload.""" + tag=self.getTag('query') + if tag: return tag.getPayload() + def getQueryChildren(self): + """ Return the 'query' child element child nodes.""" + tag=self.getTag('query') + if tag: return tag.getChildren() + def setQueryNS(self,namespace): + """ Set the namespace of the 'query' child element.""" + self.setTag('query').setNamespace(namespace) + def setQueryPayload(self,payload): + """ Set the 'query' child element payload.""" + self.setTag('query').setPayload(payload) + def setQuerynode(self,node): + """ Set the 'node' attribute value of the 'query' child element.""" + self.setTagAttr('query','node',node) + def buildReply(self,typ): + """ Builds and returns another Iq object of specified type. + The to, from and query child node of new Iq are pre-set as reply to this Iq. """ + iq=Iq(typ,to=self.getFrom(),frm=self.getTo(),attrs={'id':self.getID()}) + if self.getTag('query'): iq.setQueryNS(self.getQueryNS()) + return iq class ErrorNode(Node): - """ XMPP-style error element. - In the case of stanza error should be attached to XMPP stanza. - In the case of stream-level errors should be used separately. """ - def __init__(self,name,code=None,typ=None,text=None): - """ Create new error node object. - Mandatory parameter: name - name of error condition. - Optional parameters: code, typ, text. Used for backwards compartibility with older jabber protocol.""" - if ERRORS.has_key(name): - cod,type,txt=ERRORS[name] - ns=name.split()[0] - else: cod,ns,type,txt='500',NS_STANZAS,'cancel','' - if typ: type=typ - if code: cod=code - if text: txt=text - Node.__init__(self,'error',{},[Node(name)]) - if type: self.setAttr('type',type) - if not cod: self.setName('stream:error') - if txt: self.addChild(node=Node(ns+' text',{},[txt])) - if cod: self.setAttr('code',cod) + """ XMPP-style error element. + In the case of stanza error should be attached to XMPP stanza. + In the case of stream-level errors should be used separately. """ + def __init__(self,name,code=None,typ=None,text=None): + """ Create new error node object. + Mandatory parameter: name - name of error condition. + Optional parameters: code, typ, text. Used for backwards compartibility with older jabber protocol.""" + if ERRORS.has_key(name): + cod,type,txt=ERRORS[name] + ns=name.split()[0] + else: cod,ns,type,txt='500',NS_STANZAS,'cancel','' + if typ: type=typ + if code: cod=code + if text: txt=text + Node.__init__(self,'error',{},[Node(name)]) + if type: self.setAttr('type',type) + if not cod: self.setName('stream:error') + if txt: self.addChild(node=Node(ns+' text',{},[txt])) + if cod: self.setAttr('code',cod) class Error(Protocol): - """ Used to quickly transform received stanza into error reply.""" - def __init__(self,node,error,reply=1): - """ Create error reply basing on the received 'node' stanza and the 'error' error condition. - If the 'node' is not the received stanza but locally created ('to' and 'from' fields needs not swapping) - specify the 'reply' argument as false.""" - if reply: Protocol.__init__(self,to=node.getFrom(),frm=node.getTo(),node=node) - else: Protocol.__init__(self,node=node) - self.setError(error) - if node.getType()=='error': self.__str__=self.__dupstr__ - def __dupstr__(self,dup1=None,dup2=None): - """ Dummy function used as preventor of creating error node in reply to error node. - I.e. you will not be able to serialise "double" error into string. - """ - return '' + """ Used to quickly transform received stanza into error reply.""" + def __init__(self,node,error,reply=1): + """ Create error reply basing on the received 'node' stanza and the 'error' error condition. + If the 'node' is not the received stanza but locally created ('to' and 'from' fields needs not swapping) + specify the 'reply' argument as false.""" + if reply: Protocol.__init__(self,to=node.getFrom(),frm=node.getTo(),node=node) + else: Protocol.__init__(self,node=node) + self.setError(error) + if node.getType()=='error': self.__str__=self.__dupstr__ + def __dupstr__(self,dup1=None,dup2=None): + """ Dummy function used as preventor of creating error node in reply to error node. + I.e. you will not be able to serialise "double" error into string. + """ + return '' class DataField(Node): - """ This class is used in the DataForm class to describe the single data item. - If you are working with jabber:x:data (XEP-0004, XEP-0068, XEP-0122) - then you will need to work with instances of this class. """ - def __init__(self,name=None,value=None,typ=None,required=0,desc=None,options=[],node=None): - """ Create new data field of specified name,value and type. - Also 'required','desc' and 'options' fields can be set. - Alternatively other XML object can be passed in as the 'node' parameted to replicate it as a new datafiled. - """ - Node.__init__(self,'field',node=node) - if name: self.setVar(name) - if type(value) in [list,tuple]: self.setValues(value) - elif value: self.setValue(value) - if typ: self.setType(typ) - elif not typ and not node: self.setType('text-single') - if required: self.setRequired(required) - if desc: self.setDesc(desc) - if options: self.setOptions(options) - def setRequired(self,req=1): - """ Change the state of the 'required' flag. """ - if req: self.setTag('required') - else: - try: self.delChild('required') - except ValueError: return - def isRequired(self): - """ Returns in this field a required one. """ - return self.getTag('required') - def setDesc(self,desc): - """ Set the description of this field. """ - self.setTagData('desc',desc) - def getDesc(self): - """ Return the description of this field. """ - return self.getTagData('desc') - def setValue(self,val): - """ Set the value of this field. """ - self.setTagData('value',val) - def getValue(self): - return self.getTagData('value') - def setValues(self,lst): - """ Set the values of this field as values-list. - Replaces all previous filed values! If you need to just add a value - use addValue method.""" - while self.getTag('value'): self.delChild('value') - for val in lst: self.addValue(val) - def addValue(self,val): - """ Add one more value to this field. Used in 'get' iq's or such.""" - self.addChild('value',{},[val]) - def getValues(self): - """ Return the list of values associated with this field.""" - ret=[] - for tag in self.getTags('value'): ret.append(tag.getData()) - return ret - def getOptions(self): - """ Return label-option pairs list associated with this field.""" - ret=[] - for tag in self.getTags('option'): ret.append([tag.getAttr('label'),tag.getTagData('value')]) - return ret - def setOptions(self,lst): - """ Set label-option pairs list associated with this field.""" - while self.getTag('option'): self.delChild('option') - for opt in lst: self.addOption(opt) - def addOption(self,opt): - """ Add one more label-option pair to this field.""" - if type(opt) in [str,unicode]: self.addChild('option').setTagData('value',opt) - else: self.addChild('option',{'label':opt[0]}).setTagData('value',opt[1]) - def getType(self): - """ Get type of this field. """ - return self.getAttr('type') - def setType(self,val): - """ Set type of this field. """ - return self.setAttr('type',val) - def getVar(self): - """ Get 'var' attribute value of this field. """ - return self.getAttr('var') - def setVar(self,val): - """ Set 'var' attribute value of this field. """ - return self.setAttr('var',val) + """ This class is used in the DataForm class to describe the single data item. + If you are working with jabber:x:data (XEP-0004, XEP-0068, XEP-0122) + then you will need to work with instances of this class. """ + def __init__(self,name=None,value=None,typ=None,required=0,desc=None,options=[],node=None): + """ Create new data field of specified name,value and type. + Also 'required','desc' and 'options' fields can be set. + Alternatively other XML object can be passed in as the 'node' parameted to replicate it as a new datafiled. + """ + Node.__init__(self,'field',node=node) + if name: self.setVar(name) + if type(value) in [list,tuple]: self.setValues(value) + elif value: self.setValue(value) + if typ: self.setType(typ) + elif not typ and not node: self.setType('text-single') + if required: self.setRequired(required) + if desc: self.setDesc(desc) + if options: self.setOptions(options) + def setRequired(self,req=1): + """ Change the state of the 'required' flag. """ + if req: self.setTag('required') + else: + try: self.delChild('required') + except ValueError: return + def isRequired(self): + """ Returns in this field a required one. """ + return self.getTag('required') + def setDesc(self,desc): + """ Set the description of this field. """ + self.setTagData('desc',desc) + def getDesc(self): + """ Return the description of this field. """ + return self.getTagData('desc') + def setValue(self,val): + """ Set the value of this field. """ + self.setTagData('value',val) + def getValue(self): + return self.getTagData('value') + def setValues(self,lst): + """ Set the values of this field as values-list. + Replaces all previous filed values! If you need to just add a value - use addValue method.""" + while self.getTag('value'): self.delChild('value') + for val in lst: self.addValue(val) + def addValue(self,val): + """ Add one more value to this field. Used in 'get' iq's or such.""" + self.addChild('value',{},[val]) + def getValues(self): + """ Return the list of values associated with this field.""" + ret=[] + for tag in self.getTags('value'): ret.append(tag.getData()) + return ret + def getOptions(self): + """ Return label-option pairs list associated with this field.""" + ret=[] + for tag in self.getTags('option'): ret.append([tag.getAttr('label'),tag.getTagData('value')]) + return ret + def setOptions(self,lst): + """ Set label-option pairs list associated with this field.""" + while self.getTag('option'): self.delChild('option') + for opt in lst: self.addOption(opt) + def addOption(self,opt): + """ Add one more label-option pair to this field.""" + if type(opt) in [str,unicode]: self.addChild('option').setTagData('value',opt) + else: self.addChild('option',{'label':opt[0]}).setTagData('value',opt[1]) + def getType(self): + """ Get type of this field. """ + return self.getAttr('type') + def setType(self,val): + """ Set type of this field. """ + return self.setAttr('type',val) + def getVar(self): + """ Get 'var' attribute value of this field. """ + return self.getAttr('var') + def setVar(self,val): + """ Set 'var' attribute value of this field. """ + return self.setAttr('var',val) class DataForm(Node): - """ DataForm class. Used for manipulating dataforms in XMPP. - Relevant XEPs: 0004, 0068, 0122. - Can be used in disco, pub-sub and many other applications.""" - def __init__(self, typ=None, data=[], title=None, node=None): - """ - Create new dataform of type 'typ'. 'data' is the list of DataField - instances that this dataform contains, 'title' - the title string. - You can specify the 'node' argument as the other node to be used as - base for constructing this dataform. + """ DataForm class. Used for manipulating dataforms in XMPP. + Relevant XEPs: 0004, 0068, 0122. + Can be used in disco, pub-sub and many other applications.""" + def __init__(self, typ=None, data=[], title=None, node=None): + """ + Create new dataform of type 'typ'. 'data' is the list of DataField + instances that this dataform contains, 'title' - the title string. + You can specify the 'node' argument as the other node to be used as + base for constructing this dataform. - title and instructions is optional and SHOULD NOT contain newlines. - Several instructions MAY be present. - 'typ' can be one of ('form' | 'submit' | 'cancel' | 'result' ) - 'typ' of reply iq can be ( 'result' | 'set' | 'set' | 'result' ) respectively. - 'cancel' form can not contain any fields. All other forms contains AT LEAST one field. - 'title' MAY be included in forms of type "form" and "result" - """ - Node.__init__(self,'x',node=node) - if node: - newkids=[] - for n in self.getChildren(): - if n.getName()=='field': newkids.append(DataField(node=n)) - else: newkids.append(n) - self.kids=newkids - if typ: self.setType(typ) - self.setNamespace(NS_DATA) - if title: self.setTitle(title) - if type(data)==type({}): - newdata=[] - for name in data.keys(): newdata.append(DataField(name,data[name])) - data=newdata - for child in data: - if type(child) in [type(''),type(u'')]: self.addInstructions(child) - elif child.__class__.__name__=='DataField': self.kids.append(child) - else: self.kids.append(DataField(node=child)) - def getType(self): - """ Return the type of dataform. """ - return self.getAttr('type') - def setType(self,typ): - """ Set the type of dataform. """ - self.setAttr('type',typ) - def getTitle(self): - """ Return the title of dataform. """ - return self.getTagData('title') - def setTitle(self,text): - """ Set the title of dataform. """ - self.setTagData('title',text) - def getInstructions(self): - """ Return the instructions of dataform. """ - return self.getTagData('instructions') - def setInstructions(self,text): - """ Set the instructions of dataform. """ - self.setTagData('instructions',text) - def addInstructions(self,text): - """ Add one more instruction to the dataform. """ - self.addChild('instructions',{},[text]) - def getField(self,name): - """ Return the datafield object with name 'name' (if exists). """ - return self.getTag('field',attrs={'var':name}) - def setField(self,name): - """ Create if nessessary or get the existing datafield object with name 'name' and return it. """ - f=self.getField(name) - if f: return f - return self.addChild(node=DataField(name)) - def asDict(self): - """ Represent dataform as simple dictionary mapping of datafield names to their values.""" - ret={} - for field in self.getTags('field'): - name=field.getAttr('var') - typ=field.getType() - if type(typ) in [type(''),type(u'')] and typ[-6:]=='-multi': - val=[] - for i in field.getTags('value'): val.append(i.getData()) - else: val=field.getTagData('value') - ret[name]=val - if self.getTag('instructions'): ret['instructions']=self.getInstructions() - return ret - def __getitem__(self,name): - """ Simple dictionary interface for getting datafields values by their names.""" - item=self.getField(name) - if item: return item.getValue() - raise IndexError('No such field') - def __setitem__(self,name,val): - """ Simple dictionary interface for setting datafields values by their names.""" - return self.setField(name).setValue(val) + title and instructions is optional and SHOULD NOT contain newlines. + Several instructions MAY be present. + 'typ' can be one of ('form' | 'submit' | 'cancel' | 'result' ) + 'typ' of reply iq can be ( 'result' | 'set' | 'set' | 'result' ) respectively. + 'cancel' form can not contain any fields. All other forms contains AT LEAST one field. + 'title' MAY be included in forms of type "form" and "result" + """ + Node.__init__(self,'x',node=node) + if node: + newkids=[] + for n in self.getChildren(): + if n.getName()=='field': newkids.append(DataField(node=n)) + else: newkids.append(n) + self.kids=newkids + if typ: self.setType(typ) + self.setNamespace(NS_DATA) + if title: self.setTitle(title) + if type(data)==type({}): + newdata=[] + for name in data.keys(): newdata.append(DataField(name,data[name])) + data=newdata + for child in data: + if type(child) in [type(''),type(u'')]: self.addInstructions(child) + elif child.__class__.__name__=='DataField': self.kids.append(child) + else: self.kids.append(DataField(node=child)) + def getType(self): + """ Return the type of dataform. """ + return self.getAttr('type') + def setType(self,typ): + """ Set the type of dataform. """ + self.setAttr('type',typ) + def getTitle(self): + """ Return the title of dataform. """ + return self.getTagData('title') + def setTitle(self,text): + """ Set the title of dataform. """ + self.setTagData('title',text) + def getInstructions(self): + """ Return the instructions of dataform. """ + return self.getTagData('instructions') + def setInstructions(self,text): + """ Set the instructions of dataform. """ + self.setTagData('instructions',text) + def addInstructions(self,text): + """ Add one more instruction to the dataform. """ + self.addChild('instructions',{},[text]) + def getField(self,name): + """ Return the datafield object with name 'name' (if exists). """ + return self.getTag('field',attrs={'var':name}) + def setField(self,name): + """ Create if nessessary or get the existing datafield object with name 'name' and return it. """ + f=self.getField(name) + if f: return f + return self.addChild(node=DataField(name)) + def asDict(self): + """ Represent dataform as simple dictionary mapping of datafield names to their values.""" + ret={} + for field in self.getTags('field'): + name=field.getAttr('var') + typ=field.getType() + if type(typ) in [type(''),type(u'')] and typ[-6:]=='-multi': + val=[] + for i in field.getTags('value'): val.append(i.getData()) + else: val=field.getTagData('value') + ret[name]=val + if self.getTag('instructions'): ret['instructions']=self.getInstructions() + return ret + def __getitem__(self,name): + """ Simple dictionary interface for getting datafields values by their names.""" + item=self.getField(name) + if item: return item.getValue() + raise IndexError('No such field') + def __setitem__(self,name,val): + """ Simple dictionary interface for setting datafields values by their names.""" + return self.setField(name).setValue(val) diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index a4f847764..4ca3c51aa 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -27,10 +27,34 @@ import errno import time import traceback -import thread +import threading import logging log = logging.getLogger('gajim.c.x.transports_nb') +consoleloghandler = logging.StreamHandler() +consoleloghandler.setLevel(logging.DEBUG) +consoleloghandler.setFormatter( + logging.Formatter('%(levelname)s: %(message)s') +) +log.setLevel(logging.DEBUG) +log.addHandler(consoleloghandler) +log.propagate = False + + +def urisplit(self, uri): + ''' + Function for splitting URI string to tuple (protocol, host, path). + e.g. urisplit('http://httpcm.jabber.org/webclient') returns + ('http', 'httpcm.jabber.org', '/webclient') + ''' + import re + regex = '(([^:/]+)(://))?([^/]*)(/?.*)' + grouped = re.match(regex, uri).groups() + proto, host, path = grouped[1], grouped[3], grouped[4] + return proto, host, path + + + # I don't need to load gajim.py just because of few TLS variables, so I changed # %s/common\.gajim\.DATA_DIR/\'\.\.\/data\'/c @@ -39,13 +63,438 @@ log = logging.getLogger('gajim.c.x.transports_nb') # To change it back do: # %s/\'\.\.\/data\'/common\.gajim\.DATA_DIR/c # %s/\'%s\/\.gajim\/cacerts\.pem\'\ %\ os\.environ\[\'HOME\'\]/common\.gajim\.MY_CACERTS/c +# TODO: make the paths configurable - as constructor parameters or sth + # import common.gajim +# timeout to connect to the server socket, it doesn't include auth +CONNECT_TIMEOUT_SECONDS = 30 + +# how long to wait for a disconnect to complete +DISCONNECT_TIMEOUT_SECONDS = 10 + +# size of the buffer which reads data from server +# if lower, more stanzas will be fragmented and processed twice +RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty +#RECV_BUFSIZE = 16 # FIXME: (#2634) gajim breaks with this setting: it's inefficient but should work. DATA_RECEIVED='DATA RECEIVED' DATA_SENT='DATA SENT' + +DISCONNECTED ='DISCONNECTED' +CONNECTING ='CONNECTING' +CONNECTED ='CONNECTED' +DISCONNECTING ='DISCONNECTING' + +class NonBlockingTcp(PlugIn, IdleObject): + ''' + Non-blocking TCP socket wrapper + ''' + def __init__(self, on_disconnect): + ''' + Class constructor. + ''' + + PlugIn.__init__(self) + IdleObject.__init__(self) + + self.on_disconnect = on_disconnect + + self.on_connect = None + self.on_connect_failure = None + self.sock = None + self.idlequeue = None + self.on_receive = None + self.DBG_LINE='socket' + self.state = DISCONNECTED + + # writable, readable - keep state of the last pluged flags + # This prevents replug of same object with the same flags + self.writable = True + self.readable = False + + # queue with messages to be send + self.sendqueue = [] + + # time to wait for SOME stanza to come and then send keepalive + self.sendtimeout = 0 + + # in case we want to something different than sending keepalives + self.on_timeout = None + + # bytes remained from the last send message + self.sendbuff = '' + self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout, + self.set_timeout, self.remove_timeout] + + def plugin(self, owner): + owner.Connection=self + print 'plugin called' + self.idlequeue = owner.idlequeue + + def plugout(self): + self._owner.Connection = None + self._owner = None + + + def get_fd(self): + try: + tmp = self._sock.fileno() + return tmp + except: + return 0 + + def connect(self, conn_5tuple, on_connect, on_connect_failure): + ''' + Creates and connects socket to server and port defined in conn_5tupe which + should be list item returned from getaddrinfo. + :param conn_5tuple: 5-tuple returned from getaddrinfo + :param on_connect: callback called on successful tcp connection + :param on_connect_failure: callback called on failure when estabilishing tcp + connection + ''' + self.on_connect = on_connect + self.on_connect_failure = on_connect_failure + (self.server, self.port) = conn_5tuple[4] + log.debug('NonBlocking Connect :: About tot connect to %s:%s' % conn_5tuple[4]) + try: + self._sock = socket.socket(*conn_5tuple[:3]) + except socket.error, (errnum, errstr): + on_connect_failure('NonBlockingTcp: Error while creating socket: %s %s' % (errnum, errstr)) + return + + self._send = self._sock.send + self._recv = self._sock.recv + self.fd = self._sock.fileno() + self.idlequeue.plug_idle(self, True, False) + + errnum = 0 + ''' variable for errno symbol that will be found from exception raised from connect() ''' + + # set timeout for TCP connecting - if nonblocking connect() fails, pollend + # is called. If if succeeds pollout is called. + self.idlequeue.set_read_timeout(self.get_fd(), CONNECT_TIMEOUT_SECONDS) + + try: + self._sock.setblocking(False) + self._sock.connect((self.server,self.port)) + except Exception, (errnum, errstr): + pass + + if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): + # connecting in progress + self.set_state(CONNECTING) + log.debug('After connect. "%s" raised => CONNECTING' % errstr) + # on_connect/failure will be called from self.pollin/self.pollout + return + elif errnum in (0, 10056, errno.EISCONN): + # already connected - this branch is very unlikely, nonblocking connect() will + # return EINPROGRESS exception in most cases. When here, we don't need timeout + # on connected descriptor and success callback can be called. + log.debug('After connect. "%s" raised => CONNECTED' % errstr) + self._on_connect(self) + return + + # if there was some other error, call failure callback and unplug transport + # which will also remove read_timeouts for descriptor + self._on_connect_failure('Exception while connecting to %s:%s - %s %s' % + (self.server, self.port, errnum, errstr)) + + def _on_connect(self, data): + ''' preceeds call of on_connect callback ''' + self.set_state(CONNECTED) + self.idlequeue.remove_timeout(self.get_fd()) + self.on_connect() + + + def set_state(self, newstate): + assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING]) + if (self.state, newstate) in [(CONNECTING, DISCONNECTING), (DISCONNECTED, DISCONNECTING)]: + log.info('strange move: %s -> %s' % (self.state, newstate)) + self.state = newstate + + + def _on_connect_failure(self,err_message): + ''' preceeds call of on_connect_failure callback ''' + # In case of error while connecting we need to close socket + # but we don't want to call DisconnectHandlers from client, + # thus the do_callback=False + self.disconnect(do_callback=False) + self.on_connect_failure(err_message=err_message) + + + + def pollin(self): + '''called when receive on plugged socket is possible ''' + log.debug('pollin called, state == %s' % self.state) + self._do_receive() + + def pollout(self): + '''called when send to plugged socket is possible''' + log.debug('pollout called, state == %s' % self.state) + + if self.state==CONNECTING: + self._on_connect(self) + return + self._do_send() + + def pollend(self): + log.debug('pollend called, state == %s' % self.state) + + if self.state==CONNECTING: + self._on_connect_failure('Error during connect to %s:%s' % + (self.server, self.port)) + else : + self.disconnect() + + def disconnect(self, do_callback=True): + if self.state == DISCONNECTED: + return + self.idlequeue.unplug_idle(self.get_fd()) + try: + self._sock.shutdown(socket.SHUT_RDWR) + self._sock.close() + except socket.error, (errnum, errstr): + log.error('Error disconnecting a socket: %s %s' % (errnum,errstr)) + self.set_state(DISCONNECTED) + if do_callback: + # invoke callback given in __init__ + self.on_disconnect() + + def read_timeout(self): + ''' + Implemntation of IdleObject function called on timeouts from IdleQueue. + ''' + log.debug('read_timeout called, state == %s' % self.state) + if self.state==CONNECTING: + # if read_timeout is called during connecting, connect() didn't end yet + # thus we have to call the tcp failure callback + self._on_connect_failure('Error during connect to %s:%s' % + (self.server, self.port)) + else: + if self.on_timeout: + self.on_timeout() + self.renew_send_timeout() + + def renew_send_timeout(self): + if self.on_timeout and self.sendtimeout > 0: + self.set_timeout(self.sendtimeout) + else: + self.remove_timeout() + + def set_send_timeout(self, timeout, on_timeout): + self.sendtimeout = timeout + if self.sendtimeout > 0: + self.on_timeout = on_timeout + else: + self.on_timeout = None + + def set_timeout(self, timeout): + if self.state in [CONNECTING, CONNECTED] and self.get_fd() > 0: + self.idlequeue.set_read_timeout(self.get_fd(), timeout) + + def remove_timeout(self): + if self.get_fd(): + self.idlequeue.remove_timeout(self.get_fd()) + + def send(self, raw_data, now=False): + '''Append raw_data to the queue of messages to be send. + If supplied data is unicode string, encode it to utf-8. + ''' + + if self.state not in [CONNECTED, DISCONNECTING]: + log.error('Trying to send %s when transport is %s.' % + (raw_data, self.state)) + return + r = raw_data + if isinstance(r, unicode): + r = r.encode('utf-8') + elif not isinstance(r, str): + r = ustr(r).encode('utf-8') + if now: + self.sendqueue.insert(0, r) + self._do_send() + else: + self.sendqueue.append(r) + self._plug_idle() + + + + def _plug_idle(self): + # readable if socket is connected or disconnecting + readable = self.state != DISCONNECTED + # writeable if sth to send + if self.sendqueue or self.sendbuff: + writable = True + else: + writable = False + print 'About to plug fd %d, W:%s, R:%s' % (self.get_fd(), writable, readable) + if self.writable != writable or self.readable != readable: + print 'Really plugging fd %d, W:%s, R:%s' % (self.get_fd(), writable, readable) + self.idlequeue.plug_idle(self, writable, readable) + else: + print 'Not plugging - is already plugged' + + + + def _do_send(self): + if not self.sendbuff: + if not self.sendqueue: + return None # nothing to send + self.sendbuff = self.sendqueue.pop(0) + try: + send_count = self._send(self.sendbuff) + if send_count: + sent_data = self.sendbuff[:send_count] + self.sendbuff = self.sendbuff[send_count:] + self._plug_idle() + self._raise_event(DATA_SENT, sent_data) + + except socket.error, e: + log.error('_do_send:', exc_info=True) + traceback.print_exc() + self.disconnect() + + def _raise_event(self, event_type, data): + if data and data.strip(): + log.debug('raising event from transport: %s %s' % (event_type,data)) + if hasattr(self._owner, 'Dispatcher'): + self._owner.Dispatcher.Event('', event_type, data) + + def onreceive(self, recv_handler): + ''' Sets the on_receive callback. Do not confuse it with + on_receive() method, which is the callback itself.''' + if not recv_handler: + if hasattr(self._owner, 'Dispatcher'): + self.on_receive = self._owner.Dispatcher.ProcessNonBlocking + else: + self.on_receive = None + return + self.on_receive = recv_handler + + + def _do_receive(self): + ''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.''' + ERR_DISCONN = -2 # Misc error signifying that we got disconnected + received = None + errnum = 0 + errstr = 'No Error Set' + + try: + # get as many bites, as possible, but not more than RECV_BUFSIZE + received = self._recv(RECV_BUFSIZE) + except (socket.error, socket.herror, socket.gaierror), (errnum, errstr): + # save exception number and message to errnum, errstr + log.debug("_do_receive: got %s:" % received , exc_info=True) + + if received == '': + errnum = ERR_DISCONN + errstr = "Connection closed unexpectedly" + + if errnum in (ERR_DISCONN, errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN): + # ECONNRESET - connection you are trying to access has been reset by the peer + # ENOTCONN - Transport endpoint is not connected + # ESHUTDOWN - shutdown(2) has been called on a socket to close down the + # sending end of the transmision, and then data was attempted to be sent + log.error("Connection to %s lost: %s %s" % ( self.server, errnum, errstr)) + self.disconnect() + return + + if received is None: + # in case of some other exception + # FIXME: is this needed?? + if errnum != 0: + self.DEBUG(self.DBG, errstr, 'error') + log.error("CConnection to %s lost: %s %s" % (self.server, errnum, errstr)) + if not errors_only and self.state in [CONNECTING, CONNECTED]: + self.pollend(retry=True) + return + received = '' + + # we have received some bytes, stop the timeout! + self.renew_send_timeout() + if self.on_receive: + self._raise_event(DATA_RECEIVED, received) + self._on_receive(received) + else: + # This should never happen, so we need the debug + log.error('SOCKET Unhandled data received: %s' % received) + self.disconnect() + + def _on_receive(self, data): + # Overriding this method allows modifying received data before it is passed + # to callback. + self.on_receive(data) + + +class NonBlockingHttpBOSH(NonBlockingTcp): + ''' + Socket wrapper that makes HTTP message out of send data and peels-off + HTTP headers from incoming messages + ''' + + def __init__(self, bosh_uri, bosh_port, on_disconnect): + self.bosh_protocol, self.bosh_host, self.bosh_path = self.urisplit(bosh_uri) + if self.bosh_protocol is None: + self.bosh_protocol = 'http' + if self.bosh_path == '': + bosh_path = '/' + self.bosh_port = bosh_port + + def send(self, raw_data, now=False): + + NonBlockingTcp.send( + self, + self.build_http_message(raw_data), + now) + + def _on_receive(self,data): + '''Preceeds pass of received data to Client class. Gets rid of HTTP headers + and checks them.''' + statusline, headers, httpbody = self.parse_http_message(data) + if statusline[1] != '200': + log.error('HTTP Error: %s %s' % (statusline[1], statusline[2])) + self.disconnect() + self.on_receive(httpbody) + + + def build_http_message(self, httpbody): + ''' + Builds bosh http message with given body. + Values for headers and status line fields are taken from class variables. + ) + ''' + headers = ['POST %s HTTP/1.1' % self.bosh_path, + 'Host: %s:%s' % (self.bosh_host, self.bosh_port), + 'Content-Type: text/xml; charset=utf-8', + 'Content-Length: %s' % len(str(httpbody)), + '\r\n'] + headers = '\r\n'.join(headers) + return('%s%s\r\n' % (headers, httpbody)) + + def parse_http_message(self, message): + ''' + splits http message to tuple ( + statusline - list of e.g. ['HTTP/1.1', '200', 'OK'], + headers - dictionary of headers e.g. {'Content-Length': '604', + 'Content-Type': 'text/xml; charset=utf-8'}, + httpbody - string with http body + ) + ''' + message = message.replace('\r','') + (header, httpbody) = message.split('\n\n') + header = header.split('\n') + statusline = header[0].split(' ') + header = header[1:] + headers = {} + for dummy in header: + row = dummy.split(' ',1) + headers[row[0][:-1]] = row[1] + return (statusline, headers, httpbody) + + USE_PYOPENSSL = False try: @@ -62,16 +511,6 @@ except ImportError: print >> sys.stderr, "PyOpenSSL not found, falling back to Python builtin SSL objects (insecure)." print >> sys.stderr, "=" * 79 -# timeout to connect to the server socket, it doesn't include auth -CONNECT_TIMEOUT_SECONDS = 30 - -# how long to wait for a disconnect to complete -DISCONNECT_TIMEOUT_SECONDS = 10 - -# size of the buffer which reads data from server -# if lower, more stanzas will be fragmented and processed twice -RECV_BUFSIZE = 32768 # 2x maximum size of ssl packet, should be plenty -#RECV_BUFSIZE = 16 # FIXME: (#2634) gajim breaks with this setting: it's inefficient but should work. def torf(cond, tv, fv): if cond: return tv @@ -243,439 +682,6 @@ class StdlibSSLWrapper(SSLWrapper): raise SSLWrapper.Error(self.sock or self.sslobj, e) return 0 -class NonBlockingTcp(PlugIn, IdleObject): - ''' This class can be used instead of transports.Tcp in threadless implementations ''' - def __init__(self, on_connect = None, on_connect_failure = None, server=None, use_srv = True): - ''' Cache connection point 'server'. 'server' is the tuple of (host, port) - absolutely the same as standard tcp socket uses. - on_connect - called when we connect to the socket - on_connect_failure - called if there was error connecting to socket - ''' - IdleObject.__init__(self) - PlugIn.__init__(self) - self.DBG_LINE='socket' - self._exported_methods=[self.send, self.disconnect, self.onreceive, self.set_send_timeout, - self.start_disconnect, self.set_timeout, self.remove_timeout] - self._server = server - self.on_connect = on_connect - self.on_connect_failure = on_connect_failure - self.on_receive = None - self.on_disconnect = None - self.printed_error = False - - # 0 - not connected - # 1 - connected - # -1 - about to disconnect (when we wait for final events to complete) - # -2 - disconnected - self.state = 0 - - # queue with messages to be send - self.sendqueue = [] - - # bytes remained from the last send message - self.sendbuff = '' - - # time to wait for SOME stanza to come and then send keepalive - self.sendtimeout = 0 - - # in case we want to something different than sending keepalives - self.on_timeout = None - - # writable, readable - keep state of the last pluged flags - # This prevents replug of same object with the same flags - self.writable = True - self.readable = False - self.ais = None - - def plugin(self, owner): - ''' Fire up connection. Return non-empty string on success. - Also registers self.disconnected method in the owner's dispatcher. - Called internally. ''' - self.idlequeue = owner.idlequeue - self.printed_error = False - if not self._server: - self._server=(self._owner.Server,5222) - if self.connect(self._server) is False: - return False - return True - - def read_timeout(self): - if self.state == 0: - self.idlequeue.unplug_idle(self.fd) - if self.on_connect_failure: - self.on_connect_failure() - else: - if self.on_timeout: - self.on_timeout() - self.renew_send_timeout() - - def connect(self,server=None, proxy = None, secure = None): - ''' Try to establish connection. ''' - if not server: - server=self._server - else: - self._server = server - self.printed_error = False - self.state = 0 - try: - self.set_timeout(CONNECT_TIMEOUT_SECONDS) - if len(server) == 2 and type(server[0]) in (str, unicode) and not \ - self.ais: - # FIXME: blocks here - self.ais = socket.getaddrinfo(server[0],server[1],socket.AF_UNSPEC,socket.SOCK_STREAM) - log.info('Found IPs: %s', self.ais) - else: - self.ais = (server,) - self.connect_to_next_ip() - return - except socket.gaierror, e: - log.info('Lookup failure for %s: %s[%s]', self.getName(), e[1], repr(e[0]), exc_info=True) - except: - log.error('Exception trying to connect to %s:', self.getName(), exc_info=True) - - if self.on_connect_failure: - self.on_connect_failure() - - def _plug_idle(self): - # readable if socket is connected or disconnecting - readable = self.state != 0 - # writeable if sth to send - if self.sendqueue or self.sendbuff: - writable = True - else: - writable = False - if self.writable != writable or self.readable != readable: - self.idlequeue.plug_idle(self, writable, readable) - - def pollout(self): - print 'pollout called - send possible' - if self.state == 0: - self.connect_to_next_ip() - return - self._do_send() - - def plugout(self): - ''' Disconnect from the remote server and unregister self.disconnected method from - the owner's dispatcher. ''' - self.disconnect() - self._owner.Connection = None - self._owner = None - - def pollin(self): - print 'pollin called - receive possible' - self._do_receive() - - def pollend(self, retry=False): - if not self.printed_error: - self.printed_error = True - try: self._do_receive(errors_only=True) - except: log.error("pollend: Got exception from _do_receive:", exc_info=True) - conn_failure_cb = self.on_connect_failure - self.disconnect() - if conn_failure_cb: - conn_failure_cb(retry) - - def disconnect(self): - if self.state == -2: # already disconnected - return - self.state = -2 - self.sendqueue = None - self.remove_timeout() - try: - self._owner.disconnected() - except: - pass - self.idlequeue.unplug_idle(self.fd) - sock = getattr(self, '_sock', None) - if sock: - try: - sock.shutdown(socket.SHUT_RDWR) - except socket.error, e: - if e[0] != errno.ENOTCONN: - log.error("Error shutting down socket for %s:", self.getName(), exc_info=True) - try: sock.close() - except: log.error("Error closing socket for %s:", self.getName(), exc_info=True) - # socket descriptor cannot be (un)plugged anymore - self.fd = -1 - if self.on_disconnect: - self.on_disconnect() - self.on_connect_failure = None - - def end_disconnect(self): - ''' force disconnect only if we are still trying to disconnect ''' - if self.state == -1: - self.disconnect() - - def start_disconnect(self, to_send, on_disconnect): - self.on_disconnect = on_disconnect - - # flush the sendqueue - while self.sendqueue: - self._do_send() - - self.sendqueue = [] - self.send(to_send) - self.send('') - self.state = -1 # about to disconnect - self.idlequeue.set_alarm(self.end_disconnect, DISCONNECT_TIMEOUT_SECONDS) - - def set_timeout(self, timeout): - if self.state >= 0 and self.fd > 0: - self.idlequeue.set_read_timeout(self.fd, timeout) - - def remove_timeout(self): - if self.fd: - self.idlequeue.remove_timeout(self.fd) - - def onreceive(self, recv_handler): - ''' Sets the on_receive callback. Do not confuse it with - on_receive() method, which is the callback itself. - - If recv_handler==None, it tries to set that callback assuming that - our owner also has a Dispatcher object plugged in, to its - ProcessNonBlocking method.''' - if not recv_handler: - if hasattr(self._owner, 'Dispatcher'): - self.on_receive = self._owner.Dispatcher.ProcessNonBlocking - else: - self.on_receive = None - return - _tmp = self.on_receive - # make sure this cb is not overriden by recursive calls - if not recv_handler(None) and _tmp == self.on_receive: - self.on_receive = recv_handler - - def _do_receive(self, errors_only=False): - ''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.''' - ERR_DISCONN = -2 # Misc error signifying that we got disconnected - ERR_OTHER = -1 # Other error - received = None - errnum = 0 - errtxt = 'No Error Set' - try: - # get as many bites, as possible, but not more than RECV_BUFSIZE - received = self._recv(RECV_BUFSIZE) - except (socket.error, socket.herror, socket.gaierror), e: - log.debug("_do_receive: got %s:", e.__class__, exc_info=True) - #traceback.print_exc() - #print "Current Stack:" - #traceback.print_stack() - errnum = e[0] - errtxt = str(errnum) + ':' + e[1] - except socket.sslerror, e: - log.error("_do_receive: got unknown %s:", e.__class__, exc_info=True) - #traceback.print_exc() - #print "Current Stack:" - #traceback.print_stack() - errnum = ERR_OTHER - errtxt = repr("socket.sslerror: " + e.args) - except SSLWrapper.Error, e: - log.debug("Caught: %s", str(e)) - errnum = gattr(e, 'errno', ERR_OTHER) - if not errnum: errnum = ERR_OTHER # unset, but we must put a status - errtxt = gattr(e, 'strerror') or repr(e.args) - - if received == '': - errnum = ERR_DISCONN - errtxt = "Connection closed unexpectedly" - - if errnum in (ERR_DISCONN, errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN): - log.error("Connection to %s lost: %s [%d]", self.getName(), errtxt, errnum) - self.printed_error = True - if not errors_only: - self.pollend(retry=(errnum in (ERR_DISCONN, errno.ECONNRESET))) - # don't process result, because it will raise an error - return - - if received is None: - if errnum != 0: - self.DEBUG(errtxt, 'error') - log.error("Connection to %s lost: %s [%d]", self.getName(), errtxt, errnum) - self.printed_error = True - if not errors_only and self.state >= 0: - self.pollend(retry=True) - return - received = '' - - if errors_only or self.state < 0: - return - - # we have received some bites, stop the timeout! - self.renew_send_timeout() - if self.on_receive: - if received.strip(): - self.DEBUG(received, 'got') - if hasattr(self._owner, 'Dispatcher'): - self._owner.Dispatcher.Event('', DATA_RECEIVED, received) - self.on_receive(received) - else: - # This should never happed, so we need the debug - self.DEBUG('Unhandled data received: %s' % received,'got') - self.disconnect() - if self.on_connect_failure: - self.on_connect_failure() - return True - - def _do_send(self): - if not self.sendbuff: - if not self.sendqueue: - return None # nothing to send - self.sendbuff = self.sendqueue.pop(0) - self.sent_data = self.sendbuff - try: - send_count = self._send(self.sendbuff) - if send_count: - self.sendbuff = self.sendbuff[send_count:] - if not self.sendbuff and not self.sendqueue: - if self.state < 0: - self.idlequeue.unplug_idle(self.fd) - self._on_send() - self.disconnect() - return - # we are not waiting for write - self._plug_idle() - self._on_send() - except socket.error, e: - if e[0] == socket.SSL_ERROR_WANT_WRITE: - return True - log.error("_do_send:", exc_info=True) - #traceback.print_exc() - if self.state < 0: - self.disconnect() - return - if self._on_send_failure: - self._on_send_failure() - return - return True - - def connect_to_next_ip(self): - if self.state != 0: - return - if len(self.ais) == 0: - if self.on_connect_failure: - self.on_connect_failure() - return - ai = self.ais.pop(0) - log.info('Trying to connect to %s:%s', ai[4][0], ai[4][1]) - try: - self._sock = socket.socket(*ai[:3]) - self._server=ai[4] - except socket.error, e: - errnum, errstr = e - - # Ignore "Socket already connected". - # FIXME: This happens when we switch an already - # connected socket to SSL (STARTTLS). Instead of - # ignoring the error, the socket should only be - # connected to once. See #2846 and #3396. - workaround = (errno.EALREADY, 10056, 56) - - # 10035 - winsock equivalent of EINPROGRESS - if errnum not in (errno.EINPROGRESS, 10035) + workaround: - log.error('Could not connect to %s: %s [%s]', ai[4][0], errnum, - errstr, exc_info=True) - #traceback.print_exc() - self.connect_to_next_ip() - return - self.fd = self._sock.fileno() - self.idlequeue.plug_idle(self, True, False) - self._send = self._sock.send - self._recv = self._sock.recv - self._do_connect() - - def _do_connect(self): - errnum = 0 - - try: - print "==============sock.connect called" - self._sock.connect(self._server) - self._sock.setblocking(False) - except Exception, ee: - (errnum, errstr) = ee - # in progress, or would block - print "errnum: %s" % errnum - if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): - self.state = 1 - return - # 10056 - already connected, only on win32 - # code 'WS*' is not available on GNU, so we use its numeric value - elif errnum not in (0, 10056, errno.EISCONN): - log.error('Could not connect to %s: %s [%s]', self._server[0], errnum, - errstr) - self.connect_to_next_ip() - return - self.remove_timeout() - self._owner.Connection=self - self.state = 1 - - self._sock.setblocking(False) - self._plug_idle() - if self.on_connect: - self.on_connect() - self.on_connect = None - - def send(self, raw_data, now = False): - '''Append raw_data to the queue of messages to be send. - If supplied data is unicode string, encode it to utf-8. - ''' - - if self.state <= 0: - return - r = raw_data - if isinstance(r, unicode): - r = r.encode('utf-8') - elif not isinstance(r, str): - r = ustr(r).encode('utf-8') - if now: - self.sendqueue.insert(0, r) - self._do_send() - else: - self.sendqueue.append(r) - - self._plug_idle() - - def _on_send(self): - if self.sent_data and self.sent_data.strip(): - self.DEBUG(self.sent_data,'sent') - if hasattr(self._owner, 'Dispatcher'): - self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data) - self.sent_data = None - - def _on_send_failure(self): - self.DEBUG("Socket error while sending data",'error') - self._owner.disconnected() - self.sent_data = None - - def set_send_timeout(self, timeout, on_timeout): - self.sendtimeout = timeout - if self.sendtimeout > 0: - self.on_timeout = on_timeout - else: - self.on_timeout = None - - def renew_send_timeout(self): - if self.on_timeout and self.sendtimeout > 0: - self.set_timeout(self.sendtimeout) - else: - self.remove_timeout() - - def getHost(self): - ''' Return the 'host' value that is connection is [will be] made to.''' - return self._server[0] - - def getName(self): - ''' Return the server's name, or 'getHost()' if not available.''' - retval = None - try: - retval = gattr(self._owner, 'name') - except: - pass - if retval: return retval - return self.getHost() - - def getPort(self): - ''' Return the 'port' value that is connection is [will be] made to.''' - return self._server[1] class NonBlockingTLS(PlugIn): ''' TLS connection used to encrypts already estabilished tcp connection.''' @@ -687,7 +693,7 @@ class NonBlockingTLS(PlugIn): "SSL_CB_ALERT": 0x4000, "SSL_CB_HANDSHAKE_START": 0x10, "SSL_CB_HANDSHAKE_DONE": 0x20} - def PlugIn(self, owner, now=0, on_tls_start = None): + def PlugIn(self, owner, on_tls_success, on_tls_failure, now=0): ''' If the 'now' argument is true then starts using encryption immidiatedly. If 'now' in false then starts encryption as soon as TLS feature is declared by the server (if it were already declared - it is ok). @@ -696,7 +702,8 @@ class NonBlockingTLS(PlugIn): return # Already enabled. PlugIn.PlugIn(self, owner) DBG_LINE='NonBlockingTLS' - self.on_tls_start = on_tls_start + self.on_tls_success = on_tls_success + self.on_tls_faliure = on_tls_failure if now: try: res = self._startSSL() @@ -705,7 +712,7 @@ class NonBlockingTLS(PlugIn): #traceback.print_exc() self._owner.socket.pollend() return - self.tls_start() + on_tls_success() return res if self._owner.Dispatcher.Stream.features: try: @@ -725,23 +732,17 @@ class NonBlockingTLS(PlugIn): self._owner.Dispatcher.PlugOut() self._owner = None - def tls_start(self): - if self.on_tls_start: - self.on_tls_start() - self.on_tls_start = None - def FeaturesHandler(self, conn, feats): ''' Used to analyse server tag for TLS support. If TLS is supported starts the encryption negotiation. Used internally ''' if not feats.getTag('starttls', namespace=NS_TLS): self.DEBUG("TLS unsupported by remote server.", 'warn') - self.tls_start() + self.on_tls_failure("TLS unsupported by remote server.") return self.DEBUG("TLS supported by remote server. Requesting TLS start.", 'ok') self._owner.RegisterHandlerOnce('proceed', self.StartTLSHandler, xmlns=NS_TLS) self._owner.RegisterHandlerOnce('failure', self.StartTLSHandler, xmlns=NS_TLS) self._owner.send('' % NS_TLS) - self.tls_start() raise NodeProcessed def _dumpX509(self, cert, stream=sys.stderr): @@ -824,9 +825,10 @@ class NonBlockingTLS(PlugIn): try: self.starttls='in progress' tcpsock._sslObj.do_handshake() - # Errors are handeled in _do_receive function except: - pass + log.error('Error while TLS handshake: ', exc_info=True) + self.on_tls_failure('Error while TLS Handshake') + return tcpsock._sslObj.setblocking(False) log.debug("Synchronous handshake completed") #log.debug("Async handshake started...") @@ -865,63 +867,73 @@ class NonBlockingTLS(PlugIn): ''' Handle server reply if TLS is allowed to process. Behaves accordingly. Used internally.''' if starttls.getNamespace() <> NS_TLS: + self.on_tls_failure('Unknown namespace: %s' % starttls.getNamespace()) return self.starttls = starttls.getName() if self.starttls == 'failure': - self.DEBUG('Got starttls response: ' + self.starttls,'error') + self.on_tls_failure('TLS received: %s' % self.starttls) return self.DEBUG('Got starttls proceed response. Switching to TLS/SSL...','ok') try: self._startSSL() except Exception, e: log.error("StartTLSHandler:", exc_info=True) + self.on_tls_failure('in StartTLSHandler') #traceback.print_exc() - self._owner.socket.pollend() return self._owner.Dispatcher.PlugOut() - dispatcher_nb.Dispatcher().PlugIn(self._owner) + self.on_tls_success() + #dispatcher_nb.Dispatcher().PlugIn(self._owner) - -class NBHTTPPROXYsocket(NonBlockingTcp): - ''' This class can be used instead of transports.HTTPPROXYsocket - HTTP (CONNECT) proxy connection class. Uses TCPsocket as the base class - redefines only connect method. Allows to use HTTP proxies like squid with - (optionally) simple authentication (using login and password). - +class NBProxySocket(NonBlockingTcp): ''' - def __init__(self, on_connect =None, on_proxy_failure=None, on_connect_failure = None,proxy = None,server = None,use_srv=True): - ''' Caches proxy and target addresses. - 'proxy' argument is a dictionary with mandatory keys 'host' and 'port' (proxy address) - and optional keys 'user' and 'password' to use for authentication. - 'server' argument is a tuple of host and port - just like TCPsocket uses. ''' - self.on_connect_proxy = on_connect - self.on_proxy_failure = on_proxy_failure - self.on_connect_failure = on_connect_failure - NonBlockingTcp.__init__(self, self._on_tcp_connect, on_connect_failure, server, use_srv) - self.DBG_LINE=DBG_CONNECT_PROXY - self.server = server - self.proxy=proxy + Interface for proxy socket wrappers - when tunnneling XMPP over proxies, + some connecting process usually has to be done before opening stream. + ''' + def __init__(self, on_disconnect, xmpp_server, proxy_creds=(None,None)): + self.proxy_user, self.proxy_pass = proxy_creds + self.xmpp_server = xmpp_server + NonBlockingTcp.__init__(self, on_disconnect) + - def plugin(self, owner): - ''' Starts connection. Used interally. Returns non-empty string on success.''' - owner.debug_flags.append(DBG_CONNECT_PROXY) - NonBlockingTcp.plugin(self,owner) + def connect(self, conn_5tuple, on_connect, on_connect_failure): + ''' + connect method is extended by proxy credentials and xmpp server hostname + and port because those are needed for + The idea is to insert Proxy-specific mechanism after TCP connect and + before XMPP stream opening (which is done from client). + ''' - def connect(self,dupe=None): + self.after_proxy_connect = on_connect + + NonBlockingTcp.connect(self, + conn_5tuple=conn_5tuple, + on_connect =self._on_tcp_connect, + on_connect_failure =on_connect_failure) + + def _on_tcp_connect(self): + pass + + + +class NBHTTPProxySocket(NBProxySocket): + ''' This class can be used instead of NonBlockingTcp + HTTP (CONNECT) proxy connection class. Allows to use HTTP proxies like squid with + (optionally) simple authentication (using login and password). + ''' + + def _on_tcp_connect(self): ''' Starts connection. Connects to proxy, supplies login and password to it (if were specified while creating instance). Instructs proxy to make connection to the target server. Returns non-empty sting on success. ''' - NonBlockingTcp.connect(self, (self.proxy['host'], self.proxy['port'])) - - def _on_tcp_connect(self): - self.DEBUG('Proxy server contacted, performing authentification','start') - connector = ['CONNECT %s:%s HTTP/1.0'%self.server, + log.debug('Proxy server contacted, performing authentification') + connector = ['CONNECT %s:%s HTTP/1.0' % self.xmpp_server, 'Proxy-Connection: Keep-Alive', 'Pragma: no-cache', - 'Host: %s:%s'%self.server, + 'Host: %s:%s' % self.xmpp_server, 'User-Agent: HTTPPROXYsocket/v0.1'] - if self.proxy.has_key('user') and self.proxy.has_key('password'): - credentials = '%s:%s' % ( self.proxy['user'], self.proxy['password']) + if self.proxy_user and self.proxy_pass: + credentials = '%s:%s' % (self.proxy_user, self.proxy_pass) credentials = base64.encodestring(credentials).strip() connector.append('Proxy-Authorization: Basic '+credentials) connector.append('\r\n') @@ -937,12 +949,11 @@ class NBHTTPPROXYsocket(NonBlockingTcp): except: log.error("_on_headers_sent:", exc_info=True) #traceback.print_exc() - self.on_proxy_failure('Invalid proxy reply') + self._on_connect_failure('Invalid proxy reply') return if code <> '200': - self.DEBUG('Invalid proxy reply: %s %s %s' % (proto, code, desc),'error') - self._owner.disconnected() - self.on_proxy_failure('Invalid proxy reply') + log.error('Invalid proxy reply: %s %s %s' % (proto, code, desc)) + self._on_connect_failure('Invalid proxy reply') return if len(reply) != 2: pass @@ -951,55 +962,24 @@ class NBHTTPPROXYsocket(NonBlockingTcp): def _on_proxy_auth(self, reply): if self.reply.find('\n\n') == -1: if reply is None: - self.on_proxy_failure('Proxy authentification failed') + self._on_connect_failure('Proxy authentification failed') return if reply.find('\n\n') == -1: self.reply += reply.replace('\r', '') - self.on_proxy_failure('Proxy authentification failed') + self._on_connect_failure('Proxy authentification failed') return - self.DEBUG('Authentification successfull. Jabber server contacted.','ok') - if self.on_connect_proxy: - self.on_connect_proxy() + log.debug('Authentification successfull. Jabber server contacted.') + self._on_connect(self) - def DEBUG(self, text, severity): - ''' Overwrites DEBUG tag to allow debug output be presented as "CONNECTproxy".''' - return self._owner.DEBUG(DBG_CONNECT_PROXY, text, severity) -class NBSOCKS5PROXYsocket(NonBlockingTcp): +class NBSOCKS5ProxySocket(NBProxySocket): '''SOCKS5 proxy connection class. Uses TCPsocket as the base class redefines only connect method. Allows to use SOCKS5 proxies with (optionally) simple authentication (only USERNAME/PASSWORD auth). ''' - def __init__(self, on_connect = None, on_proxy_failure = None, - on_connect_failure = None, proxy = None, server = None, use_srv = True): - ''' Caches proxy and target addresses. - 'proxy' argument is a dictionary with mandatory keys 'host' and 'port' - (proxy address) and optional keys 'user' and 'password' to use for - authentication. 'server' argument is a tuple of host and port - - just like TCPsocket uses. ''' - self.on_connect_proxy = on_connect - self.on_proxy_failure = on_proxy_failure - self.on_connect_failure = on_connect_failure - NonBlockingTcp.__init__(self, self._on_tcp_connect, on_connect_failure, - server, use_srv) - self.DBG_LINE=DBG_CONNECT_PROXY - self.server = server - self.proxy = proxy - self.ipaddr = None + # TODO: replace DEBUG with ordinrar logging, replace on_proxy_failure() with + # _on_connect_failure, at the end call _on_connect() - def plugin(self, owner): - ''' Starts connection. Used interally. Returns non-empty string on - success.''' - owner.debug_flags.append(DBG_CONNECT_PROXY) - NonBlockingTcp.plugin(self, owner) - - def connect(self, dupe = None): - ''' Starts connection. Connects to proxy, supplies login and password to - it (if were specified while creating instance). Instructs proxy to make - connection to the target server. Returns non-empty sting on success. - ''' - NonBlockingTcp.connect(self, (self.proxy['host'], self.proxy['port'])) - def _on_tcp_connect(self): self.DEBUG('Proxy server contacted, performing authentification', 'start') if self.proxy.has_key('user') and self.proxy.has_key('password'): diff --git a/src/common/xmpp/transports_new.py b/src/common/xmpp/transports_new.py deleted file mode 100644 index 36992ba95..000000000 --- a/src/common/xmpp/transports_new.py +++ /dev/null @@ -1,270 +0,0 @@ -from idlequeue import IdleObject -from client import PlugIn -import threading, socket, errno - -import logging -log = logging.getLogger('gajim.c.x.transports_nb') -consoleloghandler = logging.StreamHandler() -consoleloghandler.setLevel(logging.DEBUG) -consoleloghandler.setFormatter( - logging.Formatter('%(levelname)s: %(message)s') -) -log.setLevel(logging.DEBUG) -log.addHandler(consoleloghandler) -log.propagate = False - -''' -this module will replace transports_nb.py -For now, it can be run from test/test_nonblockingtcp.py -* set credentials in the testing script -''' - - -class NBgetaddrinfo(threading.Thread): - ''' - Class for nonblocking call of getaddrinfo. Maybe unnecessary. - ''' - def __init__(self, server, on_success, on_failure, timeout_sec): - ''' - Call is started from constructor. It is not needed to hold reference on - created instance. - :param server: tuple (hostname, port) for DNS request - :param on_success: callback for successful DNS request - :param on_failure: called when DNS request couldn't be performed - :param timeout_sec: max seconds to wait for return from getaddrinfo. After - this time, on_failure is called with error message. - ''' - threading.Thread.__init__(self) - self.on_success = on_success - self.on_failure = on_failure - self.server = server - self.lock = threading.Lock() - self.already_called = False - self.timer = threading.Timer(timeout_sec, self.on_timeout) - self.timer.start() - self.start() - - def on_timeout(self): - ''' - Called by timer. Means that getaddrinfo takes too long and will be - interrupted. - ''' - self.do_call(False, 'NBgetaddrinfo timeout while looking up %s:%s' % self.server) - - def do_call(self, success, data): - ''' - Method called either on success and failure. In case of timeout it will be - called twice but only the first (failure) call will be performed. - :param success: True if getaddrinfo returned properly, False if there was an - error or on timeout. - :param data: error message if failure, list of address structures if success - ''' - log.debug('NBgetaddrinfo::do_call(): %s' % repr(data)) - self.timer.cancel() - self.lock.acquire() - if not self.already_called: - self.already_called = True - self.lock.release() - if success: - self.on_success(data) - else: - self.on_failure(data) - return - else: - self.lock.release() - return - - def run(self): - try: - ips = socket.getaddrinfo(self.server[0],self.server[1],socket.AF_UNSPEC, - socket.SOCK_STREAM) - except socket.gaierror, e: - self.do_call(False, 'Lookup failure for %s: %s %s' % - (repr(self.server), e[0], e[1])) - except Exception, e: - self.do_call(False, 'Exception while DNS lookup of %s: %s' % - (repr(e), repr(self.server))) - else: - self.do_call(True, ips) - - - -DISCONNECTED ='DISCONNECTED' -CONNECTING ='CONNECTING' -CONNECTED ='CONNECTED' -DISCONNECTING ='DISCONNECTING' - -CONNECT_TIMEOUT_SECONDS = 5 -'''timeout to connect to the server socket, it doesn't include auth''' - -DISCONNECT_TIMEOUT_SECONDS = 10 -'''how long to wait for a disconnect to complete''' - -class NonBlockingTcp(PlugIn, IdleObject): - def __init__(self, on_xmpp_connect=None, on_xmpp_failure=None): - ''' - Class constructor. All parameters can be reset in tcp_connect or xmpp_connect - calls. - - ''' - PlugIn.__init__(self) - IdleObject.__init__(self) - self.on_tcp_connect = None - self.on_tcp_failure = None - self.sock = None - self.idlequeue = None - self.DBG_LINE='socket' - self.state = DISCONNECTED - ''' - CONNECTING - after non-blocking socket.connect() until TCP connection is estabilished - CONNECTED - after TCP connection is estabilished - DISCONNECTING - - DISCONNECTED - ''' - self._exported_methods=[self.send, self.disconnect, self.onreceive, self.set_send_timeout, - self.start_disconnect, self.set_timeout, self.remove_timeout] - - - def connect(self, conn_5tuple, on_tcp_connect, on_tcp_failure, idlequeue): - ''' - Creates and connects socket to server and port defined in conn_5tupe which - should be list item returned from getaddrinfo. - :param conn_5tuple: 5-tuple returned from getaddrinfo - :param on_tcp_connect: callback called on successful tcp connection - :param on_tcp_failure: callback called on failure when estabilishing tcp - connection - :param idlequeue: idlequeue for socket - ''' - self.on_tcp_connect = on_tcp_connect - self.on_tcp_failure = on_tcp_failure - self.conn_5tuple = conn_5tuple - try: - self.sock = socket.socket(*conn_5tuple[:3]) - except socket.error, (errnum, errstr): - on_tcp_failure('NonBlockingTcp: Error while creating socket: %s %s' % (errnum, errstr)) - return - - self.idlequeue = idlequeue - self.fd = self.sock.fileno() - self.idlequeue.plug_idle(self, True, False) - - errnum = 0 - ''' variable for errno symbol that will be found from exception raised from connect() ''' - - # set timeout for TCP connecting - if nonblocking connect() fails, pollend - # is called. If if succeeds pollout is called. - self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS) - - try: - self.sock.setblocking(False) - self.sock.connect(conn_5tuple[4]) - except Exception, (errnum, errstr): - pass - - if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): - # connecting in progress - self.state = CONNECTING - log.debug('After nonblocking connect. "%s" raised => CONNECTING' % errstr) - # on_tcp_connect/failure will be called from self.pollin/self.pollout - return - elif errnum in (0, 10056, errno.EISCONN): - # already connected - this branch is very unlikely, nonblocking connect() will - # return EINPROGRESS exception in most cases. Anyway, we don't need timeout - # on connected descriptor - log.debug('After nonblocking connect. "%s" raised => CONNECTED' % errstr) - self._on_tcp_connect(self) - return - - # if there was some other error, call failure callback and unplug transport - # which will also remove read_timeouts for descriptor - self._on_tcp_failure('Exception while connecting to %s: %s - %s' % - (conn_5tuple[4], errnum, errstr)) - - def _on_tcp_connect(self, data): - ''' This method preceeds actual call of on_tcp_connect callback - ''' - self.state = CONNECTED - self.idlequeue.remove_timeout(self.fd) - self.on_tcp_connect(data) - - - def _on_tcp_failure(self,err_msg): - ''' This method preceeds actual call of on_tcp_failure callback - ''' - self.state = DISCONNECTED - self.idlequeue.unplug_idle(self.fd) - self.on_tcp_failure(err_msg) - - def pollin(self): - '''called when receive on plugged socket is possible ''' - log.debug('pollin called, state == %s' % self.state) - - def pollout(self): - '''called when send to plugged socket is possible''' - log.debug('pollout called, state == %s' % self.state) - - if self.state==CONNECTING: - self._on_tcp_connect(self) - return - - def pollend(self): - '''called when remote site closed connection''' - log.debug('pollend called, state == %s' % self.state) - if self.state==CONNECTING: - self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4]) - - def read_timeout(self): - ''' - Implemntation of IdleObject function called on timeouts from IdleQueue. - ''' - log.debug('read_timeout called, state == %s' % self.state) - if self.state==CONNECTING: - # if read_timeout is called during connecting, connect() didn't end yet - # thus we have to close the socket - try: - self.sock.close() - except socket.error, (errnum, errmsg): - log.error('Error while closing socket on connection timeout: %s %s' - % (errnum, errmsg)) - self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4]) - - - - def disconnect(self, on_disconnect=None): - if self.state == DISCONNECTED: - return - self.idlequeue.unplug_idle(self.fd) - try: - self.sock.shutdown(socket.SHUT_RDWR) - except socket.error, (errnum, errstr): - log.error('Error while disconnecting: %s %s' % (errnum,errstr)) - - try: - self.sock.close() - except socket.error, (errnum, errmsg): - log.error('Error closing socket: %s %s' % (errnum,errstr)) - if on_disconnect: - on_disconnect() - - - - - - - def send(self, data, now=False): - pass - - def onreceive(self): - pass - - def set_send_timeout(self): - pass - - def set_timeout(self): - pass - - def remove_timeout(self): - pass - - def start_disconnect(self): - pass diff --git a/src/config.py b/src/config.py index 4e4c49161..c678599e8 100644 --- a/src/config.py +++ b/src/config.py @@ -1199,7 +1199,7 @@ class ManageProxiesWindow: proxypass_entry.set_text(gajim.config.get_per('proxies', proxy, 'pass')) proxytype = gajim.config.get_per('proxies', proxy, 'type') - types = ['http', 'socks5'] + types = ['http', 'socks5', 'bosh'] self.proxytype_combobox.set_active(types.index(proxytype)) if gajim.config.get_per('proxies', proxy, 'user'): useauth_checkbutton.set_active(True) @@ -1227,7 +1227,7 @@ class ManageProxiesWindow: model.set_value(iter, 0, new_name) def on_proxytype_combobox_changed(self, widget): - types = ['http', 'socks5'] + types = ['http', 'socks5', 'bosh'] type_ = self.proxytype_combobox.get_active() proxy = self.proxyname_entry.get_text().decode('utf-8') gajim.config.set_per('proxies', proxy, 'type', types[type_]) diff --git a/test/test_client_nb.py b/test/test_client_nb.py index 3ddc75eef..b0aa88be8 100644 --- a/test/test_client_nb.py +++ b/test/test_client_nb.py @@ -22,7 +22,7 @@ xmpp_server_port = ('xmpp.example.org',5222) Script will connect to the machine. ''' -credentials = ['login', 'pass', 'testclient'] +credentials = ['loginn', 'passwo', 'testresour'] ''' [username, password, passphrase] Script will autheticate itself with this credentials on above mentioned server. @@ -41,19 +41,6 @@ class TestNonBlockingClient(unittest.TestCase): self.idlequeue_thread = IdleQueueThread() self.connection = MockConnectionClass() - self.client = client_nb.NonBlockingClient( - server=xmpp_server_port[0], - port=xmpp_server_port[1], - on_connect=lambda *args: self.connection.on_connect(True, *args), - on_connect_failure=lambda *args: self.connection.on_connect(False, *args), - caller=self.connection - ) - ''' - NonBlockingClient instance with parameters from global variables and with - callbacks from dummy connection. - ''' - - self.client.set_idlequeue(self.idlequeue_thread.iq) self.idlequeue_thread.start() def tearDown(self): @@ -70,17 +57,33 @@ class TestNonBlockingClient(unittest.TestCase): :param server_port: tuple of (hostname, port) for where the client should connect. + ''' - self.client.connect(server_port) + self.client = client_nb.NonBlockingClient( + hostname=server_port[0], + port=server_port[1], + caller=self.connection, + idlequeue=self.idlequeue_thread.iq, + ) + ''' + NonBlockingClient instance with parameters from global variables and with + callbacks from dummy connection. + ''' + + self.client.connect( + on_connect=lambda *args: self.connection.on_connect(True, *args), + on_connect_failure=lambda *args: self.connection.on_connect(False, *args), + secure=False + ) print 'waiting for callback from client constructor' self.connection.wait() # if on_connect was called, client has to be connected and vice versa if self.connection.connect_succeeded: - self.assert_(self.client.isConnected()) + self.assert_(self.client.get_connect_type()) else: - self.assert_(not self.client.isConnected()) + self.assert_(not self.client.get_connect_type()) def client_auth(self, username, password, resource, sasl): ''' @@ -100,7 +103,9 @@ class TestNonBlockingClient(unittest.TestCase): ''' Does disconnecting of connected client. Returns when TCP connection is closed. ''' - self.client.start_disconnect(None, on_disconnect=self.connection.set_event) + #self.client.start_disconnect(None, on_disconnect=self.connection.set_event) + self.client.RegisterDisconnectHandler(self.connection.set_event) + self.client.disconnect() print 'waiting for disconnecting...' self.connection.wait() @@ -113,7 +118,7 @@ class TestNonBlockingClient(unittest.TestCase): self.open_stream(xmpp_server_port) # if client is not connected, lets raise the AssertionError - self.assert_(self.client.isConnected()) + self.assert_(self.client.get_connect_type()) # (client.disconnect() is already called from NBClient._on_connected_failure # so there's need to call it in this case @@ -130,7 +135,7 @@ class TestNonBlockingClient(unittest.TestCase): then disconnected. ''' self.open_stream(xmpp_server_port) - self.assert_(self.client.isConnected()) + self.assert_(self.client.get_connect_type()) self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0) self.assert_(self.connection.con) self.assert_(self.connection.auth=='old_auth') @@ -141,7 +146,8 @@ class TestNonBlockingClient(unittest.TestCase): Connect to nonexisting host. DNS request for A records should return nothing. ''' self.open_stream(('fdsfsdf.fdsf.fss', 5222)) - self.assert_(not self.client.isConnected()) + print 'nonexthost: %s' % self.client.get_connect_type() + self.assert_(not self.client.get_connect_type()) def test_connect_to_wrong_port(self): ''' @@ -149,14 +155,14 @@ class TestNonBlockingClient(unittest.TestCase): but there shouldn't be XMPP server running on specified port. ''' self.open_stream((xmpp_server_port[0], 31337)) - self.assert_(not self.client.isConnected()) + self.assert_(not self.client.get_connect_type()) def test_connect_with_wrong_creds(self): ''' Connecting with invalid password. ''' self.open_stream(xmpp_server_port) - self.assert_(self.client.isConnected()) + self.assert_(self.client.get_connect_type()) self.client_auth(credentials[0], "wrong pass", credentials[2], sasl=1) self.assert_(self.connection.auth is None) self.do_disconnect() @@ -168,9 +174,10 @@ class TestNonBlockingClient(unittest.TestCase): if __name__ == '__main__': - #suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient) - suite = unittest.TestSuite() - suite.addTest(TestNonBlockingClient('test_proper_connect_sasl')) + suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient) + #suite = unittest.TestSuite() + #suite.addTest(TestNonBlockingClient('test_proper_connect_oldauth')) + #suite.addTest(TestNonBlockingClient('test_connect_to_nonexisting_host')) unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/test/test_nonblockingtcp.py b/test/test_nonblockingtcp.py index cf6d31a2d..7987d3278 100644 --- a/test/test_nonblockingtcp.py +++ b/test/test_nonblockingtcp.py @@ -12,7 +12,7 @@ gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..') sys.path.append(gajim_root + '/src/common/xmpp') sys.path.append(gajim_root + '/src/common') -import transports_new, debug +import transports_nb from client import * xmpp_server = ('xmpp.example.org',5222) @@ -21,60 +21,48 @@ xmpp_server = ('xmpp.example.org',5222) Script will connect to the machine. ''' -dns_timeout = 10 -''' -timeout for DNS A-request (for getaddrinfo() call) -''' + +import socket +ips = socket.getaddrinfo(xmpp_server[0], xmpp_server[1], socket.AF_UNSPEC,socket.SOCK_STREAM) + +# change xmpp_server on real values +ip = ips[0] + class MockClient(IdleMock): - def __init__(self, server, port): + def __init__(self, idlequeue): + self.idlequeue=idlequeue self.debug_flags=['all', 'nodebuilder'] self._DEBUG = debug.Debug(['socket']) self.DEBUG = self._DEBUG.Show - self.server = server - self.port = port IdleMock.__init__(self) - self.tcp_connected = False - self.ip_addresses = [] - self.socket = None - def do_dns_request(self): - transports_new.NBgetaddrinfo( - server=(self.server, self.port), - on_success=lambda *args:self.on_success('DNSrequest', *args), - on_failure=self.on_failure, - timeout_sec=dns_timeout + def do_connect(self): + self.socket=transports_nb.NonBlockingTcp( + on_disconnect=lambda: self.on_success(mode='SocketDisconnect') + ) + + self.socket.PlugIn(self) + + self.socket.connect( + conn_5tuple=ip, + on_connect=lambda: self.on_success(mode='TCPconnect'), + on_connect_failure=self.on_failure ) self.wait() - - def try_next_ip(self, err_message=None): - if err_message: - print err_message - if self.ip_addresses == []: - self.on_failure('Run out of hosts') - return - current_ip = self.ip_addresses.pop(0) - self.NonBlockingTcp.connect( - conn_5tuple=current_ip, - on_tcp_connect=lambda *args: self.on_success('TCPconnect',*args), - on_tcp_failure=self.try_next_ip, - idlequeue=self.idlequeue - ) + def do_disconnect(self): + self.socket.disconnect() self.wait() - - def set_idlequeue(self, idlequeue): - self.idlequeue=idlequeue - def on_failure(self, data): print 'Error: %s' % data self.set_event() - def on_success(self, mode, data): - if mode == "DNSrequest": - self.ip_addresses = data - elif mode == "TCPconnect": + def on_success(self, mode, data=None): + if mode == "TCPconnect": + pass + if mode == "SocketDisconnect": pass self.set_event() @@ -87,12 +75,10 @@ class MockClient(IdleMock): class TestNonBlockingTcp(unittest.TestCase): def setUp(self): - self.nbtcp = transports_new.NonBlockingTcp() - self.client = MockClient(*xmpp_server) self.idlequeue_thread = IdleQueueThread() self.idlequeue_thread.start() - self.client.set_idlequeue(self.idlequeue_thread.iq) - self.nbtcp.PlugIn(self.client) + self.client = MockClient( + idlequeue=self.idlequeue_thread.iq) def tearDown(self): self.idlequeue_thread.stop_thread() @@ -100,12 +86,12 @@ class TestNonBlockingTcp(unittest.TestCase): def testSth(self): - self.client.do_dns_request() - if self.client.ip_addresses == []: - print 'No IP found for given hostname: %s' % self.client.server - return - else: - self.client.try_next_ip() + + self.client.do_connect() + self.assert_(self.client.socket.state == 'CONNECTED') + self.client.do_disconnect() + self.assert_(self.client.socket.state == 'DISCONNECTED') +