improved disconnect handling, added comments, fixed minor bugs

This commit is contained in:
tomk 2008-08-17 22:57:48 +00:00
parent a76c173816
commit acdf4ff143
6 changed files with 369 additions and 269 deletions

View File

@ -581,8 +581,8 @@ class Connection(ConnectionHandlers):
if self.on_connect_success == self._on_new_account:
con.RegisterDisconnectHandler(self._on_new_account)
log.info('Connecting to %s: [%s:%d]', self.name,
self._current_host['host'], port)
self.log_hosttype_info(port)
con.connect(
hostname=self._current_host['host'],
port=port,
@ -594,6 +594,19 @@ class Connection(ConnectionHandlers):
else:
self.connect_to_next_host(retry)
def log_hosttype_info(self, port):
msg = '>>>>>> Connecting to %s [%s:%d], type = %s' % (self.name,
self._current_host['host'], port, self._current_type)
log.info(msg)
if self._proxy:
msg = '>>>>>> '
if self._proxy['type']=='bosh':
msg = '%s over BOSH %s:%s' % (msg, self._proxy['bosh_uri'], self._proxy['bosh_port'])
if self._proxy['type'] in ['http','socks5'] or self._proxy['bosh_useproxy']:
msg = '%s over proxy %s:%s' % (msg, self._proxy['host'], self._proxy['port'])
log.info(msg)
def _connect_failure(self, con_type = None):
if not con_type:

View File

@ -30,10 +30,9 @@ log = logging.getLogger('gajim.c.x.bosh')
KEY_COUNT = 10
# Fake file descriptor - it's used for setting read_timeout in idlequeue for
# BOSH Transport. In TCP-derived transports this is file descriptor of socket.
FAKE_DESCRIPTOR = -1337
'''Fake file descriptor - it's used for setting read_timeout in idlequeue for
BOSH Transport.
In TCP-derived transports it is file descriptor of socket'''
class NonBlockingBOSH(NonBlockingTransport):
@ -126,10 +125,11 @@ class NonBlockingBOSH(NonBlockingTransport):
def on_http_request_possible(self):
'''
Called after HTTP response is received - when another request is possible.
Called when HTTP request it's possible to send a HTTP request. It can be when
socket is connected or when HTTP response arrived.
There should be always one pending request to BOSH CM.
'''
log.info('on_http_req possible, state:\n%s' % self.get_current_state())
log.debug('on_http_req possible, state:\n%s' % self.get_current_state())
if self.get_state()==DISCONNECTED: return
#Hack for making the non-secure warning dialog work
@ -137,6 +137,10 @@ class NonBlockingBOSH(NonBlockingTransport):
if (hasattr(self._owner, 'NonBlockingNonSASL') or hasattr(self._owner, 'SASL')):
self.send_BOSH(None)
else:
# If we already got features and no auth module was plugged yet, we are
# probably waiting for confirmation of the "not-secure-connection" dialog.
# We don't send HTTP request in that case.
# see http://lists.jabber.ru/pipermail/ejabberd/2008-August/004027.html
return
else:
self.send_BOSH(None)
@ -144,18 +148,20 @@ class NonBlockingBOSH(NonBlockingTransport):
def get_socket_in(self, state):
''' gets sockets in desired state '''
for s in self.http_socks:
if s.get_state()==state: return s
return None
def get_free_socket(self):
''' Selects and returns socket eligible for sending a data to.'''
if self.http_pipelining:
return self.get_socket_in(CONNECTED)
else:
last_recv_time, tmpsock = 0, None
for s in self.http_socks:
# we're interested only into CONNECTED socket with no req pending
# we're interested only in CONNECTED socket with no requests pending
if s.get_state()==CONNECTED and s.pending_requests==0:
# if there's more of them, we want the one with the least recent data receive
# (lowest last_recv_time)
@ -169,6 +175,10 @@ class NonBlockingBOSH(NonBlockingTransport):
def send_BOSH(self, payload):
'''
Tries to send a stanza in payload by appeding it to a buffer and plugging a
free socket for writing.
'''
total_pending_reqs = sum([s.pending_requests for s in self.http_socks])
# when called after HTTP response (Payload=None) and when there are already
@ -192,7 +202,8 @@ class NonBlockingBOSH(NonBlockingTransport):
self.get_current_state())
return
# when there's free CONNECTED socket, we flush the data
# when there's free CONNECTED socket, we plug it for write and the data will
# be sent when write is possible
if self.get_free_socket():
self.plug_socket()
return
@ -209,8 +220,7 @@ class NonBlockingBOSH(NonBlockingTransport):
if s:
self.connect_and_flush(s)
else:
#if len(self.http_socks) > 1: return
print 'connecting sock'
# otherwise create and connect a new one
ss = self.get_new_http_socket()
self.http_socks.append(ss)
self.connect_and_flush(ss)
@ -225,6 +235,15 @@ class NonBlockingBOSH(NonBlockingTransport):
log.error('=====!!!!!!!!====> Couldn\'t get free socket in plug_socket())')
def build_stanza(self, socket):
'''
Builds a BOSH body tag from data in buffers and adds key, rid and ack
attributes to it.
This method is called from _do_send() of underlying transport. This is to
ensure rid and keys will be processed in correct order. If I generate them
before plugging a socket for write (and did it for two sockets/HTTP
connections) in parallel, they might be sent in wrong order, which results
in violating the BOSH session and server-side disconnect.
'''
if self.prio_bosh_stanzas:
stanza, add_payload = self.prio_bosh_stanzas.pop(0)
if add_payload:
@ -244,7 +263,6 @@ class NonBlockingBOSH(NonBlockingTransport):
log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket)))
#socket.send(stanza)
self.renew_bosh_wait_timeout(self.bosh_wait + 3)
return stanza
@ -266,8 +284,12 @@ class NonBlockingBOSH(NonBlockingTransport):
self.wait_cb_time)
def on_persistent_fallback(self, socket):
log.warn('Fallback to nonpersistent HTTP (no pipelining as well)')
'''
Called from underlying transport when server closes TCP connection.
:param socket: disconnected transport object
'''
if socket.http_persistent:
log.warn('Fallback to nonpersistent HTTP (no pipelining as well)')
socket.http_persistent = False
self.http_persistent = False
self.http_pipelining = False
@ -279,6 +301,9 @@ class NonBlockingBOSH(NonBlockingTransport):
def handle_body_attrs(self, stanza_attrs):
'''
Called for each incoming body stanza from dispatcher. Checks body attributes.
'''
self.remove_bosh_wait_timeout()
if self.after_init:
@ -315,11 +340,13 @@ class NonBlockingBOSH(NonBlockingTransport):
def append_stanza(self, stanza):
''' appends stanza to a buffer to send '''
if stanza:
if isinstance(stanza, tuple):
# stanza is tuple of BOSH stanza and bool value for whether to add payload
self.prio_bosh_stanzas.append(stanza)
else:
# stanza is XMPP stanza. Will be boshified before send.
self.stanza_buffer.append(stanza)
@ -391,7 +418,6 @@ class NonBlockingBOSH(NonBlockingTransport):
if self.use_proxy_auth:
http_dict['proxy_user'], http_dict['proxy_pass'] = self.proxy_creds
s = NonBlockingHTTPBOSH(
raise_event=self.raise_event,
on_disconnect=self.disconnect,
@ -402,6 +428,7 @@ class NonBlockingBOSH(NonBlockingTransport):
http_dict = http_dict,
proxy_dict = self.proxy_dict,
on_persistent_fallback = self.on_persistent_fallback)
s.onreceive(self.on_received_http)
s.set_stanza_build_cb(self.build_stanza)
return s
@ -439,6 +466,10 @@ def get_rand_number():
class AckChecker():
'''
Class for generating rids and generating and checking acknowledgements in
BOSH messages.
'''
def __init__(self):
self.rid = get_rand_number()
self.ack = 1
@ -481,6 +512,9 @@ class AckChecker():
class KeyStack():
'''
Class implementing key sequences for BOSH messages
'''
def __init__(self, count):
self.count = count
self.keys = []

