diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index d5750fd7b..0653da461 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -15,6 +15,14 @@ ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. +''' +Transports are objects responsible for connecting to XMPP server and putting +data to wrapped sockets in in desired form (SSL, TLS, TCP, for HTTP proxy, +for SOCKS5 proxy...) + +Transports are not aware of XMPP stanzas. +''' + from simplexml import ustr from client import PlugIn from idlequeue import IdleObject @@ -23,8 +31,6 @@ import proxy_connectors import tls_nb import socket -import sys -import os import errno import time import traceback @@ -47,15 +53,15 @@ def urisplit(uri): def get_proxy_data_from_dict(proxy): tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None - type = proxy['type'] - if type == 'bosh' and not proxy['bosh_useproxy']: + proxy_type = proxy['type'] + if proxy_type == 'bosh' and not proxy['bosh_useproxy']: # with BOSH not over proxy we have to parse the hostname from BOSH URI tcp_host, tcp_port = urisplit(proxy['bosh_uri'])[1], proxy['bosh_port'] else: # with proxy!=bosh or with bosh over HTTP proxy we're connecting to proxy # machine tcp_host, tcp_port = proxy['host'], proxy['port'] - if proxy['useauth']: + if proxy['useauth']: proxy_user, proxy_pass = proxy['user'], proxy['pass'] return tcp_host, tcp_port, proxy_user, proxy_pass @@ -83,10 +89,10 @@ STATES = [DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING] class NonBlockingTransport(PlugIn): ''' - Abstract class representing a trasport - object responsible for connecting to - XMPP server and putting stanzas on wire in desired form. + Abstract class representing a transport. ''' - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs): + def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, + certs): ''' Each trasport class can have different constructor but it has to have at least all the arguments of NonBlockingTransport constructor. @@ -94,10 +100,10 @@ class NonBlockingTransport(PlugIn): :param raise_event: callback for monitoring of sent and received data :param on_disconnect: callback called on disconnection during runtime :param idlequeue: processing idlequeue - :param estabilish_tls: boolean whether to estabilish TLS connection after TCP - connection is done - :param certs: tuple of (cacerts, mycerts) see tls_nb.NonBlockingTLS - constructor for more details + :param estabilish_tls: boolean whether to estabilish TLS connection after + TCP connection is done + :param certs: tuple of (cacerts, mycerts) see constructor of + tls_nb.NonBlockingTLS for more details ''' PlugIn.__init__(self) self.raise_event = raise_event @@ -108,6 +114,7 @@ class NonBlockingTransport(PlugIn): self.on_receive = None self.server = None self.port = None + self.conn_5tuple = None self.set_state(DISCONNECTED) self.estabilish_tls = estabilish_tls self.certs = certs @@ -123,7 +130,7 @@ class NonBlockingTransport(PlugIn): self.on_timeout = None def plugin(self, owner): - owner.Connection=self + owner.Connection = self def plugout(self): self._owner.Connection = None @@ -132,8 +139,9 @@ class NonBlockingTransport(PlugIn): def connect(self, conn_5tuple, on_connect, on_connect_failure): ''' - Creates and connects transport to server and port defined in conn_5tupe which - should be item from list returned from getaddrinfo. + Creates and connects transport to server and port defined in conn_5tuple + which should be item from list returned from getaddrinfo. + :param conn_5tuple: 5-tuple returned from getaddrinfo :param on_connect: callback called on successful connect to the server :param on_connect_failure: callback called on failure when connecting @@ -157,7 +165,7 @@ class NonBlockingTransport(PlugIn): self.set_state(CONNECTED) self.on_connect() - def _on_connect_failure(self,err_message): + def _on_connect_failure(self, err_message): ''' preceeds call of on_connect_failure callback ''' # In case of error while connecting we need to disconnect transport # but we don't want to call DisconnectHandlers from client, @@ -178,11 +186,13 @@ class NonBlockingTransport(PlugIn): def onreceive(self, recv_handler): ''' - Sets the on_receive callback. Do not confuse it with on_receive() method, - which is the callback itself. - onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is the - default one that will decide what to do with received stanza based on its - tag name and namespace. + Sets the on_receive callback. + + onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is + the default one that will decide what to do with received stanza based on + its tag name and namespace. + + Do not confuse it with on_receive() method, which is the callback itself. ''' if not recv_handler: if hasattr(self._owner, 'Dispatcher'): @@ -229,8 +239,10 @@ class NonBlockingTransport(PlugIn): class NonBlockingTCP(NonBlockingTransport, IdleObject): ''' - Non-blocking TCP socket wrapper. It is used for simple XMPP connection. Can be - connected via proxy and can estabilish TLS connection. + Non-blocking TCP socket wrapper. + + It is used for simple XMPP connection. Can be connected via proxy and can + estabilish TLS connection. ''' def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, proxy_dict=None): @@ -239,6 +251,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): ''' NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs) + IdleObject.__init__(self) # queue with messages to be send self.sendqueue = [] @@ -255,14 +268,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self.disconnect() def connect(self, conn_5tuple, on_connect, on_connect_failure): - NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure) - log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % (self.server, self.port)) + NonBlockingTransport.connect(self, conn_5tuple, on_connect, + on_connect_failure) + log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % + (self.server, self.port)) try: self._sock = socket.socket(*conn_5tuple[:3]) except socket.error, (errnum, errstr): - self._on_connect_failure('NonBlockingTCP Connect: Error while creating socket:\ - %s %s' % (errnum, errstr)) + self._on_connect_failure('NonBlockingTCP Connect: Error while creating\ + socket: %s %s' % (errnum, errstr)) return self._send = self._sock.send @@ -274,7 +289,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._plug_idle(writable=True, readable=False) self.peerhost = None - # variable for errno symbol that will be found from exception raised from connect() + # variable for errno symbol that will be found from exception raised + # from connect() errnum = 0 # set timeout for TCP connecting - if nonblocking connect() fails, pollend @@ -283,18 +299,19 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): try: self._sock.setblocking(False) - self._sock.connect((self.server,self.port)) + self._sock.connect((self.server, self.port)) except Exception, (errnum, errstr): pass if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): # connecting in progress - log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr)) + log.info('After NB connect() of %s. "%s" raised => CONNECTING' % + (id(self), errstr)) self.tcp_connecting_started() return - # if there was some other exception, call failure callback and unplug transport - # which will also remove read_timeouts for descriptor + # if there was some other exception, call failure callback and unplug + # transport which will also remove read_timeouts for descriptor self._on_connect_failure('Exception while connecting to %s:%s - %s %s' % (self.server, self.port, errnum, errstr)) @@ -322,14 +339,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): if self.estabilish_tls: self.tls_init( on_succ = lambda: NonBlockingTransport._on_connect(self), - on_fail = lambda: self._on_connect_failure('error while estabilishing TLS')) + on_fail = lambda: self._on_connect_failure( + 'error while estabilishing TLS')) else: NonBlockingTransport._on_connect(self) def tls_init(self, on_succ, on_fail): ''' - Estabilishes a TLS/SSL on TCP connection by plugging a NonBlockingTLS module + Estabilishes TLS/SSL using this TCP connection by plugging a + NonBlockingTLS module ''' cacerts, mycerts = self.certs result = tls_nb.NonBlockingTLS(cacerts, mycerts).PlugIn(self) @@ -339,29 +358,31 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): on_fail() def pollin(self): - '''called when receive on plugged socket is possible ''' + '''called by idlequeu when receive on plugged socket is possible ''' log.info('pollin called, state == %s' % self.get_state()) self._do_receive() def pollout(self): - '''called when send to plugged socket is possible''' + '''called by idlequeu when send to plugged socket is possible''' log.info('pollout called, state == %s' % self.get_state()) - if self.get_state()==CONNECTING: + if self.get_state() == CONNECTING: log.info('%s socket wrapper connected' % id(self)) self.idlequeue.remove_timeout(self.fd) self._plug_idle(writable=False, readable=False) self.peerhost = self._sock.getsockname() - if self.proxy_dict: self._connect_to_proxy() - else: self._on_connect() - return - self._do_send() + if self.proxy_dict: + self._connect_to_proxy() + else: + self._on_connect() + else: + self._do_send() def pollend(self): - '''called on error on TCP connection''' + '''called by idlequeue on TCP connection errors''' log.info('pollend called, state == %s' % self.get_state()) - if self.get_state()==CONNECTING: + if self.get_state() == CONNECTING: self._on_connect_failure('Error during connect to %s:%s' % (self.server, self.port)) else: @@ -382,7 +403,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): NonBlockingTransport.disconnect(self, do_callback) def read_timeout(self): - ''' method called when timeout passed ''' log.info('read_timeout called, state == %s' % self.get_state()) if self.get_state()==CONNECTING: # if read_timeout is called during connecting, connect() didn't end yet @@ -396,7 +416,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): if self.get_state() != DISCONNECTED and self.fd != -1: NonBlockingTransport.set_timeout(self, timeout) else: - log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % (self.get_state(), self.fd)) + log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % + (self.get_state(), self.fd)) def remove_timeout(self): if self.fd: @@ -405,7 +426,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): log.warn('remove_timeout: no self.fd state is %s' % self.get_state()) def send(self, raw_data, now=False): - '''Append raw_data to the queue of messages to be send. + ''' + Append raw_data to the queue of messages to be send. If supplied data is unicode string, encode it to utf-8. ''' NonBlockingTransport.send(self, raw_data, now) @@ -421,6 +443,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._plug_idle(writable=True, readable=True) def encode_stanza(self, stanza): + ''' Encode str or unicode to utf-8 ''' if isinstance(stanza, unicode): stanza = stanza.encode('utf-8') elif not isinstance(stanza, str): @@ -429,16 +452,18 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): def _plug_idle(self, writable, readable): ''' - Plugs file descriptor of socket to Idlequeue. Plugged socket - will be watched for "send possible" or/and "recv possible" events. pollin() - callback is invoked on "recv possible", pollout() on "send_possible". + Plugs file descriptor of socket to Idlequeue. + + Plugged socket will be watched for "send possible" or/and "recv possible" + events. pollin() callback is invoked on "recv possible", pollout() on + "send_possible". + Plugged socket will always be watched for "error" event - in that case, pollend() is called. ''' log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable)) self.idlequeue.plug_idle(self, writable, readable) - def _do_send(self): ''' Called when send() to connected socket is possible. First message from @@ -466,7 +491,10 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self.disconnect() def _do_receive(self): - ''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.''' + ''' + Reads all pending incoming data. Will call owner's disconnected() method + if appropriate. + ''' received = None errnum = 0 errstr = 'No Error Set' @@ -477,7 +505,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): except socket.error, (errnum, errstr): log.info("_do_receive: got %s:" % received , exc_info=True) except tls_nb.SSLWrapper.Error, e: - log.info("_do_receive, caught SSL error, got %s:" % received , exc_info=True) + log.info("_do_receive, caught SSL error, got %s:" % received, + exc_info=True) errnum, errstr = e.exc if received == '': errstr = 'zero bytes on recv' @@ -491,15 +520,14 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self.on_remote_disconnect() return - if errnum: - log.error("Connection to %s:%s lost: %s %s" % ( self.server, self.port, errnum, errstr), exc_info=True) + log.error("Connection to %s:%s lost: %s %s" % (self.server, self.port, + errnum, errstr), exc_info=True) self.disconnect() return # this branch is for case of non-fatal SSL errors - None is returned from # recv() but no errnum is set - if received is None: return @@ -510,48 +538,50 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self.raise_event(DATA_RECEIVED, received) self._on_receive(received) else: - # This should never happen, so we need the debug. (If there is no handler - # on receive specified, data are passed to Dispatcher.ProcessNonBlocking) - log.error('SOCKET %s Unhandled data received: %s' % (id(self), received)) + # This should never happen, so we need the debug. + # (If there is no handler on receive specified, data is passed to + # Dispatcher.ProcessNonBlocking) + log.error('SOCKET %s Unhandled data received: %s' % (id(self), + received)) self.disconnect() def _on_receive(self, data): ''' preceeds on_receive callback. It peels off and checks HTTP headers in - class, in here it just calls the callback.''' + HTTP classes, in here it just calls the callback.''' self.on_receive(data) class NonBlockingHTTP(NonBlockingTCP): ''' Socket wrapper that creates HTTP message out of sent data and peels-off - HTTP headers from incoming messages + HTTP headers from incoming messages. ''' def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, on_http_request_possible, on_persistent_fallback, http_dict, proxy_dict = None): ''' - :param on_http_request_possible: method to call when HTTP request to socket - owned by transport is possible. + :param on_http_request_possible: method to call when HTTP request to + socket owned by transport is possible. :param on_persistent_fallback: callback called when server ends TCP connection. It doesn't have to be fatal for HTTP session. :param http_dict: dictionary with data for HTTP request and headers ''' - NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs, proxy_dict) - self.http_protocol, self.http_host, self.http_path = urisplit(http_dict['http_uri']) - if self.http_protocol is None: - self.http_protocol = 'http' - if self.http_path == '': - self.http_path = '/' + self.http_protocol, self.http_host, self.http_path = urisplit( + http_dict['http_uri']) + self.http_protocol = self.http_protocol or 'http' + self.http_path = self.http_path or '/' self.http_port = http_dict['http_port'] self.http_version = http_dict['http_version'] self.http_persistent = http_dict['http_persistent'] self.add_proxy_headers = http_dict['add_proxy_headers'] - if http_dict.has_key('proxy_user') and http_dict.has_key('proxy_pass'): - self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict['proxy_pass'] + + if 'proxy_user' in http_dict and 'proxy_pass' in http_dict: + self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict[ + 'proxy_pass'] else: self.proxy_user, self.proxy_pass = None, None @@ -567,7 +597,7 @@ class NonBlockingHTTP(NonBlockingTCP): def http_send(self, raw_data, now=False): self.send(self.build_http_message(raw_data), now) - def _on_receive(self,data): + def _on_receive(self, data): ''' Preceeds passing received data to owner class. Gets rid of HTTP headers and checks them. @@ -579,7 +609,7 @@ class NonBlockingHTTP(NonBlockingTCP): # recvbuff empty - fresh HTTP message was received try: statusline, headers, self.recvbuff = self.parse_http_message(data) - except ValueError: + except ValueError: self.disconnect() return if statusline[1] != '200': @@ -587,7 +617,7 @@ class NonBlockingHTTP(NonBlockingTCP): self.disconnect() return self.expected_length = int(headers['Content-Length']) - if headers.has_key('Connection') and headers['Connection'].strip()=='close': + if 'Connection' in headers and headers['Connection'].strip()=='close': self.close_current_connection = True else: #sth in recvbuff - append currently received data to HTTP msg in buffer @@ -603,8 +633,8 @@ class NonBlockingHTTP(NonBlockingTCP): # everything was received httpbody = self.recvbuff - self.recvbuff='' - self.expected_length=0 + self.recvbuff = '' + self.expected_length = 0 if not self.http_persistent or self.close_current_connection: # not-persistent connections disconnect after response @@ -614,7 +644,6 @@ class NonBlockingHTTP(NonBlockingTCP): self.on_receive(data=httpbody, socket=self) self.on_http_request_possible() - def build_http_message(self, httpbody, method='POST'): ''' Builds http message with given body. @@ -642,12 +671,11 @@ class NonBlockingHTTP(NonBlockingTCP): def parse_http_message(self, message): ''' - splits http message to tuple ( - statusline - list of e.g. ['HTTP/1.1', '200', 'OK'], - headers - dictionary of headers e.g. {'Content-Length': '604', - 'Content-Type': 'text/xml; charset=utf-8'}, - httpbody - string with http body - ) + splits http message to tuple: + (statusline - list of e.g. ['HTTP/1.1', '200', 'OK'], + headers - dictionary of headers e.g. {'Content-Length': '604', + 'Content-Type': 'text/xml; charset=utf-8'}, + httpbody - string with http body) ''' message = message.replace('\r','') (header, httpbody) = message.split('\n\n', 1) @@ -656,7 +684,7 @@ class NonBlockingHTTP(NonBlockingTCP): header = header[1:] headers = {} for dummy in header: - row = dummy.split(' ',1) + row = dummy.split(' ', 1) headers[row[0][:-1]] = row[1] return (statusline, headers, httpbody)