Coding standards and documentation improvements in transports_nb.py

This commit is contained in:
Stephan Erb 2008-12-24 11:10:38 +00:00
parent 7163be96e0
commit 5c02a907b4
1 changed files with 111 additions and 83 deletions

View File

@ -15,6 +15,14 @@
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
'''
Transports are objects responsible for connecting to XMPP server and putting
data to wrapped sockets in in desired form (SSL, TLS, TCP, for HTTP proxy,
for SOCKS5 proxy...)
Transports are not aware of XMPP stanzas.
'''
from simplexml import ustr
from client import PlugIn
from idlequeue import IdleObject
@ -23,8 +31,6 @@ import proxy_connectors
import tls_nb
import socket
import sys
import os
import errno
import time
import traceback
@ -47,8 +53,8 @@ def urisplit(uri):
def get_proxy_data_from_dict(proxy):
tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None
type = proxy['type']
if type == 'bosh' and not proxy['bosh_useproxy']:
proxy_type = proxy['type']
if proxy_type == 'bosh' and not proxy['bosh_useproxy']:
# with BOSH not over proxy we have to parse the hostname from BOSH URI
tcp_host, tcp_port = urisplit(proxy['bosh_uri'])[1], proxy['bosh_port']
else:
@ -83,10 +89,10 @@ STATES = [DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING]
class NonBlockingTransport(PlugIn):
'''
Abstract class representing a trasport - object responsible for connecting to
XMPP server and putting stanzas on wire in desired form.
Abstract class representing a transport.
'''
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs):
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
certs):
'''
Each trasport class can have different constructor but it has to have at
least all the arguments of NonBlockingTransport constructor.
@ -94,10 +100,10 @@ class NonBlockingTransport(PlugIn):
:param raise_event: callback for monitoring of sent and received data
:param on_disconnect: callback called on disconnection during runtime
:param idlequeue: processing idlequeue
:param estabilish_tls: boolean whether to estabilish TLS connection after TCP
connection is done
:param certs: tuple of (cacerts, mycerts) see tls_nb.NonBlockingTLS
constructor for more details
:param estabilish_tls: boolean whether to estabilish TLS connection after
TCP connection is done
:param certs: tuple of (cacerts, mycerts) see constructor of
tls_nb.NonBlockingTLS for more details
'''
PlugIn.__init__(self)
self.raise_event = raise_event
@ -108,6 +114,7 @@ class NonBlockingTransport(PlugIn):
self.on_receive = None
self.server = None
self.port = None
self.conn_5tuple = None
self.set_state(DISCONNECTED)
self.estabilish_tls = estabilish_tls
self.certs = certs
@ -123,7 +130,7 @@ class NonBlockingTransport(PlugIn):
self.on_timeout = None
def plugin(self, owner):
owner.Connection=self
owner.Connection = self
def plugout(self):
self._owner.Connection = None
@ -132,8 +139,9 @@ class NonBlockingTransport(PlugIn):
def connect(self, conn_5tuple, on_connect, on_connect_failure):
'''
Creates and connects transport to server and port defined in conn_5tupe which
should be item from list returned from getaddrinfo.
Creates and connects transport to server and port defined in conn_5tuple
which should be item from list returned from getaddrinfo.
:param conn_5tuple: 5-tuple returned from getaddrinfo
:param on_connect: callback called on successful connect to the server
:param on_connect_failure: callback called on failure when connecting
@ -157,7 +165,7 @@ class NonBlockingTransport(PlugIn):
self.set_state(CONNECTED)
self.on_connect()
def _on_connect_failure(self,err_message):
def _on_connect_failure(self, err_message):
''' preceeds call of on_connect_failure callback '''
# In case of error while connecting we need to disconnect transport
# but we don't want to call DisconnectHandlers from client,
@ -178,11 +186,13 @@ class NonBlockingTransport(PlugIn):
def onreceive(self, recv_handler):
'''
Sets the on_receive callback. Do not confuse it with on_receive() method,
which is the callback itself.
onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is the
default one that will decide what to do with received stanza based on its
tag name and namespace.
Sets the on_receive callback.
onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is
the default one that will decide what to do with received stanza based on
its tag name and namespace.
Do not confuse it with on_receive() method, which is the callback itself.
'''
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
@ -229,8 +239,10 @@ class NonBlockingTransport(PlugIn):
class NonBlockingTCP(NonBlockingTransport, IdleObject):
'''
Non-blocking TCP socket wrapper. It is used for simple XMPP connection. Can be
connected via proxy and can estabilish TLS connection.
Non-blocking TCP socket wrapper.
It is used for simple XMPP connection. Can be connected via proxy and can
estabilish TLS connection.
'''
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
certs, proxy_dict=None):
@ -239,6 +251,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
'''
NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue,
estabilish_tls, certs)
IdleObject.__init__(self)
# queue with messages to be send
self.sendqueue = []
@ -255,14 +268,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.disconnect()
def connect(self, conn_5tuple, on_connect, on_connect_failure):
NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure)
log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % (self.server, self.port))
NonBlockingTransport.connect(self, conn_5tuple, on_connect,
on_connect_failure)
log.info('NonBlockingTCP Connect :: About to connect to %s:%s' %
(self.server, self.port))
try:
self._sock = socket.socket(*conn_5tuple[:3])
except socket.error, (errnum, errstr):
self._on_connect_failure('NonBlockingTCP Connect: Error while creating socket:\
%s %s' % (errnum, errstr))
self._on_connect_failure('NonBlockingTCP Connect: Error while creating\
socket: %s %s' % (errnum, errstr))
return
self._send = self._sock.send
@ -274,7 +289,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._plug_idle(writable=True, readable=False)
self.peerhost = None
# variable for errno symbol that will be found from exception raised from connect()
# variable for errno symbol that will be found from exception raised
# from connect()
errnum = 0
# set timeout for TCP connecting - if nonblocking connect() fails, pollend
@ -283,18 +299,19 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
try:
self._sock.setblocking(False)
self._sock.connect((self.server,self.port))
self._sock.connect((self.server, self.port))
except Exception, (errnum, errstr):
pass
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
# connecting in progress
log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr))
log.info('After NB connect() of %s. "%s" raised => CONNECTING' %
(id(self), errstr))
self.tcp_connecting_started()
return
# if there was some other exception, call failure callback and unplug transport
# which will also remove read_timeouts for descriptor
# if there was some other exception, call failure callback and unplug
# transport which will also remove read_timeouts for descriptor
self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
(self.server, self.port, errnum, errstr))
@ -322,14 +339,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
if self.estabilish_tls:
self.tls_init(
on_succ = lambda: NonBlockingTransport._on_connect(self),
on_fail = lambda: self._on_connect_failure('error while estabilishing TLS'))
on_fail = lambda: self._on_connect_failure(
'error while estabilishing TLS'))
else:
NonBlockingTransport._on_connect(self)
def tls_init(self, on_succ, on_fail):
'''
Estabilishes a TLS/SSL on TCP connection by plugging a NonBlockingTLS module
Estabilishes TLS/SSL using this TCP connection by plugging a
NonBlockingTLS module
'''
cacerts, mycerts = self.certs
result = tls_nb.NonBlockingTLS(cacerts, mycerts).PlugIn(self)
@ -339,29 +358,31 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
on_fail()
def pollin(self):
'''called when receive on plugged socket is possible '''
'''called by idlequeu when receive on plugged socket is possible '''
log.info('pollin called, state == %s' % self.get_state())
self._do_receive()
def pollout(self):
'''called when send to plugged socket is possible'''
'''called by idlequeu when send to plugged socket is possible'''
log.info('pollout called, state == %s' % self.get_state())
if self.get_state()==CONNECTING:
if self.get_state() == CONNECTING:
log.info('%s socket wrapper connected' % id(self))
self.idlequeue.remove_timeout(self.fd)
self._plug_idle(writable=False, readable=False)
self.peerhost = self._sock.getsockname()
if self.proxy_dict: self._connect_to_proxy()
else: self._on_connect()
return
self._do_send()
if self.proxy_dict:
self._connect_to_proxy()
else:
self._on_connect()
else:
self._do_send()
def pollend(self):
'''called on error on TCP connection'''
'''called by idlequeue on TCP connection errors'''
log.info('pollend called, state == %s' % self.get_state())
if self.get_state()==CONNECTING:
if self.get_state() == CONNECTING:
self._on_connect_failure('Error during connect to %s:%s' %
(self.server, self.port))
else:
@ -382,7 +403,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
NonBlockingTransport.disconnect(self, do_callback)
def read_timeout(self):
''' method called when timeout passed '''
log.info('read_timeout called, state == %s' % self.get_state())
if self.get_state()==CONNECTING:
# if read_timeout is called during connecting, connect() didn't end yet
@ -396,7 +416,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
if self.get_state() != DISCONNECTED and self.fd != -1:
NonBlockingTransport.set_timeout(self, timeout)
else:
log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % (self.get_state(), self.fd))
log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' %
(self.get_state(), self.fd))
def remove_timeout(self):
if self.fd:
@ -405,7 +426,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
log.warn('remove_timeout: no self.fd state is %s' % self.get_state())
def send(self, raw_data, now=False):
'''Append raw_data to the queue of messages to be send.
'''
Append raw_data to the queue of messages to be send.
If supplied data is unicode string, encode it to utf-8.
'''
NonBlockingTransport.send(self, raw_data, now)
@ -421,6 +443,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._plug_idle(writable=True, readable=True)
def encode_stanza(self, stanza):
''' Encode str or unicode to utf-8 '''
if isinstance(stanza, unicode):
stanza = stanza.encode('utf-8')
elif not isinstance(stanza, str):
@ -429,16 +452,18 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def _plug_idle(self, writable, readable):
'''
Plugs file descriptor of socket to Idlequeue. Plugged socket
will be watched for "send possible" or/and "recv possible" events. pollin()
callback is invoked on "recv possible", pollout() on "send_possible".
Plugs file descriptor of socket to Idlequeue.
Plugged socket will be watched for "send possible" or/and "recv possible"
events. pollin() callback is invoked on "recv possible", pollout() on
"send_possible".
Plugged socket will always be watched for "error" event - in that case,
pollend() is called.
'''
log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable))
self.idlequeue.plug_idle(self, writable, readable)
def _do_send(self):
'''
Called when send() to connected socket is possible. First message from
@ -466,7 +491,10 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.disconnect()
def _do_receive(self):
''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.'''
'''
Reads all pending incoming data. Will call owner's disconnected() method
if appropriate.
'''
received = None
errnum = 0
errstr = 'No Error Set'
@ -477,7 +505,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
except socket.error, (errnum, errstr):
log.info("_do_receive: got %s:" % received , exc_info=True)
except tls_nb.SSLWrapper.Error, e:
log.info("_do_receive, caught SSL error, got %s:" % received , exc_info=True)
log.info("_do_receive, caught SSL error, got %s:" % received,
exc_info=True)
errnum, errstr = e.exc
if received == '': errstr = 'zero bytes on recv'
@ -491,15 +520,14 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.on_remote_disconnect()
return
if errnum:
log.error("Connection to %s:%s lost: %s %s" % ( self.server, self.port, errnum, errstr), exc_info=True)
log.error("Connection to %s:%s lost: %s %s" % (self.server, self.port,
errnum, errstr), exc_info=True)
self.disconnect()
return
# this branch is for case of non-fatal SSL errors - None is returned from
# recv() but no errnum is set
if received is None:
return
@ -510,48 +538,50 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.raise_event(DATA_RECEIVED, received)
self._on_receive(received)
else:
# This should never happen, so we need the debug. (If there is no handler
# on receive specified, data are passed to Dispatcher.ProcessNonBlocking)
log.error('SOCKET %s Unhandled data received: %s' % (id(self), received))
# This should never happen, so we need the debug.
# (If there is no handler on receive specified, data is passed to
# Dispatcher.ProcessNonBlocking)
log.error('SOCKET %s Unhandled data received: %s' % (id(self),
received))
self.disconnect()
def _on_receive(self, data):
''' preceeds on_receive callback. It peels off and checks HTTP headers in
class, in here it just calls the callback.'''
HTTP classes, in here it just calls the callback.'''
self.on_receive(data)
class NonBlockingHTTP(NonBlockingTCP):
'''
Socket wrapper that creates HTTP message out of sent data and peels-off
HTTP headers from incoming messages
HTTP headers from incoming messages.
'''
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
certs, on_http_request_possible, on_persistent_fallback, http_dict,
proxy_dict = None):
'''
:param on_http_request_possible: method to call when HTTP request to socket
owned by transport is possible.
:param on_http_request_possible: method to call when HTTP request to
socket owned by transport is possible.
:param on_persistent_fallback: callback called when server ends TCP
connection. It doesn't have to be fatal for HTTP session.
:param http_dict: dictionary with data for HTTP request and headers
'''
NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
estabilish_tls, certs, proxy_dict)
self.http_protocol, self.http_host, self.http_path = urisplit(http_dict['http_uri'])
if self.http_protocol is None:
self.http_protocol = 'http'
if self.http_path == '':
self.http_path = '/'
self.http_protocol, self.http_host, self.http_path = urisplit(
http_dict['http_uri'])
self.http_protocol = self.http_protocol or 'http'
self.http_path = self.http_path or '/'
self.http_port = http_dict['http_port']
self.http_version = http_dict['http_version']
self.http_persistent = http_dict['http_persistent']
self.add_proxy_headers = http_dict['add_proxy_headers']
if http_dict.has_key('proxy_user') and http_dict.has_key('proxy_pass'):
self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict['proxy_pass']
if 'proxy_user' in http_dict and 'proxy_pass' in http_dict:
self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict[
'proxy_pass']
else:
self.proxy_user, self.proxy_pass = None, None
@ -567,7 +597,7 @@ class NonBlockingHTTP(NonBlockingTCP):
def http_send(self, raw_data, now=False):
self.send(self.build_http_message(raw_data), now)
def _on_receive(self,data):
def _on_receive(self, data):
'''
Preceeds passing received data to owner class. Gets rid of HTTP headers
and checks them.
@ -587,7 +617,7 @@ class NonBlockingHTTP(NonBlockingTCP):
self.disconnect()
return
self.expected_length = int(headers['Content-Length'])
if headers.has_key('Connection') and headers['Connection'].strip()=='close':
if 'Connection' in headers and headers['Connection'].strip()=='close':
self.close_current_connection = True
else:
#sth in recvbuff - append currently received data to HTTP msg in buffer
@ -603,8 +633,8 @@ class NonBlockingHTTP(NonBlockingTCP):
# everything was received
httpbody = self.recvbuff
self.recvbuff=''
self.expected_length=0
self.recvbuff = ''
self.expected_length = 0
if not self.http_persistent or self.close_current_connection:
# not-persistent connections disconnect after response
@ -614,7 +644,6 @@ class NonBlockingHTTP(NonBlockingTCP):
self.on_receive(data=httpbody, socket=self)
self.on_http_request_possible()
def build_http_message(self, httpbody, method='POST'):
'''
Builds http message with given body.
@ -642,12 +671,11 @@ class NonBlockingHTTP(NonBlockingTCP):
def parse_http_message(self, message):
'''
splits http message to tuple (
statusline - list of e.g. ['HTTP/1.1', '200', 'OK'],
headers - dictionary of headers e.g. {'Content-Length': '604',
'Content-Type': 'text/xml; charset=utf-8'},
httpbody - string with http body
)
splits http message to tuple:
(statusline - list of e.g. ['HTTP/1.1', '200', 'OK'],
headers - dictionary of headers e.g. {'Content-Length': '604',
'Content-Type': 'text/xml; charset=utf-8'},
httpbody - string with http body)
'''
message = message.replace('\r','')
(header, httpbody) = message.split('\n\n', 1)
@ -656,7 +684,7 @@ class NonBlockingHTTP(NonBlockingTCP):
header = header[1:]
headers = {}
for dummy in header:
row = dummy.split(' ',1)
row = dummy.split(' ', 1)
headers[row[0][:-1]] = row[1]
return (statusline, headers, httpbody)