View File

@ -16,11 +16,6 @@
# $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $
'''
Provides Client classes implementations as examples of xmpppy structures usage.
These classes can be used for simple applications "AS IS" though.
'''
import socket
import transports_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh
@ -32,16 +27,19 @@ import logging
log = logging.getLogger('gajim.c.x.client_nb')
class NBCommonClient:
''' Base for Client and Component classes.'''
class NonBlockingClient:
'''
Client class is XMPP connection mountpoint. Objects for authentication,
network communication, roster, xml parsing ... are plugged to client object.
Client implements the abstract behavior - mostly negotioation and callbacks
handling, whereas underlying modules take care of feature-specific logic.
'''
def __init__(self, domain, idlequeue, caller=None):
''' Caches connection data:
'''
Caches connection data:
:param domain: domain - for to: attribute (from account info)
:param idlequeue: processing idlequeue
:param port: port of listening XMPP server
:param caller: calling object - it has to implement certain methods (necessary?)
:param caller: calling object - it has to implement method _event_dispatcher
'''
self.Namespace = protocol.NS_CLIENT
self.defaultNamespace = self.Namespace
@ -62,19 +60,22 @@ class NBCommonClient:
self.on_connect_failure = None
self.proxy = None
self.got_features = False
self.stream_started = False
self.disconnecting = False
self.protocol_type = 'XMPP'
def on_disconnect(self):
def disconnect(self, message=''):
'''
Called on disconnection - when connect failure occurs on running connection
(after stream is successfully opened).
Calls disconnect handlers and cleans things up.
Called on disconnection - disconnect callback is picked based on state of the
client.
'''
self.connected=''
for i in reversed(self.disconnect_handlers):
log.debug('Calling disconnect handler %s' % i)
i()
# to avoid recursive calls
if self.disconnecting: return
log.warn('Disconnecting NBClient: %s' % message)
if self.__dict__.has_key('NonBlockingRoster'):
self.NonBlockingRoster.PlugOut()
if self.__dict__.has_key('NonBlockingBind'):
@ -89,7 +90,41 @@ class NBCommonClient:
self.NonBlockingHTTP.PlugOut()
if self.__dict__.has_key('NonBlockingBOSH'):
self.NonBlockingBOSH.PlugOut()
connected = self.connected
stream_started = self.stream_started
self.connected = ''
self.stream_started = False
self.disconnecting = True
log.debug('Client disconnected..')
if connected == '':
# if we're disconnecting before connection to XMPP sever is opened, we don't
# call disconnect handlers but on_connect_failure callback
if self.proxy:
# with proxy, we have different failure callback
log.debug('calling on_proxy_failure cb')
self.on_proxy_failure(reason=message)
else:
log.debug('ccalling on_connect_failure cb')
self.on_connect_failure()
else:
# we are connected to XMPP server
if not stream_started:
# if error occur before XML stream was opened, e.g. no response on init
# request, we call the on_connect_failure callback because proper
# connection is not estabilished yet and it's not a proxy issue
log.debug('calling on_connect_failure cb')
self.on_connect_failure()
else:
# with open connection, we are calling the disconnect handlers
for i in reversed(self.disconnect_handlers):
log.debug('Calling disconnect handler %s' % i)
i()
self.disconnecting = False
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
@ -101,11 +136,14 @@ class NBCommonClient:
:param on_connect: called after stream is successfully opened
:param on_connect_failure: called when error occures during connection
:param on_proxy_failure: called if error occurres during TCP connection to
proxy server or during connection to the proxy
proxy server or during proxy connecting process
:param proxy: dictionary with proxy data. It should contain at least values
for keys 'host' and 'port' - connection details for proxy server and
optionally keys 'user' and 'pass' as proxy credentials
:param secure_tuple:
:param secure_tuple: tuple of (desired connection type, cacerts and mycerts)
connection type can be 'ssl' - TLS estabilished after TCP connection,
'tls' - TLS estabilished after negotiation with starttls, or 'plain'.
cacerts, mycerts - see tls_nb.NonBlockingTLS constructor for more details
'''
self.on_connect = on_connect
self.on_connect_failure=on_connect_failure
@ -113,16 +151,72 @@ class NBCommonClient:
self.secure, self.cacerts, self.mycerts = secure_tuple
self.Connection = None
self.Port = port
self.proxy = proxy
if hostname:
xmpp_hostname = hostname
else:
xmpp_hostname = self.Server
estabilish_tls = self.secure == 'ssl'
certs = (self.cacerts, self.mycerts)
proxy_dict = {}
tcp_host=xmpp_hostname
tcp_port=self.Port
if proxy:
# with proxies, client connects to proxy instead of directly to
# XMPP server ((hostname, port))
# tcp_host is hostname of machine used for socket connection
# (DNS request will be done for proxy or BOSH CM hostname)
tcp_host, tcp_port, proxy_user, proxy_pass = \
transports_nb.get_proxy_data_from_dict(proxy)
if proxy['type'] == 'bosh':
self.socket = bosh.NonBlockingBOSH(
on_disconnect = self.disconnect,
raise_event = self.raise_event,
idlequeue = self.idlequeue,
estabilish_tls = estabilish_tls,
certs = certs,
proxy_creds = (proxy_user, proxy_pass),
xmpp_server = (xmpp_hostname, self.Port),
domain = self.Server,
bosh_dict = proxy)
self.protocol_type = 'BOSH'
self.wait_for_restart_response = proxy['bosh_wait_for_restart_response']
def _resolve_hostname(self, hostname, port, on_success, on_failure):
''' wrapper of getaddinfo call. FIXME: getaddinfo blocks'''
else:
proxy_dict['type'] = proxy['type']
proxy_dict['xmpp_server'] = (xmpp_hostname, self.Port)
proxy_dict['credentials'] = (proxy_user, proxy_pass)
if not proxy or proxy['type'] != 'bosh':
self.socket = transports_nb.NonBlockingTCP(
on_disconnect = self.disconnect,
raise_event = self.raise_event,
idlequeue = self.idlequeue,
estabilish_tls = estabilish_tls,
certs = certs,
proxy_dict = proxy_dict)
self.socket.PlugIn(self)
self._resolve_hostname(
hostname=tcp_host,
port=tcp_port,
on_success=self._try_next_ip)
def _resolve_hostname(self, hostname, port, on_success):
''' wrapper for getaddinfo call. FIXME: getaddinfo blocks'''
try:
self.ip_addresses = socket.getaddrinfo(hostname,port,
socket.AF_UNSPEC,socket.SOCK_STREAM)
except socket.gaierror, (errnum, errstr):
on_failure('Lookup failure for %s:%s, hostname: %s - %s' %
self.disconnect(message= 'Lookup failure for %s:%s, hostname: %s - %s' %
(self.Server, self.Port, hostname, errstr))
else:
on_success()
@ -130,12 +224,13 @@ class NBCommonClient:
def _try_next_ip(self, err_message=None):
'''iterates over IP addresses from getaddinfo'''
'''iterates over IP addresses from getaddrinfo'''
if err_message:
log.debug('While looping over DNS A records: %s' % err_message)
if self.ip_addresses == []:
self._on_tcp_failure('Run out of hosts for name %s:%s' %
(self.Server, self.Port))
msg = 'Run out of hosts for name %s:%s.' % (self.Server, self.Port)
msg = msg + ' Error for last IP: %s' % err_message
self.disconnect(msg)
else:
self.current_ip = self.ip_addresses.pop(0)
self.socket.connect(
@ -152,19 +247,23 @@ class NBCommonClient:
return None
def _xmpp_connect(self, socket_type):
if socket_type == 'plain' and self.Connection.ssl_lib:
socket_type = 'ssl'
'''
Starts XMPP connecting process - opens the XML stream. Is called after TCP
connection is estabilished or after switch to TLS when successfully
negotiated with <starttls>.
'''
if socket_type == 'plain' and self.Connection.ssl_lib: socket_type = 'ssl'
self.connected = socket_type
self._xmpp_connect_machine()
def _xmpp_connect_machine(self, mode=None, data=None):
'''
Finite automaton called after TCP connecting. Takes care of stream opening
and features tag handling. Calls _on_stream_start when stream is
started, and _on_connect_failure on failure.
Finite automaton taking care of stream opening and features tag
handling. Calls _on_stream_start when stream is started, and disconnect()
on failure.
'''
log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] ))
log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s...' % (mode,str(data)[:20] ))
def on_next_receive(mode):
log.info('setting %s on next receive' % mode)
@ -182,7 +281,7 @@ class NBCommonClient:
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
elif mode == 'FAILURE':
self._on_connect_failure(err_message='During XMPP connect: %s' % data)
self.disconnect('During XMPP connect: %s' % data)
elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES':
if data:
@ -219,8 +318,38 @@ class NBCommonClient:
elif mode == 'STREAM_STARTED':
self._on_stream_start()
def _on_stream_start(self):
'''
Called after XMPP stream is opened.
TLS negotiation may follow after esabilishing a stream.
'''
self.stream_started = True
self.onreceive(None)
if self.connected == 'plain':
if self.secure == 'plain':
# if we want plain connection, we're done now
self._on_connect()
return
if not self.Dispatcher.Stream.features.getTag('starttls'):
# if server doesn't advertise TLS in init response, we can't do more
log.warn('While connecting with type = "tls": TLS unsupported by remote server')
self._on_connect()
return
if self.incoming_stream_version() != '1.0':
# if stream version is less than 1.0, we can't do more
log.warn('While connecting with type = "tls": stream version is less than 1.0')
self._on_connect()
return
# otherwise start TLS negotioation
self.stream_started = False
log.info("TLS supported by remote server. Requesting TLS start.")
self._tls_negotiation_handler()
elif self.connected in ['ssl', 'tls']:
self._on_connect()
def _tls_negotiation_handler(self, con=None, tag=None):
''' takes care of TLS negotioation with <starttls> '''
log.info('-------------tls_negotiaton_handler() >> tag: %s' % tag)
if not con and not tag:
# starting state when we send the <starttls>
@ -232,72 +361,44 @@ class NBCommonClient:
else:
# we got <proceed> or <failure>
if tag.getNamespace() <> NS_TLS:
self._on_connect_failure('Unknown namespace: %s' % tag.getNamespace())
self.disconnect('Unknown namespace: %s' % tag.getNamespace())
return
tagname = tag.getName()
if tagname == 'failure':
self._on_connect_failure('TLS <failure> received: %s' % tag)
self.disconnect('TLS <failure> received: %s' % tag)
return
log.info('Got starttls proceed response. Switching to TLS/SSL...')
# following call wouldn't work for BOSH transport but it doesn't matter
# because TLS negotiation with BOSH is forbidden
# because <starttls> negotiation with BOSH is forbidden
self.Connection.tls_init(
on_succ = lambda: self._xmpp_connect(socket_type='tls'),
on_fail = lambda: self._on_connect_failure('error while etabilishing TLS'))
on_fail = lambda: self.disconnect('error while etabilishing TLS'))
def _on_stream_start(self):
'''Called when stream is opened. To be overriden in derived classes.'''
def _on_connect_failure(self, retry=None, err_message=None):
self.connected = ''
if err_message:
log.debug('While connecting: %s' % err_message)
if self.socket:
self.socket.disconnect()
self.on_connect_failure(retry)
def _on_connect(self):
''' preceeds call of on_connect callback '''
self.onreceive(None)
self.on_connect(self, self.connected)
def raise_event(self, event_type, data):
'''
raises event to connection instance - DATA_SENT and DATA_RECIVED events are
used in XML console to show XMPP traffic
'''
log.info('raising event from transport: >>>>>%s<<<<<\n_____________\n%s\n_____________\n' % (event_type,data))
if hasattr(self, 'Dispatcher'):
self.Dispatcher.Event('', event_type, data)
# moved from client.CommonClient (blocking client from xmpppy):
def RegisterDisconnectHandler(self,handler):
""" Register handler that will be called on disconnect."""
self.disconnect_handlers.append(handler)
def UnregisterDisconnectHandler(self,handler):
""" Unregister handler that is called on disconnect."""
self.disconnect_handlers.remove(handler)
def DisconnectHandler(self):
""" Default disconnect handler. Just raises an IOError.
If you choosed to use this class in your production client,
override this method or at least unregister it. """
raise IOError('Disconnected from server.')
def get_connect_type(self):
""" Returns connection state. F.e.: None / 'tls' / 'plain+non_sasl' . """
return self.connected
def get_peerhost(self):
''' get the ip address of the account, from which is made connection
to the server , (e.g. me).
We will create listening socket on the same ip '''
# FIXME: tuple (ip, port) is expected (and checked for) but port num is useless
return self.socket.peerhost
# follows code for authentication, resource bind, session and roster download
#
def auth(self, user, password, resource = '', sasl = 1, on_auth = None):
''' Authenticate connnection and bind resource. If resource is not provided
random one or library name used. '''
'''
Authenticate connnection and bind resource. If resource is not provided
random one or library name used.
'''
self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl
self.on_auth = on_auth
self._on_doc_attrs()
@ -318,7 +419,6 @@ class NBCommonClient:
self._Resource = 'xmpppy'
auth_nb.NonBlockingNonSASL(self._User, self._Password, self._Resource, self._on_old_auth).PlugIn(self)
return
#self.onreceive(self._on_start_sasl)
self.SASL.auth()
return True
@ -350,6 +450,7 @@ class NBCommonClient:
return
return
def _on_auth_bind(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
@ -387,106 +488,34 @@ class NBCommonClient:
class NonBlockingClient(NBCommonClient):
''' Example client class, based on CommonClient. '''
def __init__(self, domain, idlequeue, caller=None):
NBCommonClient.__init__(self, domain, idlequeue, caller)
self.protocol_type = 'XMPP'
# following methods are moved from blocking client class from xmpppy:
def RegisterDisconnectHandler(self,handler):
''' Register handler that will be called on disconnect.'''
self.disconnect_handlers.append(handler)
def connect(self, on_connect, on_connect_failure, hostname=None, port=5222,
on_proxy_failure=None, proxy=None, secure_tuple=None):
def UnregisterDisconnectHandler(self,handler):
''' Unregister handler that is called on disconnect.'''
self.disconnect_handlers.remove(handler)
NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port,
on_proxy_failure, proxy, secure_tuple)
if hostname:
xmpp_hostname = hostname
else:
xmpp_hostname = self.Server
estabilish_tls = self.secure == 'ssl'
certs = (self.cacerts, self.mycerts)
self._on_tcp_failure = self._on_connect_failure
proxy_dict = {}
tcp_host=xmpp_hostname
tcp_port=self.Port
if proxy:
# with proxies, client connects to proxy instead of directly to
# XMPP server ((hostname, port))
# tcp_host is hostname of machine used for socket connection
# (DNS request will be done for proxy or BOSH CM hostname)
tcp_host, tcp_port, proxy_user, proxy_pass = \
transports_nb.get_proxy_data_from_dict(proxy)
if proxy['type'] == 'bosh':
self.socket = bosh.NonBlockingBOSH(
on_disconnect = self.on_disconnect,
raise_event = self.raise_event,
idlequeue = self.idlequeue,
estabilish_tls = estabilish_tls,
certs = certs,
proxy_creds = (proxy_user, proxy_pass),
xmpp_server = (xmpp_hostname, self.Port),
domain = self.Server,
bosh_dict = proxy)
self.protocol_type = 'BOSH'
self.wait_for_restart_response = proxy['bosh_wait_for_restart_response']
else:
self._on_tcp_failure = self.on_proxy_failure
proxy_dict['type'] = proxy['type']
proxy_dict['xmpp_server'] = (xmpp_hostname, self.Port)
proxy_dict['credentials'] = (proxy_user, proxy_pass)
if not proxy or proxy['type'] != 'bosh':
self.socket = transports_nb.NonBlockingTCP(
on_disconnect = self.on_disconnect,
raise_event = self.raise_event,
idlequeue = self.idlequeue,
estabilish_tls = estabilish_tls,
certs = certs,
proxy_dict = proxy_dict)
self.socket.PlugIn(self)
self._resolve_hostname(
hostname=tcp_host,
port=tcp_port,
on_success=self._try_next_ip,
on_failure=self._on_tcp_failure)
def _on_stream_start(self):
def DisconnectHandler(self):
'''
Called after XMPP stream is opened.
In pure XMPP client, TLS negotiation may follow after esabilishing a stream.
Default disconnect handler. Just raises an IOError. If you choosed to use
this class in your production client, override this method or at least
unregister it.
'''
self.onreceive(None)
if self.connected == 'plain':
if self.secure == 'plain':
# if we want plain connection, we're done now
self._on_connect()
return
if not self.Dispatcher.Stream.features.getTag('starttls'):
# if server doesn't advertise TLS in init response, we can't do more
log.warn('While connecting with type = "tls": TLS unsupported by remote server')
self._on_connect()
return
if self.incoming_stream_version() != '1.0':
# if stream version is less than 1.0, we can't do more
log.warn('While connecting with type = "tls": stream version is less than 1.0')
self._on_connect()
return
# otherwise start TLS
log.info("TLS supported by remote server. Requesting TLS start.")
self._tls_negotiation_handler()
elif self.connected in ['ssl', 'tls']:
self._on_connect()
raise IOError('Disconnected from server.')
def get_connect_type(self):
''' Returns connection state. F.e.: None / 'tls' / 'plain+non_sasl'. '''
return self.connected
def get_peerhost(self):
'''
Gets the ip address of the account, from which is made connection to the
server , (e.g. IP and port of gajim's socket. We will create listening socket
on the same ip
'''
# FIXME: tuple (ip, port) is expected (and checked for) but port num is
# useless
return self.socket.peerhost

