From acdf4ff1434c6f9f7b9f477d9d19887918b816e2 Mon Sep 17 00:00:00 2001 From: tomk Date: Sun, 17 Aug 2008 22:57:48 +0000 Subject: [PATCH] improved disconnect handling, added comments, fixed minor bugs --- src/common/connection.py | 17 +- src/common/xmpp/bosh.py | 58 +++- src/common/xmpp/client_nb.py | 395 +++++++++++++++------------- src/common/xmpp/proxy_connectors.py | 1 + src/common/xmpp/tls_nb.py | 11 +- src/common/xmpp/transports_nb.py | 156 ++++++----- 6 files changed, 369 insertions(+), 269 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index d191c8237..cb09ee795 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -581,8 +581,8 @@ class Connection(ConnectionHandlers): if self.on_connect_success == self._on_new_account: con.RegisterDisconnectHandler(self._on_new_account) - log.info('Connecting to %s: [%s:%d]', self.name, - self._current_host['host'], port) + self.log_hosttype_info(port) + con.connect( hostname=self._current_host['host'], port=port, @@ -594,6 +594,19 @@ class Connection(ConnectionHandlers): else: self.connect_to_next_host(retry) + def log_hosttype_info(self, port): + msg = '>>>>>> Connecting to %s [%s:%d], type = %s' % (self.name, + self._current_host['host'], port, self._current_type) + log.info(msg) + if self._proxy: + msg = '>>>>>> ' + if self._proxy['type']=='bosh': + msg = '%s over BOSH %s:%s' % (msg, self._proxy['bosh_uri'], self._proxy['bosh_port']) + if self._proxy['type'] in ['http','socks5'] or self._proxy['bosh_useproxy']: + msg = '%s over proxy %s:%s' % (msg, self._proxy['host'], self._proxy['port']) + log.info(msg) + + def _connect_failure(self, con_type = None): if not con_type: diff --git a/src/common/xmpp/bosh.py b/src/common/xmpp/bosh.py index 5d1796a99..c6ae1beba 100644 --- a/src/common/xmpp/bosh.py +++ b/src/common/xmpp/bosh.py @@ -30,10 +30,9 @@ log = logging.getLogger('gajim.c.x.bosh') KEY_COUNT = 10 +# Fake file descriptor - it's used for setting read_timeout in idlequeue for +# BOSH Transport. In TCP-derived transports this is file descriptor of socket. FAKE_DESCRIPTOR = -1337 -'''Fake file descriptor - it's used for setting read_timeout in idlequeue for -BOSH Transport. -In TCP-derived transports it is file descriptor of socket''' class NonBlockingBOSH(NonBlockingTransport): @@ -126,10 +125,11 @@ class NonBlockingBOSH(NonBlockingTransport): def on_http_request_possible(self): ''' - Called after HTTP response is received - when another request is possible. + Called when HTTP request it's possible to send a HTTP request. It can be when + socket is connected or when HTTP response arrived. There should be always one pending request to BOSH CM. ''' - log.info('on_http_req possible, state:\n%s' % self.get_current_state()) + log.debug('on_http_req possible, state:\n%s' % self.get_current_state()) if self.get_state()==DISCONNECTED: return #Hack for making the non-secure warning dialog work @@ -137,6 +137,10 @@ class NonBlockingBOSH(NonBlockingTransport): if (hasattr(self._owner, 'NonBlockingNonSASL') or hasattr(self._owner, 'SASL')): self.send_BOSH(None) else: + # If we already got features and no auth module was plugged yet, we are + # probably waiting for confirmation of the "not-secure-connection" dialog. + # We don't send HTTP request in that case. + # see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html return else: self.send_BOSH(None) @@ -144,18 +148,20 @@ class NonBlockingBOSH(NonBlockingTransport): def get_socket_in(self, state): + ''' gets sockets in desired state ''' for s in self.http_socks: if s.get_state()==state: return s return None def get_free_socket(self): + ''' Selects and returns socket eligible for sending a data to.''' if self.http_pipelining: return self.get_socket_in(CONNECTED) else: last_recv_time, tmpsock = 0, None for s in self.http_socks: - # we're interested only into CONNECTED socket with no req pending + # we're interested only in CONNECTED socket with no requests pending if s.get_state()==CONNECTED and s.pending_requests==0: # if there's more of them, we want the one with the least recent data receive # (lowest last_recv_time) @@ -169,6 +175,10 @@ class NonBlockingBOSH(NonBlockingTransport): def send_BOSH(self, payload): + ''' + Tries to send a stanza in payload by appeding it to a buffer and plugging a + free socket for writing. + ''' total_pending_reqs = sum([s.pending_requests for s in self.http_socks]) # when called after HTTP response (Payload=None) and when there are already @@ -192,7 +202,8 @@ class NonBlockingBOSH(NonBlockingTransport): self.get_current_state()) return - # when there's free CONNECTED socket, we flush the data + # when there's free CONNECTED socket, we plug it for write and the data will + # be sent when write is possible if self.get_free_socket(): self.plug_socket() return @@ -209,8 +220,7 @@ class NonBlockingBOSH(NonBlockingTransport): if s: self.connect_and_flush(s) else: - #if len(self.http_socks) > 1: return - print 'connecting sock' + # otherwise create and connect a new one ss = self.get_new_http_socket() self.http_socks.append(ss) self.connect_and_flush(ss) @@ -225,6 +235,15 @@ class NonBlockingBOSH(NonBlockingTransport): log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())') def build_stanza(self, socket): + ''' + Builds a BOSH body tag from data in buffers and adds key, rid and ack + attributes to it. + This method is called from _do_send() of underlying transport. This is to + ensure rid and keys will be processed in correct order. If I generate them + before plugging a socket for write (and did it for two sockets/HTTP + connections) in parallel, they might be sent in wrong order, which results + in violating the BOSH session and server-side disconnect. + ''' if self.prio_bosh_stanzas: stanza, add_payload = self.prio_bosh_stanzas.pop(0) if add_payload: @@ -244,7 +263,6 @@ class NonBlockingBOSH(NonBlockingTransport): log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket))) - #socket.send(stanza) self.renew_bosh_wait_timeout(self.bosh_wait + 3) return stanza @@ -266,8 +284,12 @@ class NonBlockingBOSH(NonBlockingTransport): self.wait_cb_time) def on_persistent_fallback(self, socket): - log.warn('Fallback to nonpersistent HTTP (no pipelining as well)') + ''' + Called from underlying transport when server closes TCP connection. + :param socket: disconnected transport object + ''' if socket.http_persistent: + log.warn('Fallback to nonpersistent HTTP (no pipelining as well)') socket.http_persistent = False self.http_persistent = False self.http_pipelining = False @@ -279,6 +301,9 @@ class NonBlockingBOSH(NonBlockingTransport): def handle_body_attrs(self, stanza_attrs): + ''' + Called for each incoming body stanza from dispatcher. Checks body attributes. + ''' self.remove_bosh_wait_timeout() if self.after_init: @@ -315,11 +340,13 @@ class NonBlockingBOSH(NonBlockingTransport): def append_stanza(self, stanza): + ''' appends stanza to a buffer to send ''' if stanza: if isinstance(stanza, tuple): # stanza is tuple of BOSH stanza and bool value for whether to add payload self.prio_bosh_stanzas.append(stanza) else: + # stanza is XMPP stanza. Will be boshified before send. self.stanza_buffer.append(stanza) @@ -391,7 +418,6 @@ class NonBlockingBOSH(NonBlockingTransport): if self.use_proxy_auth: http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds - s = NonBlockingHTTPBOSH( raise_event=self.raise_event, on_disconnect=self.disconnect, @@ -402,6 +428,7 @@ class NonBlockingBOSH(NonBlockingTransport): http_dict = http_dict, proxy_dict = self.proxy_dict, on_persistent_fallback = self.on_persistent_fallback) + s.onreceive(self.on_received_http) s.set_stanza_build_cb(self.build_stanza) return s @@ -439,6 +466,10 @@ def get_rand_number(): class AckChecker(): + ''' + Class for generating rids and generating and checking acknowledgements in + BOSH messages. + ''' def __init__(self): self.rid = get_rand_number() self.ack = 1 @@ -481,6 +512,9 @@ class AckChecker(): class KeyStack(): + ''' + Class implementing key sequences for BOSH messages + ''' def __init__(self, count): self.count = count self.keys = [] diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py index 35d5079f4..b7603cc08 100644 --- a/src/common/xmpp/client_nb.py +++ b/src/common/xmpp/client_nb.py @@ -16,11 +16,6 @@ # $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $ -''' -Provides Client classes implementations as examples of xmpppy structures usage. -These classes can be used for simple applications "AS IS" though. -''' - import socket import transports_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh @@ -32,16 +27,19 @@ import logging log = logging.getLogger('gajim.c.x.client_nb') -class NBCommonClient: - ''' Base for Client and Component classes.''' +class NonBlockingClient: + ''' + Client class is XMPP connection mountpoint. Objects for authentication, + network communication, roster, xml parsing ... are plugged to client object. + Client implements the abstract behavior - mostly negotioation and callbacks + handling, whereas underlying modules take care of feature-specific logic. + ''' def __init__(self, domain, idlequeue, caller=None): - - ''' Caches connection data: + ''' + Caches connection data: :param domain: domain - for to: attribute (from account info) :param idlequeue: processing idlequeue - :param port: port of listening XMPP server - :param caller: calling object - it has to implement certain methods (necessary?) - + :param caller: calling object - it has to implement method _event_dispatcher ''' self.Namespace = protocol.NS_CLIENT self.defaultNamespace = self.Namespace @@ -62,19 +60,22 @@ class NBCommonClient: self.on_connect_failure = None self.proxy = None self.got_features = False + self.stream_started = False + self.disconnecting = False + self.protocol_type = 'XMPP' - def on_disconnect(self): + def disconnect(self, message=''): ''' - Called on disconnection - when connect failure occurs on running connection - (after stream is successfully opened). - Calls disconnect handlers and cleans things up. + Called on disconnection - disconnect callback is picked based on state of the + client. ''' - - self.connected='' - for i in reversed(self.disconnect_handlers): - log.debug('Calling disconnect handler %s' % i) - i() + + # to avoid recursive calls + if self.disconnecting: return + + log.warn('Disconnecting NBClient: %s' % message) + if self.__dict__.has_key('NonBlockingRoster'): self.NonBlockingRoster.PlugOut() if self.__dict__.has_key('NonBlockingBind'): @@ -89,7 +90,41 @@ class NBCommonClient: self.NonBlockingHTTP.PlugOut() if self.__dict__.has_key('NonBlockingBOSH'): self.NonBlockingBOSH.PlugOut() + + connected = self.connected + stream_started = self.stream_started + + self.connected = '' + self.stream_started = False + + self.disconnecting = True + log.debug('Client disconnected..') + if connected == '': + # if we're disconnecting before connection to XMPP sever is opened, we don't + # call disconnect handlers but on_connect_failure callback + if self.proxy: + # with proxy, we have different failure callback + log.debug('calling on_proxy_failure cb') + self.on_proxy_failure(reason=message) + else: + log.debug('ccalling on_connect_failure cb') + self.on_connect_failure() + else: + # we are connected to XMPP server + if not stream_started: + # if error occur before XML stream was opened, e.g. no response on init + # request, we call the on_connect_failure callback because proper + # connection is not estabilished yet and it's not a proxy issue + log.debug('calling on_connect_failure cb') + self.on_connect_failure() + else: + # with open connection, we are calling the disconnect handlers + for i in reversed(self.disconnect_handlers): + log.debug('Calling disconnect handler %s' % i) + i() + self.disconnecting = False + def connect(self, on_connect, on_connect_failure, hostname=None, port=5222, @@ -101,11 +136,14 @@ class NBCommonClient: :param on_connect: called after stream is successfully opened :param on_connect_failure: called when error occures during connection :param on_proxy_failure: called if error occurres during TCP connection to - proxy server or during connection to the proxy + proxy server or during proxy connecting process :param proxy: dictionary with proxy data. It should contain at least values for keys 'host' and 'port' - connection details for proxy server and optionally keys 'user' and 'pass' as proxy credentials - :param secure_tuple: + :param secure_tuple: tuple of (desired connection type, cacerts and mycerts) + connection type can be 'ssl' - TLS estabilished after TCP connection, + 'tls' - TLS estabilished after negotiation with starttls, or 'plain'. + cacerts, mycerts - see tls_nb.NonBlockingTLS constructor for more details ''' self.on_connect = on_connect self.on_connect_failure=on_connect_failure @@ -113,16 +151,72 @@ class NBCommonClient: self.secure, self.cacerts, self.mycerts = secure_tuple self.Connection = None self.Port = port + self.proxy = proxy + if hostname: + xmpp_hostname = hostname + else: + xmpp_hostname = self.Server + + estabilish_tls = self.secure == 'ssl' + certs = (self.cacerts, self.mycerts) + + proxy_dict = {} + tcp_host=xmpp_hostname + tcp_port=self.Port + + if proxy: + # with proxies, client connects to proxy instead of directly to + # XMPP server ((hostname, port)) + # tcp_host is hostname of machine used for socket connection + # (DNS request will be done for proxy or BOSH CM hostname) + tcp_host, tcp_port, proxy_user, proxy_pass = \ + transports_nb.get_proxy_data_from_dict(proxy) + + + if proxy['type'] == 'bosh': + self.socket = bosh.NonBlockingBOSH( + on_disconnect = self.disconnect, + raise_event = self.raise_event, + idlequeue = self.idlequeue, + estabilish_tls = estabilish_tls, + certs = certs, + proxy_creds = (proxy_user, proxy_pass), + xmpp_server = (xmpp_hostname, self.Port), + domain = self.Server, + bosh_dict = proxy) + self.protocol_type = 'BOSH' + self.wait_for_restart_response = proxy['bosh_wait_for_restart_response'] + + else: + proxy_dict['type'] = proxy['type'] + proxy_dict['xmpp_server'] = (xmpp_hostname, self.Port) + proxy_dict['credentials'] = (proxy_user, proxy_pass) + + if not proxy or proxy['type'] != 'bosh': + self.socket = transports_nb.NonBlockingTCP( + on_disconnect = self.disconnect, + raise_event = self.raise_event, + idlequeue = self.idlequeue, + estabilish_tls = estabilish_tls, + certs = certs, + proxy_dict = proxy_dict) + + self.socket.PlugIn(self) + + self._resolve_hostname( + hostname=tcp_host, + port=tcp_port, + on_success=self._try_next_ip) - def _resolve_hostname(self, hostname, port, on_success, on_failure): - ''' wrapper of getaddinfo call. FIXME: getaddinfo blocks''' + def _resolve_hostname(self, hostname, port, on_success): + ''' wrapper for getaddinfo call. FIXME: getaddinfo blocks''' try: self.ip_addresses = socket.getaddrinfo(hostname,port, socket.AF_UNSPEC,socket.SOCK_STREAM) except socket.gaierror, (errnum, errstr): - on_failure('Lookup failure for %s:%s, hostname: %s - %s' % + self.disconnect(message= 'Lookup failure for %s:%s, hostname: %s - %s' % (self.Server, self.Port, hostname, errstr)) else: on_success() @@ -130,12 +224,13 @@ class NBCommonClient: def _try_next_ip(self, err_message=None): - '''iterates over IP addresses from getaddinfo''' + '''iterates over IP addresses from getaddrinfo''' if err_message: log.debug('While looping over DNS A records: %s' % err_message) if self.ip_addresses == []: - self._on_tcp_failure('Run out of hosts for name %s:%s' % - (self.Server, self.Port)) + msg = 'Run out of hosts for name %s:%s.' % (self.Server, self.Port) + msg = msg + ' Error for last IP: %s' % err_message + self.disconnect(msg) else: self.current_ip = self.ip_addresses.pop(0) self.socket.connect( @@ -152,19 +247,23 @@ class NBCommonClient: return None def _xmpp_connect(self, socket_type): - if socket_type == 'plain' and self.Connection.ssl_lib: - socket_type = 'ssl' + ''' + Starts XMPP connecting process - opens the XML stream. Is called after TCP + connection is estabilished or after switch to TLS when successfully + negotiated with . + ''' + if socket_type == 'plain' and self.Connection.ssl_lib: socket_type = 'ssl' self.connected = socket_type self._xmpp_connect_machine() def _xmpp_connect_machine(self, mode=None, data=None): ''' - Finite automaton called after TCP connecting. Takes care of stream opening - and features tag handling. Calls _on_stream_start when stream is - started, and _on_connect_failure on failure. + Finite automaton taking care of stream opening and features tag + handling. Calls _on_stream_start when stream is started, and disconnect() + on failure. ''' - log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] )) + log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s...' % (mode,str(data)[:20] )) def on_next_receive(mode): log.info('setting %s on next receive' % mode) @@ -182,7 +281,7 @@ class NBCommonClient: on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES') elif mode == 'FAILURE': - self._on_connect_failure(err_message='During XMPP connect: %s' % data) + self.disconnect('During XMPP connect: %s' % data) elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES': if data: @@ -219,8 +318,38 @@ class NBCommonClient: elif mode == 'STREAM_STARTED': self._on_stream_start() + def _on_stream_start(self): + ''' + Called after XMPP stream is opened. + TLS negotiation may follow after esabilishing a stream. + ''' + self.stream_started = True + self.onreceive(None) + if self.connected == 'plain': + if self.secure == 'plain': + # if we want plain connection, we're done now + self._on_connect() + return + if not self.Dispatcher.Stream.features.getTag('starttls'): + # if server doesn't advertise TLS in init response, we can't do more + log.warn('While connecting with type = "tls": TLS unsupported by remote server') + self._on_connect() + return + if self.incoming_stream_version() != '1.0': + # if stream version is less than 1.0, we can't do more + log.warn('While connecting with type = "tls": stream version is less than 1.0') + self._on_connect() + return + # otherwise start TLS negotioation + self.stream_started = False + log.info("TLS supported by remote server. Requesting TLS start.") + self._tls_negotiation_handler() + elif self.connected in ['ssl', 'tls']: + self._on_connect() + def _tls_negotiation_handler(self, con=None, tag=None): + ''' takes care of TLS negotioation with ''' log.info('-------------tls_negotiaton_handler() >> tag: %s' % tag) if not con and not tag: # starting state when we send the @@ -232,72 +361,44 @@ class NBCommonClient: else: # we got or if tag.getNamespace() <> NS_TLS: - self._on_connect_failure('Unknown namespace: %s' % tag.getNamespace()) + self.disconnect('Unknown namespace: %s' % tag.getNamespace()) return tagname = tag.getName() if tagname == 'failure': - self._on_connect_failure('TLS received: %s' % tag) + self.disconnect('TLS received: %s' % tag) return log.info('Got starttls proceed response. Switching to TLS/SSL...') # following call wouldn't work for BOSH transport but it doesn't matter - # because TLS negotiation with BOSH is forbidden + # because negotiation with BOSH is forbidden self.Connection.tls_init( on_succ = lambda: self._xmpp_connect(socket_type='tls'), - on_fail = lambda: self._on_connect_failure('error while etabilishing TLS')) + on_fail = lambda: self.disconnect('error while etabilishing TLS')) - def _on_stream_start(self): - '''Called when stream is opened. To be overriden in derived classes.''' - - def _on_connect_failure(self, retry=None, err_message=None): - self.connected = '' - if err_message: - log.debug('While connecting: %s' % err_message) - if self.socket: - self.socket.disconnect() - self.on_connect_failure(retry) - def _on_connect(self): + ''' preceeds call of on_connect callback ''' self.onreceive(None) self.on_connect(self, self.connected) def raise_event(self, event_type, data): + ''' + raises event to connection instance - DATA_SENT and DATA_RECIVED events are + used in XML console to show XMPP traffic + ''' log.info('raising event from transport: >>>>>%s<<<<<\n_____________\n%s\n_____________\n' % (event_type,data)) if hasattr(self, 'Dispatcher'): self.Dispatcher.Event('', event_type, data) - # moved from client.CommonClient (blocking client from xmpppy): - def RegisterDisconnectHandler(self,handler): - """ Register handler that will be called on disconnect.""" - self.disconnect_handlers.append(handler) - - def UnregisterDisconnectHandler(self,handler): - """ Unregister handler that is called on disconnect.""" - self.disconnect_handlers.remove(handler) - - def DisconnectHandler(self): - """ Default disconnect handler. Just raises an IOError. - If you choosed to use this class in your production client, - override this method or at least unregister it. """ - raise IOError('Disconnected from server.') - - def get_connect_type(self): - """ Returns connection state. F.e.: None / 'tls' / 'plain+non_sasl' . """ - return self.connected - - def get_peerhost(self): - ''' get the ip address of the account, from which is made connection - to the server , (e.g. me). - We will create listening socket on the same ip ''' - # FIXME: tuple (ip, port) is expected (and checked for) but port num is useless - return self.socket.peerhost - + # follows code for authentication, resource bind, session and roster download + # def auth(self, user, password, resource = '', sasl = 1, on_auth = None): - ''' Authenticate connnection and bind resource. If resource is not provided - random one or library name used. ''' + ''' + Authenticate connnection and bind resource. If resource is not provided + random one or library name used. + ''' self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl self.on_auth = on_auth self._on_doc_attrs() @@ -318,7 +419,6 @@ class NBCommonClient: self._Resource = 'xmpppy' auth_nb.NonBlockingNonSASL(self._User, self._Password, self._Resource, self._on_old_auth).PlugIn(self) return - #self.onreceive(self._on_start_sasl) self.SASL.auth() return True @@ -349,7 +449,8 @@ class NBCommonClient: self.onreceive(self._on_auth_bind) return return - + + def _on_auth_bind(self, data): if data: self.Dispatcher.ProcessNonBlocking(data) @@ -386,107 +487,35 @@ class NBCommonClient: self.send(dispatcher_nb.Presence(to=jid, typ=typ)) - -class NonBlockingClient(NBCommonClient): - ''' Example client class, based on CommonClient. ''' - - def __init__(self, domain, idlequeue, caller=None): - NBCommonClient.__init__(self, domain, idlequeue, caller) - self.protocol_type = 'XMPP' - - def connect(self, on_connect, on_connect_failure, hostname=None, port=5222, - on_proxy_failure=None, proxy=None, secure_tuple=None): - - NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port, - on_proxy_failure, proxy, secure_tuple) - - if hostname: - xmpp_hostname = hostname - else: - xmpp_hostname = self.Server - - estabilish_tls = self.secure == 'ssl' - certs = (self.cacerts, self.mycerts) - - self._on_tcp_failure = self._on_connect_failure - proxy_dict = {} - tcp_host=xmpp_hostname - tcp_port=self.Port - - if proxy: - # with proxies, client connects to proxy instead of directly to - # XMPP server ((hostname, port)) - # tcp_host is hostname of machine used for socket connection - # (DNS request will be done for proxy or BOSH CM hostname) - tcp_host, tcp_port, proxy_user, proxy_pass = \ - transports_nb.get_proxy_data_from_dict(proxy) - - - if proxy['type'] == 'bosh': - self.socket = bosh.NonBlockingBOSH( - on_disconnect = self.on_disconnect, - raise_event = self.raise_event, - idlequeue = self.idlequeue, - estabilish_tls = estabilish_tls, - certs = certs, - proxy_creds = (proxy_user, proxy_pass), - xmpp_server = (xmpp_hostname, self.Port), - domain = self.Server, - bosh_dict = proxy) - self.protocol_type = 'BOSH' - self.wait_for_restart_response = proxy['bosh_wait_for_restart_response'] - - else: - self._on_tcp_failure = self.on_proxy_failure - proxy_dict['type'] = proxy['type'] - proxy_dict['xmpp_server'] = (xmpp_hostname, self.Port) - proxy_dict['credentials'] = (proxy_user, proxy_pass) - - if not proxy or proxy['type'] != 'bosh': - self.socket = transports_nb.NonBlockingTCP( - on_disconnect = self.on_disconnect, - raise_event = self.raise_event, - idlequeue = self.idlequeue, - estabilish_tls = estabilish_tls, - certs = certs, - proxy_dict = proxy_dict) - - self.socket.PlugIn(self) - - self._resolve_hostname( - hostname=tcp_host, - port=tcp_port, - on_success=self._try_next_ip, - on_failure=self._on_tcp_failure) - - - - def _on_stream_start(self): - ''' - Called after XMPP stream is opened. - In pure XMPP client, TLS negotiation may follow after esabilishing a stream. - ''' - self.onreceive(None) - if self.connected == 'plain': - if self.secure == 'plain': - # if we want plain connection, we're done now - self._on_connect() - return - if not self.Dispatcher.Stream.features.getTag('starttls'): - # if server doesn't advertise TLS in init response, we can't do more - log.warn('While connecting with type = "tls": TLS unsupported by remote server') - self._on_connect() - return - if self.incoming_stream_version() != '1.0': - # if stream version is less than 1.0, we can't do more - log.warn('While connecting with type = "tls": stream version is less than 1.0') - self._on_connect() - return - # otherwise start TLS - log.info("TLS supported by remote server. Requesting TLS start.") - self._tls_negotiation_handler() - elif self.connected in ['ssl', 'tls']: - self._on_connect() - + # following methods are moved from blocking client class from xmpppy: + def RegisterDisconnectHandler(self,handler): + ''' Register handler that will be called on disconnect.''' + self.disconnect_handlers.append(handler) + + def UnregisterDisconnectHandler(self,handler): + ''' Unregister handler that is called on disconnect.''' + self.disconnect_handlers.remove(handler) + + def DisconnectHandler(self): + ''' + Default disconnect handler. Just raises an IOError. If you choosed to use + this class in your production client, override this method or at least + unregister it. + ''' + raise IOError('Disconnected from server.') + + def get_connect_type(self): + ''' Returns connection state. F.e.: None / 'tls' / 'plain+non_sasl'. ''' + return self.connected + + def get_peerhost(self): + ''' + Gets the ip address of the account, from which is made connection to the + server , (e.g. IP and port of gajim's socket. We will create listening socket + on the same ip + ''' + # FIXME: tuple (ip, port) is expected (and checked for) but port num is + # useless + return self.socket.peerhost diff --git a/src/common/xmpp/proxy_connectors.py b/src/common/xmpp/proxy_connectors.py index 1b769288a..1cb30d3b1 100644 --- a/src/common/xmpp/proxy_connectors.py +++ b/src/common/xmpp/proxy_connectors.py @@ -19,6 +19,7 @@ import struct, socket, base64 ''' Module containing classes for proxy connecting. So far its HTTP CONNECT and SOCKS5 proxy. +Authentication to NTLM (Microsoft implementation) proxies can be next. ''' import logging diff --git a/src/common/xmpp/tls_nb.py b/src/common/xmpp/tls_nb.py index 72faab425..08f00a7f9 100644 --- a/src/common/xmpp/tls_nb.py +++ b/src/common/xmpp/tls_nb.py @@ -3,6 +3,7 @@ ## ## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov ## modified by Dimitur Kirov +## modified by Tomas Karasek ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -190,7 +191,7 @@ class PyOpenSSLWrapper(SSLWrapper): return 0 class StdlibSSLWrapper(SSLWrapper): - '''Wrapper class for Python's socket.ssl read() and write() methods''' + '''Wrapper class for Python socket.ssl read() and write() methods''' def __init__(self, *args): self.parent = SSLWrapper @@ -221,6 +222,10 @@ class NonBlockingTLS(PlugIn): ''' TLS connection used to encrypts already estabilished tcp connection.''' def __init__(self, cacerts, mycerts): + ''' + :param cacerts: path to pem file with certificates of known XMPP servers + :param mycerts: path to pem file with certificates of user trusted servers + ''' PlugIn.__init__(self) self.cacerts = cacerts self.mycerts = mycerts @@ -239,11 +244,9 @@ class NonBlockingTLS(PlugIn): log.info('Starting TLS estabilishing') PlugIn.PlugIn(self, owner) try: - self._owner._plug_idle(writable=False, readable=False) res = self._startSSL() except Exception, e: log.error("PlugIn: while trying _startSSL():", exc_info=True) - #traceback.print_exc() return False return res @@ -278,7 +281,6 @@ class NonBlockingTLS(PlugIn): if result: log.debug("Synchronous handshake completed") - self._owner._plug_idle(writable=True, readable=False) return True else: return False @@ -361,7 +363,6 @@ class NonBlockingTLS(PlugIn): def _ssl_verify_callback(self, sslconn, cert, errnum, depth, ok): # Exceptions can't propagate up through this callback, so print them here. try: - print 'in ssl verify callback' self._owner.ssl_fingerprint_sha1 = cert.digest('sha1') if errnum == 0: return True diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index 166dd2956..840adcbb7 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -84,11 +84,28 @@ CONNECTING = 'CONNECTING' PROXY_CONNECTING = 'PROXY_CONNECTING' CONNECTED = 'CONNECTED' STATES = [DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING] -# transports have different arguments in constructor and same in connect() -# method + +# Transports have different arguments in constructor and same in connect() +# method. class NonBlockingTransport(PlugIn): + ''' + Abstract class representing a trasport - object responsible for connecting to + XMPP server and putting stanzas on wire in desired form. + ''' def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs): + ''' + Each trasport class can have different constructor but it has to have at + least all the arguments of NonBlockingTransport constructor. + + :param raise_event: callback for monitoring of sent and received data + :param on_disconnect: callback called on disconnection during runtime + :param idlequeue: processing idlequeue + :param estabilish_tls: boolean whether to estabilish TLS connection after TCP + connection is done + :param certs: tuple of (cacerts, mycerts) see tls_nb.NonBlockingTLS + constructor for more details + ''' PlugIn.__init__(self) self.raise_event = raise_event self.on_disconnect = on_disconnect @@ -103,7 +120,7 @@ class NonBlockingTransport(PlugIn): self.certs = certs # type of used ssl lib (if any) will be assigned to this member var self.ssl_lib = None - self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout, + self._exported_methods=[self.onreceive, self.set_send_timeout, self.set_timeout, self.remove_timeout, self.start_disconnect] # time to wait for SOME stanza to come and then send keepalive @@ -118,10 +135,15 @@ class NonBlockingTransport(PlugIn): def plugout(self): self._owner.Connection = None self._owner = None + self.disconnect(do_callback=False) def connect(self, conn_5tuple, on_connect, on_connect_failure): ''' - connect method should have the same declaration in all derived transports + Creates and connects transport to server and port defined in conn_5tupe which + should be item from list returned from getaddrinfo. + :param conn_5tuple: 5-tuple returned from getaddrinfo + :param on_connect: callback called on successful connect to the server + :param on_connect_failure: callback called on failure when connecting ''' self.on_connect = on_connect self.on_connect_failure = on_connect_failure @@ -164,8 +186,13 @@ class NonBlockingTransport(PlugIn): self.on_disconnect() def onreceive(self, recv_handler): - ''' Sets the on_receive callback. Do not confuse it with - on_receive() method, which is the callback itself.''' + ''' + Sets the on_receive callback. Do not confuse it with on_receive() method, + which is the callback itself. + onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is the + default one that will decide what to do with received stanza based on its + tag name and namespace. + ''' if not recv_handler: if hasattr(self._owner, 'Dispatcher'): self.on_receive = self._owner.Dispatcher.ProcessNonBlocking @@ -176,9 +203,9 @@ class NonBlockingTransport(PlugIn): def tcp_connecting_started(self): self.set_state(CONNECTING) - # on_connect/on_conn_failure will be called from self.pollin/self.pollout def read_timeout(self): + ''' called when there's no response from server in defined timeout ''' if self.on_timeout: self.on_timeout() self.renew_send_timeout() @@ -212,12 +239,13 @@ class NonBlockingTransport(PlugIn): class NonBlockingTCP(NonBlockingTransport, IdleObject): ''' - Non-blocking TCP socket wrapper + Non-blocking TCP socket wrapper. It is used for simple XMPP connection. Can be + connected via proxy and can estabilish TLS connection. ''' def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, proxy_dict=None): ''' - Class constructor. + :param proxy_dict: dictionary with proxy data as loaded from config file ''' NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs) @@ -227,7 +255,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): # bytes remained from the last send message self.sendbuff = '' self.proxy_dict = proxy_dict - self.on_remote_disconnect = self.disconnect() + self.on_remote_disconnect = self.disconnect def start_disconnect(self): @@ -236,14 +264,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self.disconnect() def connect(self, conn_5tuple, on_connect, on_connect_failure): - ''' - Creates and connects socket to server and port defined in conn_5tupe which - should be list item returned from getaddrinfo. - :param conn_5tuple: 5-tuple returned from getaddrinfo - :param on_connect: callback called on successful tcp connection - :param on_connect_failure: callback called on failure when estabilishing tcp - connection - ''' NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure) log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % (self.server, self.port)) @@ -258,12 +278,13 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._recv = self._sock.recv self.fd = self._sock.fileno() - # we want to be notified when send is possible to connected socket + # we want to be notified when send is possible to connected socket because + # it means the TCP connection is estabilished self._plug_idle(writable=True, readable=False) self.peerhost = None + #variable for errno symbol that will be found from exception raised from connect() errnum = 0 - ''' variable for errno symbol that will be found from exception raised from connect() ''' # set timeout for TCP connecting - if nonblocking connect() fails, pollend # is called. If if succeeds pollout is called. @@ -280,15 +301,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr)) self.tcp_connecting_started() return - elif errnum in (0, 10056, errno.EISCONN): - # already connected - this branch is probably useless, nonblocking connect() will - # return EINPROGRESS exception in most cases. When here, we don't need timeout - # on connected descriptor and success callback can be called. - log.info('After connect. "%s" raised => CONNECTED' % errstr) - self._on_connect(self) - return - # if there was some other error, call failure callback and unplug transport + # if there was some other exception, call failure callback and unplug transport # which will also remove read_timeouts for descriptor self._on_connect_failure('Exception while connecting to %s:%s - %s %s' % (self.server, self.port, errnum, errstr)) @@ -312,8 +326,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): def _on_connect(self): ''' - Preceeds invoking of on_connect callback. TCP connection is estabilished at - this time. + Preceeds invoking of on_connect callback. TCP connection is already + estabilished by this this time. ''' if self.estabilish_tls: self.tls_init( @@ -324,6 +338,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): def tls_init(self, on_succ, on_fail): + ''' + Estabilishes a TLS/SSL on TCP connection by plugging a NonBlockingTLS module + ''' cacerts, mycerts = self.certs result = tls_nb.NonBlockingTLS(cacerts, mycerts).PlugIn(self) if result: on_succ() @@ -342,6 +359,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): if self.get_state()==CONNECTING: log.info('%s socket wrapper connected' % id(self)) self.idlequeue.remove_timeout(self.fd) + self._plug_idle(writable=False, readable=False) self.peerhost = self._sock.getsockname() if self.proxy_dict: self._connect_to_proxy() else: self._on_connect() @@ -349,6 +367,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._do_send() def pollend(self): + '''called on error on TCP connection''' log.info('pollend called, state == %s' % self.get_state()) if self.get_state()==CONNECTING: @@ -358,8 +377,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self.disconnect() def disconnect(self, do_callback=True): - if self.get_state() == DISCONNECTED: - return + if self.get_state() == DISCONNECTED: return self.set_state(DISCONNECTED) self.idlequeue.unplug_idle(self.fd) if self.__dict__.has_key('NonBlockingTLS'): self.NonBlockingTLS.PlugOut() @@ -367,14 +385,12 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error, (errnum, errstr): - log.error('Error while disconnecting a socket: %s %s' % (errnum,errstr)) + log.error('Error while disconnecting socket: %s' % errstr) self.fd = -1 NonBlockingTransport.disconnect(self, do_callback) def read_timeout(self): - ''' - Implemntation of IdleObject function called on timeouts from IdleQueue. - ''' + ''' method called when timeout passed ''' log.warn('read_timeout called, state == %s' % self.get_state()) if self.get_state()==CONNECTING: # if read_timeout is called during connecting, connect() didn't end yet @@ -403,11 +419,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): If supplied data is unicode string, encode it to utf-8. ''' NonBlockingTransport.send(self, raw_data, now) - r = raw_data - if isinstance(r, unicode): - r = r.encode('utf-8') - elif not isinstance(r, str): - r = ustr(r).encode('utf-8') + + r = self.encode_stanza(raw_data) + if now: self.sendqueue.insert(0, r) self._do_send() @@ -416,6 +430,12 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._plug_idle(writable=True, readable=True) + def encode_stanza(self, stanza): + if isinstance(stanza, unicode): + stanza = stanza.encode('utf-8') + elif not isinstance(stanza, str): + stanza = ustr(stanza).encode('utf-8') + return stanza def _plug_idle(self, writable, readable): @@ -433,12 +453,14 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): def _do_send(self): + ''' + Called when send() to connected socket is possible. First message from + sendqueue will be sent. + ''' if not self.sendbuff: if not self.sendqueue: log.warn('calling send on empty buffer and queue') - self._plug_idle( - writable= ((self.sendqueue!=[]) or (self.sendbuff!='')), - readable=True) + self._plug_idle(writable=False, readable=True) return None self.sendbuff = self.sendqueue.pop(0) try: @@ -471,13 +493,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): except tls_nb.SSLWrapper.Error, e: log.info("_do_receive, caught SSL error, got %s:" % received , exc_info=True) errnum, errstr = e.exc - + + if received == '': errstr = 'zero bytes on recv' + if (self.ssl_lib is None and received == '') or \ (self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \ (self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ): # 8 in stdlib: errstr == EOF occured in violation of protocol # -1 in pyopenssl: errstr == Unexpected EOF - log.info("Disconnected by remote server: %s %s" % (errnum, errstr), exc_info=True) + log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr)) + print self.on_remote_disconnect self.on_remote_disconnect() return @@ -489,8 +514,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): # this branch is for case of non-fatal SSL errors - None is returned from # recv() but no errnum is set - if received is None: - return + if received is None: return # we have received some bytes, stop the timeout! self.renew_send_timeout() @@ -519,6 +543,13 @@ class NonBlockingHTTP(NonBlockingTCP): def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, on_http_request_possible, on_persistent_fallback, http_dict, proxy_dict = None): + ''' + :param on_http_request_possible: method to call when HTTP request to socket + owned by transport is possible. + :param on_persistent_fallback: callback called when server ends TCP + connection. It doesn't have to be fatal for HTTP session. + :param http_dict: dictionary with data for HTTP request and headers + ''' NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, proxy_dict) @@ -551,8 +582,10 @@ class NonBlockingHTTP(NonBlockingTCP): def _on_receive(self,data): - '''Preceeds passing received data to owner class. Gets rid of HTTP headers - and checks them.''' + ''' + Preceeds passing received data to owner class. Gets rid of HTTP headers and + checks them. + ''' if self.get_state() == PROXY_CONNECTING: NonBlockingTCP._on_receive(self, data) return @@ -648,7 +681,10 @@ class NonBlockingHTTP(NonBlockingTCP): class NonBlockingHTTPBOSH(NonBlockingHTTP): - + ''' + Class for BOSH HTTP connections. Slightly redefines HTTP transport by calling + bosh bodytag generating callback before putting data on wire. + ''' def set_stanza_build_cb(self, build_cb): self.build_cb = build_cb @@ -659,24 +695,10 @@ class NonBlockingHTTPBOSH(NonBlockingHTTP): return if not self.sendbuff: stanza = self.build_cb(socket=self) + stanza = self.encode_stanza(stanza) stanza = self.build_http_message(httpbody=stanza) - if isinstance(stanza, unicode): - stanza = stanza.encode('utf-8') - elif not isinstance(stanza, str): - stanza = ustr(stanza).encode('utf-8') self.sendbuff = stanza - try: - send_count = self._send(self.sendbuff) - if send_count: - sent_data = self.sendbuff[:send_count] - self.sendbuff = self.sendbuff[send_count:] - self._plug_idle(writable = self.sendbuff != '', readable = True) - self.raise_event(DATA_SENT, sent_data) - - except socket.error, e: - log.error('_do_send:', exc_info=True) - traceback.print_exc() - self.disconnect() + NonBlockingTCP._do_send(self)