View File

@ -19,6 +19,7 @@ import struct, socket, base64
'''
Module containing classes for proxy connecting. So far its HTTP CONNECT
and SOCKS5 proxy.
Authentication to NTLM (Microsoft implementation) proxies can be next.
'''
import logging

View File

@ -3,6 +3,7 @@
##
## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
## modified by Dimitur Kirov <dkirov@gmail.com>
## modified by Tomas Karasek <tom.to.the.k@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
@ -190,7 +191,7 @@ class PyOpenSSLWrapper(SSLWrapper):
return 0
class StdlibSSLWrapper(SSLWrapper):
'''Wrapper class for Python's socket.ssl read() and write() methods'''
'''Wrapper class for Python socket.ssl read() and write() methods'''
def __init__(self, *args):
self.parent = SSLWrapper
@ -221,6 +222,10 @@ class NonBlockingTLS(PlugIn):
''' TLS connection used to encrypts already estabilished tcp connection.'''
def __init__(self, cacerts, mycerts):
'''
:param cacerts: path to pem file with certificates of known XMPP servers
:param mycerts: path to pem file with certificates of user trusted servers
'''
PlugIn.__init__(self)
self.cacerts = cacerts
self.mycerts = mycerts
@ -239,11 +244,9 @@ class NonBlockingTLS(PlugIn):
log.info('Starting TLS estabilishing')
PlugIn.PlugIn(self, owner)
try:
self._owner._plug_idle(writable=False, readable=False)
res = self._startSSL()
except Exception, e:
log.error("PlugIn: while trying _startSSL():", exc_info=True)
#traceback.print_exc()
return False
return res
@ -278,7 +281,6 @@ class NonBlockingTLS(PlugIn):
if result:
log.debug("Synchronous handshake completed")
self._owner._plug_idle(writable=True, readable=False)
return True
else:
return False
@ -361,7 +363,6 @@ class NonBlockingTLS(PlugIn):
def _ssl_verify_callback(self, sslconn, cert, errnum, depth, ok):
# Exceptions can't propagate up through this callback, so print them here.
try:
print 'in ssl verify callback'
self._owner.ssl_fingerprint_sha1 = cert.digest('sha1')
if errnum == 0:
return True

View File

@ -84,11 +84,28 @@ CONNECTING = 'CONNECTING'
PROXY_CONNECTING = 'PROXY_CONNECTING'
CONNECTED = 'CONNECTED'
STATES = [DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING]
# transports have different arguments in constructor and same in connect()
# method
# Transports have different arguments in constructor and same in connect()
# method.
class NonBlockingTransport(PlugIn):
'''
Abstract class representing a trasport - object responsible for connecting to
XMPP server and putting stanzas on wire in desired form.
'''
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs):
'''
Each trasport class can have different constructor but it has to have at
least all the arguments of NonBlockingTransport constructor.
:param raise_event: callback for monitoring of sent and received data
:param on_disconnect: callback called on disconnection during runtime
:param idlequeue: processing idlequeue
:param estabilish_tls: boolean whether to estabilish TLS connection after TCP
connection is done
:param certs: tuple of (cacerts, mycerts) see tls_nb.NonBlockingTLS
constructor for more details
'''
PlugIn.__init__(self)
self.raise_event = raise_event
self.on_disconnect = on_disconnect
@ -103,7 +120,7 @@ class NonBlockingTransport(PlugIn):
self.certs = certs
# type of used ssl lib (if any) will be assigned to this member var
self.ssl_lib = None
self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout,
self._exported_methods=[self.onreceive, self.set_send_timeout,
self.set_timeout, self.remove_timeout, self.start_disconnect]
# time to wait for SOME stanza to come and then send keepalive
@ -118,10 +135,15 @@ class NonBlockingTransport(PlugIn):
def plugout(self):
self._owner.Connection = None
self._owner = None
self.disconnect(do_callback=False)
def connect(self, conn_5tuple, on_connect, on_connect_failure):
'''
connect method should have the same declaration in all derived transports
Creates and connects transport to server and port defined in conn_5tupe which
should be item from list returned from getaddrinfo.
:param conn_5tuple: 5-tuple returned from getaddrinfo
:param on_connect: callback called on successful connect to the server
:param on_connect_failure: callback called on failure when connecting
'''
self.on_connect = on_connect
self.on_connect_failure = on_connect_failure
@ -164,8 +186,13 @@ class NonBlockingTransport(PlugIn):
self.on_disconnect()
def onreceive(self, recv_handler):
''' Sets the on_receive callback. Do not confuse it with
on_receive() method, which is the callback itself.'''
'''
Sets the on_receive callback. Do not confuse it with on_receive() method,
which is the callback itself.
onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is the
default one that will decide what to do with received stanza based on its
tag name and namespace.
'''
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
@ -176,9 +203,9 @@ class NonBlockingTransport(PlugIn):
def tcp_connecting_started(self):
self.set_state(CONNECTING)
# on_connect/on_conn_failure will be called from self.pollin/self.pollout
def read_timeout(self):
''' called when there's no response from server in defined timeout '''
if self.on_timeout:
self.on_timeout()
self.renew_send_timeout()
@ -212,12 +239,13 @@ class NonBlockingTransport(PlugIn):
class NonBlockingTCP(NonBlockingTransport, IdleObject):
'''
Non-blocking TCP socket wrapper
Non-blocking TCP socket wrapper. It is used for simple XMPP connection. Can be
connected via proxy and can estabilish TLS connection.
'''
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs,
proxy_dict=None):
'''
Class constructor.
:param proxy_dict: dictionary with proxy data as loaded from config file
'''
NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue,
estabilish_tls, certs)
@ -227,7 +255,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
# bytes remained from the last send message
self.sendbuff = ''
self.proxy_dict = proxy_dict
self.on_remote_disconnect = self.disconnect()
self.on_remote_disconnect = self.disconnect
def start_disconnect(self):
@ -236,14 +264,6 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.disconnect()
def connect(self, conn_5tuple, on_connect, on_connect_failure):
'''
Creates and connects socket to server and port defined in conn_5tupe which
should be list item returned from getaddrinfo.
:param conn_5tuple: 5-tuple returned from getaddrinfo
:param on_connect: callback called on successful tcp connection
:param on_connect_failure: callback called on failure when estabilishing tcp
connection
'''
NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure)
log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % (self.server, self.port))
@ -258,12 +278,13 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._recv = self._sock.recv
self.fd = self._sock.fileno()
# we want to be notified when send is possible to connected socket
# we want to be notified when send is possible to connected socket because
# it means the TCP connection is estabilished
self._plug_idle(writable=True, readable=False)
self.peerhost = None
#variable for errno symbol that will be found from exception raised from connect()
errnum = 0
''' variable for errno symbol that will be found from exception raised from connect() '''
# set timeout for TCP connecting - if nonblocking connect() fails, pollend
# is called. If if succeeds pollout is called.
@ -280,15 +301,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr))
self.tcp_connecting_started()
return
elif errnum in (0, 10056, errno.EISCONN):
# already connected - this branch is probably useless, nonblocking connect() will
# return EINPROGRESS exception in most cases. When here, we don't need timeout
# on connected descriptor and success callback can be called.
log.info('After connect. "%s" raised => CONNECTED' % errstr)
self._on_connect(self)
return
# if there was some other error, call failure callback and unplug transport
# if there was some other exception, call failure callback and unplug transport
# which will also remove read_timeouts for descriptor
self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
(self.server, self.port, errnum, errstr))
@ -312,8 +326,8 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def _on_connect(self):
'''
Preceeds invoking of on_connect callback. TCP connection is estabilished at
this time.
Preceeds invoking of on_connect callback. TCP connection is already
estabilished by this this time.
'''
if self.estabilish_tls:
self.tls_init(
@ -324,6 +338,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def tls_init(self, on_succ, on_fail):
'''
Estabilishes a TLS/SSL on TCP connection by plugging a NonBlockingTLS module
'''
cacerts, mycerts = self.certs
result = tls_nb.NonBlockingTLS(cacerts, mycerts).PlugIn(self)
if result: on_succ()
@ -342,6 +359,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
if self.get_state()==CONNECTING:
log.info('%s socket wrapper connected' % id(self))
self.idlequeue.remove_timeout(self.fd)
self._plug_idle(writable=False, readable=False)
self.peerhost = self._sock.getsockname()
if self.proxy_dict: self._connect_to_proxy()
else: self._on_connect()
@ -349,6 +367,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._do_send()
def pollend(self):
'''called on error on TCP connection'''
log.info('pollend called, state == %s' % self.get_state())
if self.get_state()==CONNECTING:
@ -358,8 +377,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self.disconnect()
def disconnect(self, do_callback=True):
if self.get_state() == DISCONNECTED:
return
if self.get_state() == DISCONNECTED: return
self.set_state(DISCONNECTED)
self.idlequeue.unplug_idle(self.fd)
if self.__dict__.has_key('NonBlockingTLS'): self.NonBlockingTLS.PlugOut()
@ -367,14 +385,12 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except socket.error, (errnum, errstr):
log.error('Error while disconnecting a socket: %s %s' % (errnum,errstr))
log.error('Error while disconnecting socket: %s' % errstr)
self.fd = -1
NonBlockingTransport.disconnect(self, do_callback)
def read_timeout(self):
'''
Implemntation of IdleObject function called on timeouts from IdleQueue.
'''
''' method called when timeout passed '''
log.warn('read_timeout called, state == %s' % self.get_state())
if self.get_state()==CONNECTING:
# if read_timeout is called during connecting, connect() didn't end yet
@ -403,11 +419,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
If supplied data is unicode string, encode it to utf-8.
'''
NonBlockingTransport.send(self, raw_data, now)
r = raw_data
if isinstance(r, unicode):
r = r.encode('utf-8')
elif not isinstance(r, str):
r = ustr(r).encode('utf-8')
r = self.encode_stanza(raw_data)
if now:
self.sendqueue.insert(0, r)
self._do_send()
@ -416,6 +430,12 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
self._plug_idle(writable=True, readable=True)
def encode_stanza(self, stanza):
if isinstance(stanza, unicode):
stanza = stanza.encode('utf-8')
elif not isinstance(stanza, str):
stanza = ustr(stanza).encode('utf-8')
return stanza
def _plug_idle(self, writable, readable):
@ -433,12 +453,14 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
def _do_send(self):
'''
Called when send() to connected socket is possible. First message from
sendqueue will be sent.
'''
if not self.sendbuff:
if not self.sendqueue:
log.warn('calling send on empty buffer and queue')
self._plug_idle(
writable= ((self.sendqueue!=[]) or (self.sendbuff!='')),
readable=True)
self._plug_idle(writable=False, readable=True)
return None
self.sendbuff = self.sendqueue.pop(0)
try:
@ -472,12 +494,15 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
log.info("_do_receive, caught SSL error, got %s:" % received , exc_info=True)
errnum, errstr = e.exc
if received == '': errstr = 'zero bytes on recv'
if (self.ssl_lib is None and received == '') or \
(self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \
(self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ):
# 8 in stdlib: errstr == EOF occured in violation of protocol
# -1 in pyopenssl: errstr == Unexpected EOF
log.info("Disconnected by remote server: %s %s" % (errnum, errstr), exc_info=True)
log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
print self.on_remote_disconnect
self.on_remote_disconnect()
return
@ -489,8 +514,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject):
# this branch is for case of non-fatal SSL errors - None is returned from
# recv() but no errnum is set
if received is None:
return
if received is None: return
# we have received some bytes, stop the timeout!
self.renew_send_timeout()
@ -519,6 +543,13 @@ class NonBlockingHTTP(NonBlockingTCP):
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, certs,
on_http_request_possible, on_persistent_fallback, http_dict, proxy_dict = None):
'''
:param on_http_request_possible: method to call when HTTP request to socket
owned by transport is possible.
:param on_persistent_fallback: callback called when server ends TCP
connection. It doesn't have to be fatal for HTTP session.
:param http_dict: dictionary with data for HTTP request and headers
'''
NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
estabilish_tls, certs, proxy_dict)
@ -551,8 +582,10 @@ class NonBlockingHTTP(NonBlockingTCP):
def _on_receive(self,data):
'''Preceeds passing received data to owner class. Gets rid of HTTP headers
and checks them.'''
'''
Preceeds passing received data to owner class. Gets rid of HTTP headers and
checks them.
'''
if self.get_state() == PROXY_CONNECTING:
NonBlockingTCP._on_receive(self, data)
return
@ -648,7 +681,10 @@ class NonBlockingHTTP(NonBlockingTCP):
class NonBlockingHTTPBOSH(NonBlockingHTTP):
'''
Class for BOSH HTTP connections. Slightly redefines HTTP transport by calling
bosh bodytag generating callback before putting data on wire.
'''
def set_stanza_build_cb(self, build_cb):
self.build_cb = build_cb
@ -659,24 +695,10 @@ class NonBlockingHTTPBOSH(NonBlockingHTTP):
return
if not self.sendbuff:
stanza = self.build_cb(socket=self)
stanza = self.encode_stanza(stanza)
stanza = self.build_http_message(httpbody=stanza)
if isinstance(stanza, unicode):
stanza = stanza.encode('utf-8')
elif not isinstance(stanza, str):
stanza = ustr(stanza).encode('utf-8')
self.sendbuff = stanza
try:
send_count = self._send(self.sendbuff)
if send_count:
sent_data = self.sendbuff[:send_count]
self.sendbuff = self.sendbuff[send_count:]
self._plug_idle(writable = self.sendbuff != '', readable = True)
self.raise_event(DATA_SENT, sent_data)
except socket.error, e:
log.error('_do_send:', exc_info=True)
traceback.print_exc()
self.disconnect()
NonBlockingTCP._do_send(self)