- Refactored non-blocking transport and client classes - getaddrinfo is called

in Client now
- Added NonBlockingHttpBOSH transport (to tranports_nb) and BOSHClient
(to client_nb)
- Extended possible proxy types in configuration by "BOSH" proxy
- Rewrote NonBlockingTLS to invoke success callback only after successful TLS handshake is over (formerly, the TLS Plugin returned right after sending <starttls>)
This commit is contained in:
tomk 2008-06-30 00:02:32 +00:00
parent 65644ca13f
commit f3820706fb
13 changed files with 1555 additions and 1725 deletions

View File

@ -211,7 +211,8 @@
<widget class="GtkComboBox" id="proxytype_combobox">
<property name="visible">True</property>
<property name="items" translatable="yes">HTTP Connect
SOCKS5</property>
SOCKS5
BOSH</property>
<property name="add_tearoffs">False</property>
<property name="focus_on_click">True</property>
<signal name="changed" handler="on_proxytype_combobox_changed" last_modification_time="Wed, 08 Jun 2005 17:45:26 GMT"/>

View File

@ -55,6 +55,7 @@ from common.rst_xhtml_generator import create_xhtml
from string import Template
import logging
log = logging.getLogger('gajim.c.connection')
log.setLevel(logging.DEBUG)
ssl_error = {
2: _("Unable to get issuer certificate"),
@ -207,7 +208,7 @@ class Connection(ConnectionHandlers):
def _disconnectedReconnCB(self):
'''Called when we are disconnected'''
log.debug('disconnectedReconnCB')
log.error('disconnectedReconnCB')
if gajim.account_is_connected(self.name):
# we cannot change our status to offline or connecting
# after we auth to server
@ -467,7 +468,6 @@ class Connection(ConnectionHandlers):
proxy = None
else:
proxy = None
h = hostname
p = 5222
ssl_p = 5223
@ -504,7 +504,7 @@ class Connection(ConnectionHandlers):
self.connect_to_next_host()
def on_proxy_failure(self, reason):
log.debug('Connection to proxy failed')
log.error('Connection to proxy failed: %s' % reason)
self.time_to_reconnect = None
self.on_connect_failure = None
self.disconnect(on_purpose = True)
@ -519,23 +519,6 @@ class Connection(ConnectionHandlers):
self.last_connection.socket.disconnect()
self.last_connection = None
self.connection = None
if gajim.verbose:
con = common.xmpp.NonBlockingClient(self._hostname, caller = self,
on_connect = self.on_connect_success,
on_proxy_failure = self.on_proxy_failure,
on_connect_failure = self.connect_to_next_type)
else:
con = common.xmpp.NonBlockingClient(self._hostname, debug = [],
caller = self, on_connect = self.on_connect_success,
on_proxy_failure = self.on_proxy_failure,
on_connect_failure = self.connect_to_next_type)
self.last_connection = con
# increase default timeout for server responses
common.xmpp.dispatcher_nb.DEFAULT_TIMEOUT_SECONDS = self.try_connecting_for_foo_secs
con.set_idlequeue(gajim.idlequeue)
# FIXME: this is a hack; need a better way
if self.on_connect_success == self._on_new_account:
con.RegisterDisconnectHandler(self._on_new_account)
if self._current_type == 'ssl':
port = self._current_host['ssl_port']
@ -546,9 +529,40 @@ class Connection(ConnectionHandlers):
secur = 0
else:
secur = None
if self._proxy and self._proxy['type'] == 'bosh':
clientClass = common.xmpp.BOSHClient
else:
clientClass = common.xmpp.NonBlockingClient
if gajim.verbose:
con = common.xmpp.NonBlockingClient(
hostname=self._current_host['host'],
port=port,
caller=self,
idlequeue=gajim.idlequeue)
else:
con = common.xmpp.NonBlockingClient(
hostname=self._current_host['host'],
debug=[],
port=port,
caller=self,
idlequeue=gajim.idlequeue)
self.last_connection = con
# increase default timeout for server responses
common.xmpp.dispatcher_nb.DEFAULT_TIMEOUT_SECONDS = self.try_connecting_for_foo_secs
# FIXME: this is a hack; need a better way
if self.on_connect_success == self._on_new_account:
con.RegisterDisconnectHandler(self._on_new_account)
log.info('Connecting to %s: [%s:%d]', self.name,
self._current_host['host'], port)
con.connect((self._current_host['host'], port), proxy=self._proxy,
con.connect(
on_connect=self.on_connect_success,
on_proxy_failure=self.on_proxy_failure,
on_connect_failure=self.connect_to_next_type,
proxy=self._proxy,
secure = secur)
else:
self.connect_to_next_host(retry)
@ -561,6 +575,9 @@ class Connection(ConnectionHandlers):
'connection_types').split()
else:
self._connection_types = ['tls', 'ssl', 'plain']
# FIXME: remove after tls and ssl will be degubbed
#self._connection_types = ['plain']
host = self.select_next_host(self._hosts)
self._current_host = host
self._hosts.remove(host)
@ -975,7 +992,11 @@ class Connection(ConnectionHandlers):
p.setStatus(msg)
self.remove_all_transfers()
self.time_to_reconnect = None
self.connection.start_disconnect(p, self._on_disconnected)
self.connection.RegisterDisconnectHandler(self._on_disconnected)
self.connection.send(p)
self.connection.StreamTerminate()
#self.connection.start_disconnect(p, self._on_disconnected)
else:
self.time_to_reconnect = None
self._on_disconnected()
@ -1010,7 +1031,7 @@ class Connection(ConnectionHandlers):
def _on_disconnected(self):
''' called when a disconnect request has completed successfully'''
self.dispatch('STATUS', 'offline')
self.disconnect()
self.disconnect(on_purpose=True)
def get_status(self):
return STATUS_LIST[self.connected]

View File

@ -169,6 +169,9 @@ class SASL(PlugIn):
self.startsasl='success'
self.DEBUG('Successfully authenticated with remote server.', 'ok')
handlers=self._owner.Dispatcher.dumpHandlers()
print '6' * 79
print handlers
print '6' * 79
self._owner.Dispatcher.PlugOut()
dispatcher_nb.Dispatcher().PlugIn(self._owner)
self._owner.Dispatcher.restoreHandlers(handlers)

View File

@ -17,40 +17,61 @@
# $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $
'''
Provides PlugIn class functionality to develop extentions for xmpppy.
Also provides Client and Component classes implementations as the
examples of xmpppy structures usage.
Provides Client classes implementations as examples of xmpppy structures usage.
These classes can be used for simple applications "AS IS" though.
'''
import socket
import debug
import random
import transports_nb, dispatcher_nb, auth_nb, roster_nb
import transports_nb, dispatcher_nb, auth_nb, roster_nb, protocol
from client import *
import logging
log = logging.getLogger('gajim.c.x.client_nb')
consoleloghandler = logging.StreamHandler()
consoleloghandler.setLevel(logging.DEBUG)
consoleloghandler.setFormatter(
logging.Formatter('%(levelname)s: %(message)s')
)
log.setLevel(logging.DEBUG)
log.addHandler(consoleloghandler)
log.propagate = False
class NBCommonClient:
''' Base for Client and Component classes.'''
def __init__(self, server, port=5222, debug=['always', 'nodebuilder'], caller=None,
on_connect=None, on_proxy_failure=None, on_connect_failure=None):
''' Caches server name and (optionally) port to connect to. "debug" parameter specifies
the debug IDs that will go into debug output. You can either specifiy an "include"
or "exclude" list. The latter is done via adding "always" pseudo-ID to the list.
Full list: ['nodebuilder', 'dispatcher', 'gen_auth', 'SASL_auth', 'bind', 'socket',
'CONNECTproxy', 'TLS', 'roster', 'browser', 'ibb'] . '''
def __init__(self, hostname, idlequeue, port=5222, debug=['always', 'nodebuilder'], caller=None):
if isinstance(self, NonBlockingClient):
self.Namespace, self.DBG = 'jabber:client', DBG_CLIENT
elif isinstance(self, NBCommonClient):
self.Namespace, self.DBG = dispatcher_nb.NS_COMPONENT_ACCEPT, DBG_COMPONENT
''' Caches connection data:
:param hostname: hostname of machine where the XMPP server is running (from Account
of from SRV request) and port to connect to.
:param idlequeue: processing idlequeue
:param port: port of listening XMPP server
:param debug: specifies the debug IDs that will go into debug output. You can either
specifiy an "include" or "exclude" list. The latter is done via adding "always"
pseudo-ID to the list. Full list: ['nodebuilder', 'dispatcher', 'gen_auth',
'SASL_auth', 'bind', 'socket', 'CONNECTproxy', 'TLS', 'roster', 'browser', 'ibb'].
TODO: get rid of debug.py using
:param caller: calling object - it has to implement certain methods (necessary?)
'''
self.DBG = DBG_CLIENT
self.Namespace = protocol.NS_CLIENT
self.idlequeue = idlequeue
self.defaultNamespace = self.Namespace
self.disconnect_handlers = []
self.Server = server
# XMPP server and port from account or SRV
self.Server = hostname
self.Port = port
# Who initiated this client
# Used to register the EventDispatcher
# caller is who initiated this client, it is sed to register the EventDispatcher
self._caller = caller
if debug and type(debug) != list:
debug = ['always', 'nodebuilder']
@ -62,20 +83,24 @@ class NBCommonClient:
self._registered_name = None
self.connected = ''
self._component=0
self.idlequeue = None
self.socket = None
self.on_connect = on_connect
self.on_proxy_failure = on_proxy_failure
self.on_connect_failure = on_connect_failure
self.on_connect = None
self.on_proxy_failure = None
self.on_connect_failure = None
self.proxy = None
def set_idlequeue(self, idlequeue):
self.idlequeue = idlequeue
def disconnected(self):
''' Called on disconnection. Calls disconnect handlers and cleans things up. '''
def on_disconnect(self):
'''
Called on disconnection - when connect failure occurs on running connection
(after stream is successfully opened).
Calls disconnect handlers and cleans things up.
'''
self.connected=''
self.DEBUG(self.DBG,'Disconnect detected','stop')
for i in reversed(self.disconnect_handlers):
self.DEBUG(self.DBG, 'Calling disc handler %s' % i, 'stop')
i()
if self.__dict__.has_key('NonBlockingRoster'):
self.NonBlockingRoster.PlugOut()
@ -94,96 +119,201 @@ class NBCommonClient:
if self.__dict__.has_key('NonBlockingTcp'):
self.NonBlockingTcp.PlugOut()
def reconnectAndReauth(self):
''' Just disconnect. We do reconnecting in connection.py '''
self.disconnect()
return ''
def connect(self,server=None,proxy=None, ssl=None, on_stream_start = None):
''' Make a tcp/ip connection, protect it with tls/ssl if possible and start XMPP stream. '''
if not server:
server = (self.Server, self.Port)
self._Server, self._Proxy, self._Ssl = server , proxy, ssl
self.on_stream_start = on_stream_start
def send(self, stanza, is_message = False, now = False):
''' interface for putting stanzas on wire. Puts ID to stanza if needed and
sends it via socket wrapper'''
(id, stanza_to_send) = self.Dispatcher.assign_id(stanza)
if is_message:
# somehow zeroconf-specific
self.Connection.send(stanza_to_send, True, now = now)
else:
self.Connection.send(stanza_to_send, now = now)
return id
def connect(self, on_connect, on_connect_failure, on_proxy_failure=None, proxy=None, secure=None):
'''
Open XMPP connection (open streams in both directions).
:param on_connect: called after stream is successfully opened
:param on_connect_failure: called when error occures during connection
:param on_proxy_failure: called if error occurres during TCP connection to
proxy server or during connection to the proxy
:param proxy: dictionary with proxy data. It should contain at least values
for keys 'host' and 'port' - connection details for proxy server and
optionally keys 'user' and 'pass' as proxy credentials
:param secure:
'''
self.on_connect = on_connect
self.on_connect_failure=on_connect_failure
self.on_proxy_failure = on_proxy_failure
self._secure = secure
self.Connection = None
if proxy:
# with proxies, client connects to proxy instead of directly to
# XMPP server from __init__.
# tcp_server is hostname used for socket connecting
tcp_server=proxy['host']
tcp_port=proxy['port']
self._on_tcp_failure = self.on_proxy_failure
if proxy.has_key('type'):
if proxy.has_key('user') and proxy.has_key('pass'):
proxy_creds=(proxy['user'],proxy['pass'])
else:
proxy_creds=(None, None)
type_ = proxy['type']
if type_ == 'socks5':
self.socket = transports_nb.NBSOCKS5PROXYsocket(
self._on_connected, self._on_proxy_failure,
self._on_connected_failure, proxy, server)
self.socket = transports_nb.NBSOCKS5ProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(self.Server, self.Port))
elif type_ == 'http':
self.socket = transports_nb.NBHTTPPROXYsocket(self._on_connected,
self._on_proxy_failure, self._on_connected_failure, proxy,
server)
self.socket = transports_nb.NBHTTPProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=proxy_creds,
xmpp_server=(self.Server, self.Port))
elif type_ == 'bosh':
tcp_server = transports_nb.urisplit(tcp_server)[1]
self.socket = transports_nb.NonBlockingHttpBOSH(
on_disconnect=self.on_disconnect,
bosh_uri = proxy['host'],
bosh_port = tcp_port)
else:
self.socket = transports_nb.NBHTTPPROXYsocket(self._on_connected,
self._on_proxy_failure, self._on_connected_failure, proxy,
server)
self.socket = transports_nb.NBHTTPProxySocket(
on_disconnect=self.on_disconnect,
proxy_creds=(None, None),
xmpp_server=(self.Server, self.Port))
else:
self.connected = 'tcp'
self.socket = transports_nb.NonBlockingTcp(self._on_connected,
self._on_connected_failure, server)
self._on_tcp_failure = self._on_connect_failure
tcp_server=self.Server
tcp_port=self.Port
self.socket = transports_nb.NonBlockingTcp(on_disconnect = self.on_disconnect)
self.socket.PlugIn(self)
return True
self._resolve_hostname(
hostname=tcp_server,
port=tcp_port,
on_success=self._try_next_ip,
on_failure=self._on_tcp_failure)
def _resolve_hostname(self, hostname, port, on_success, on_failure):
''' wrapper of getaddinfo call. FIXME: getaddinfo blocks'''
try:
self.ip_addresses = socket.getaddrinfo(hostname,port,
socket.AF_UNSPEC,socket.SOCK_STREAM)
except socket.gaierror, (errnum, errstr):
on_failure(err_message='Lookup failure for %s:%s - %s %s' %
(self.Server, self.Port, errnum, errstr))
else:
on_success()
def get_attrs(self, on_stream_start):
self.on_stream_start = on_stream_start
self.onreceive(self._on_receive_document_attrs)
def _try_next_ip(self, err_message=None):
'''iterates over IP addresses from getaddinfo'''
if err_message:
self.DEBUG(self.DBG,err_message,'connect')
if self.ip_addresses == []:
self._on_tcp_failure(err_message='Run out of hosts for name %s:%s' %
(self.Server, self.Port))
else:
self.current_ip = self.ip_addresses.pop(0)
self.socket.connect(
conn_5tuple=self.current_ip,
on_connect=lambda: self._xmpp_connect(socket_type='tcp'),
on_connect_failure=self._try_next_ip)
def _on_proxy_failure(self, reason):
if self.on_proxy_failure:
self.on_proxy_failure(reason)
def _on_connected_failure(self, retry = None):
def incoming_stream_version(self):
''' gets version of xml stream'''
if self.Dispatcher.Stream._document_attrs.has_key('version'):
return self.Dispatcher.Stream._document_attrs['version']
else:
return None
def _xmpp_connect(self, socket_type):
self.connected = socket_type
self._xmpp_connect_machine()
def _xmpp_connect_machine(self, mode=None, data=None):
'''
Finite automaton called after TCP connecting. Takes care of stream opening
and features tag handling. Calls _on_stream_start when stream is
started, and _on_connect_failure on failure.
'''
#FIXME: use RegisterHandlerOnce instead of onreceive
log.info('=============xmpp_connect_machine() >> mode: %s, data: %s' % (mode,data))
def on_next_receive(mode):
if mode is None:
self.onreceive(None)
else:
self.onreceive(lambda data:self._xmpp_connect_machine(mode, data))
if not mode:
dispatcher_nb.Dispatcher().PlugIn(self)
on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES')
elif mode == 'FAILURE':
self._on_connect_failure(err_message='During XMPP connect: %s' % data)
elif mode == 'RECEIVE_DOCUMENT_ATTRIBUTES':
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not hasattr(self, 'Dispatcher') or \
self.Dispatcher.Stream._document_attrs is None:
self._xmpp_connect_machine(
mode='FAILURE',
data='Error on stream open')
if self.incoming_stream_version() == '1.0':
if not self.Dispatcher.Stream.features:
on_next_receive('RECEIVE_STREAM_FEATURES')
else:
self._xmpp_connect_machine(mode='STREAM_STARTED')
else:
self._xmpp_connect_machine(mode='STREAM_STARTED')
elif mode == 'RECEIVE_STREAM_FEATURES':
if data:
# sometimes <features> are received together with document
# attributes and sometimes on next receive...
self.Dispatcher.ProcessNonBlocking(data)
if not self.Dispatcher.Stream.features:
self._xmpp_connect_machine(
mode='FAILURE',
data='Missing <features> in 1.0 stream')
else:
self._xmpp_connect_machine(mode='STREAM_STARTED')
elif mode == 'STREAM_STARTED':
self._on_stream_start()
def _on_stream_start(self):
'''Called when stream is opened. To be overriden in derived classes.'''
def _on_connect_failure(self, retry=None, err_message=None):
self.connected = None
if err_message:
self.DEBUG(self.DBG, err_message, 'connecting')
if self.socket:
self.socket.disconnect()
if self.on_connect_failure:
self.on_connect_failure(retry)
self.on_connect_failure(retry)
def _on_connect(self):
self.onreceive(None)
self.on_connect(self, self.connected)
def _on_connected(self):
# FIXME: why was this needed? Please note that we're working
# in nonblocking mode, and this handler is actually called
# as soon as connection is initiated, NOT when connection
# succeeds, as the name suggests.
# # connect succeeded, so no need of this callback anymore
# self.on_connect_failure = None
self.connected = 'tcp'
if self._Ssl:
transports_nb.NonBlockingTLS().PlugIn(self, now=1)
if not self.Connection: # ssl error, stream is closed
return
self.connected = 'ssl'
self.onreceive(self._on_receive_document_attrs)
dispatcher_nb.Dispatcher().PlugIn(self)
def _on_receive_document_attrs(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not hasattr(self, 'Dispatcher') or \
self.Dispatcher.Stream._document_attrs is None:
return
self.onreceive(None)
if self.Dispatcher.Stream._document_attrs.has_key('version') and \
self.Dispatcher.Stream._document_attrs['version'] == '1.0':
self.onreceive(self._on_receive_stream_features)
return
if self.on_stream_start:
self.on_stream_start()
self.on_stream_start = None
return True
def _on_receive_stream_features(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not self.Dispatcher.Stream.features:
return
# pass # If we get version 1.0 stream the features tag MUST BE presented
self.onreceive(None)
if self.on_stream_start:
self.on_stream_start()
self.on_stream_start = None
return True
# moved from client.CommonClient:
def RegisterDisconnectHandler(self,handler):
@ -200,11 +330,7 @@ class NBCommonClient:
override this method or at least unregister it. """
raise IOError('Disconnected from server.')
def event(self,eventName,args={}):
""" Default event handler. To be overriden. """
print "Event: ",(eventName,args)
def isConnected(self):
def get_connect_type(self):
""" Returns connection state. F.e.: None / 'tls' / 'tcp+non_sasl' . """
return self.connected
@ -212,74 +338,18 @@ class NBCommonClient:
''' get the ip address of the account, from which is made connection
to the server , (e.g. me).
We will create listening socket on the same ip '''
# moved from client.CommonClient
if hasattr(self, 'Connection'):
return self.Connection._sock.getsockname()
class NonBlockingClient(NBCommonClient):
''' Example client class, based on CommonClient. '''
def connect(self,server=None,proxy=None,secure=None,use_srv=True):
''' Connect to jabber server. If you want to specify different ip/port to connect to you can
pass it as tuple as first parameter. If there is HTTP proxy between you and server
specify it's address and credentials (if needed) in the second argument.
If you want ssl/tls support to be discovered and enable automatically - leave third argument as None. (ssl will be autodetected only if port is 5223 or 443)
If you want to force SSL start (i.e. if port 5223 or 443 is remapped to some non-standard port) then set it to 1.
If you want to disable tls/ssl support completely, set it to 0.
Example: connect(('192.168.5.5',5222),{'host':'proxy.my.net','port':8080,'user':'me','password':'secret'})
Returns '' or 'tcp' or 'tls', depending on the result.'''
self.__secure = secure
self.Connection = None
NBCommonClient.connect(self, server = server, proxy = proxy, ssl = secure,
on_stream_start = self._on_tcp_stream_start)
return self.connected
def _is_connected(self):
self.onreceive(None)
if self.on_connect:
self.on_connect(self, self.connected)
self.on_connect_failure = None
self.on_connect = None
def _on_tcp_stream_start(self):
if not self.connected or self.__secure is not None and not self.__secure:
self._is_connected()
return True
self.isplugged = True
self.onreceive(None)
transports_nb.NonBlockingTLS().PlugIn(self)
if not self.Connection: # ssl error, stream is closed
return True
if not self.Dispatcher.Stream._document_attrs.has_key('version') or \
not self.Dispatcher.Stream._document_attrs['version']=='1.0':
self._is_connected()
return
if not self.Dispatcher.Stream.features.getTag('starttls'):
self._is_connected()
return
self.onreceive(self._on_receive_starttls)
def _on_receive_starttls(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not self.NonBlockingTLS.starttls:
return
self.onreceive(None)
if not hasattr(self, 'NonBlockingTLS') or self.NonBlockingTLS.starttls != 'success':
self.event('tls_failed')
self._is_connected()
return
self.connected = 'tls'
self.onreceive(None)
self._is_connected()
return True
def auth(self, user, password, resource = '', sasl = 1, on_auth = None):
print 'auth called'
''' Authenticate connnection and bind resource. If resource is not provided
random one or library name used. '''
self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl
self.on_auth = on_auth
self.get_attrs(self._on_doc_attrs)
self._on_doc_attrs()
return
def _on_old_auth(self, res):
@ -335,6 +405,40 @@ class NonBlockingClient(NBCommonClient):
self.on_auth(self, 'sasl')
else:
self.on_auth(self, None)
class NonBlockingClient(NBCommonClient):
''' Example client class, based on CommonClient. '''
def _on_stream_start(self):
'''
Called after XMPP stream is opened.
In pure XMPP client, TLS negotiation may follow after esabilishing a stream.
'''
self.onreceive(None)
if self.connected == 'tcp':
if not self.connected or self._secure is not None and not self._secure:
# if we are disconnected or TLS/SSL is not desired, return
self._on_connect()
return
if not self.Dispatcher.Stream.features.getTag('starttls'):
# if server doesn't advertise TLS in init response
self._on_connect()
return
if self.incoming_stream_version() != '1.0':
self._on_connect()
return
# otherwise start TLS
transports_nb.NonBlockingTLS().PlugIn(
self,
on_tls_success=lambda: self._xmpp_connect(socket_type='tls'),
on_tls_failure=self._on_connect_failure)
elif self.connected == 'tls':
self._on_connect()
def initRoster(self):
''' Plug in the roster. '''
@ -354,87 +458,84 @@ class NonBlockingClient(NBCommonClient):
if requestRoster: roster_nb.NonBlockingRoster().PlugIn(self)
self.send(dispatcher_nb.Presence(to=jid, typ=typ))
class Component(NBCommonClient):
''' Component class. The only difference from CommonClient is ability to perform component authentication. '''
def __init__(self, server, port=5347, typ=None, debug=['always', 'nodebuilder'],
domains=None, component=0, on_connect = None, on_connect_failure = None):
''' Init function for Components.
As components use a different auth mechanism which includes the namespace of the component.
Jabberd1.4 and Ejabberd use the default namespace then for all client messages.
Jabberd2 uses jabber:client.
'server' argument is a server name that you are connecting to (f.e. "localhost").
'port' can be specified if 'server' resolves to correct IP. If it is not then you'll need to specify IP
and port while calling "connect()".'''
NBCommonClient.__init__(self, server, port=port, debug=debug)
self.typ = typ
self.component=component
if domains:
self.domains=domains
else:
self.domains=[server]
self.on_connect_component = on_connect
self.on_connect_failure = on_connect_failure
def connect(self, server=None, proxy=None):
''' This will connect to the server, and if the features tag is found then set
the namespace to be jabber:client as that is required for jabberd2.
'server' and 'proxy' arguments have the same meaning as in xmpp.Client.connect() '''
if self.component:
self.Namespace=auth.NS_COMPONENT_1
self.Server=server[0]
NBCommonClient.connect(self, server=server, proxy=proxy,
on_connect = self._on_connect, on_connect_failure = self.on_connect_failure)
def _on_connect(self):
if self.typ=='jabberd2' or not self.typ and self.Dispatcher.Stream.features is not None:
self.defaultNamespace=auth.NS_CLIENT
self.Dispatcher.RegisterNamespace(self.defaultNamespace)
self.Dispatcher.RegisterProtocol('iq',dispatcher.Iq)
self.Dispatcher.RegisterProtocol('message',dispatcher_nb.Message)
self.Dispatcher.RegisterProtocol('presence',dispatcher_nb.Presence)
self.on_connect(self.connected)
def auth(self, name, password, dup=None, sasl=0):
''' Authenticate component "name" with password "password".'''
self._User, self._Password, self._Resource=name, password,''
try:
if self.component:
sasl=1
if sasl:
auth.SASL(name,password).PlugIn(self)
if not sasl or self.SASL.startsasl=='not-supported':
if auth.NonSASL(name,password,'').PlugIn(self):
self.connected+='+old_auth'
return 'old_auth'
return
self.SASL.auth()
self.onreceive(self._on_auth_component)
except:
self.DEBUG(self.DBG,"Failed to authenticate %s" % name,'error')
def _on_auth_component(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if self.SASL.startsasl == 'in-process':
return
if self.SASL.startsasl =='success':
if self.component:
self._component = self.component
auth.NBComponentBind().PlugIn(self)
self.onreceive(_on_component_bind)
self.connected += '+sasl'
else:
raise auth.NotAuthorized(self.SASL.startsasl)
def _on_component_bind(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if self.NBComponentBind.bound is None:
return
for domain in self.domains:
self.NBComponentBind.Bind(domain, _on_component_bound)
def _on_component_bound(self, resp):
self.NBComponentBind.PlugOut()
class BOSHClient(NBCommonClient):
'''
Client class implementing BOSH.
'''
def __init__(self, *args, **kw):
'''Preceeds constructor of NBCommonClient and sets some of values that will
be used as attributes in <body> tag'''
self.Namespace = NS_HTTP_BIND
# BOSH parameters should be given via Advanced Configuration Editor
self.bosh_hold = 1
self.bosh_wait=60
self.bosh_rid=-1
self.bosh_httpversion = 'HTTP/1.1'
NBCommonClient.__init__(self, *args, **kw)
def connect(self, *args, **kw):
proxy = kw['proxy']
self.bosh_protocol, self.bosh_host, self.bosh_uri = self.urisplit(proxy['host'])
self.bosh_port = proxy['port']
NBCommonClient.connect(*args, **kw)
def _on_stream_start(self):
'''
Called after XMPP stream is opened. In BOSH TLS is negotiated on tranport layer
so success callback can be invoked.
(authentication is started from auth() method)
'''
self.onreceive(None)
if self.connected == 'tcp':
self._on_connect()
def bosh_raise_event(self, realm, event, data):
# should to extract stanza from body
self.DEBUG(self.DBG,'realm: %s, event: %s, data: %s' % (realm, event, data),
'BOSH EventHandler')
self._caller._event_dispatcher(realm, event, data)
def StreamInit(self):
'''
Init of BOSH session. Called instead of Dispatcher.StreamInit()
Initial body tag is created and sent.
'''
#self.Dispatcher.RegisterEventHandler(self.bosh_event_handler)
self.Dispatcher.Stream = simplexml.NodeBuilder()
self.Dispatcher.Stream._dispatch_depth = 2
self.Dispatcher.Stream.dispatch = self.Dispatcher.dispatch
self.Dispatcher.Stream.stream_header_received = self._check_stream_start
self.Dispatcher.Stream.features = None
r = random.Random()
r.seed()
# with 50-bit random initial rid, session would have to go up
# to 7881299347898368 messages to raise rid over 2**53
# (see http://www.xmpp.org/extensions/xep-0124.html#rids)
self.bosh_rid = r.getrandbits(50)
initial_body_tag = BOSHBody(
attrs={'content': 'text/xml; charset=utf-8',
'hold': str(self.bosh_hold),
# "to" should be domain, not hostname of machine
'to': self.Server,
'wait': str(self.bosh_wait),
'rid': str(self.bosh_rid),
'xmpp:version': '1.0',
'xmlns:xmpp': 'urn:xmpp:xbosh'}
)
if locale.getdefaultlocale()[0]:
initial_body_tag.setAttr('xml:lang',
locale.getdefaultlocale()[0].split('_')[0])
initial_body_tag.setAttr('xmpp:version', '1.0')
initial_body_tag.setAttr('xmlns:xmpp', 'urn:xmpp:xbosh')
self.send(initial_body_tag)

View File

@ -393,6 +393,7 @@ class Debug:
colors={}
def Show(self, flag, msg, prefix=''):
msg=str(msg)
msg=msg.replace('\r','\\r').replace('\n','\\n').replace('><','>\n <')
if not colors_enabled: pass
elif self.colors.has_key(prefix): msg=self.colors[prefix]+msg+color_none

View File

@ -47,7 +47,7 @@ class Dispatcher(PlugIn):
self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \
self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \
self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \
self.SendAndWaitForResponse, self.send,self.disconnect, \
self.SendAndWaitForResponse, self.assign_id, self.StreamTerminate, \
self.SendAndCallForResponse, self.getAnID, self.Event]
def getAnID(self):
@ -79,6 +79,8 @@ class Dispatcher(PlugIn):
def plugin(self, owner):
''' Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally.'''
self.DEBUG('Dispatcher plugin', 'PlugIn')
self._init()
self._owner.lastErrNode = None
self._owner.lastErr = None
@ -116,6 +118,10 @@ class Dispatcher(PlugIn):
locale.getdefaultlocale()[0].split('_')[0])
self._owner.send("<?xml version='1.0'?>%s>" % str(self._metastream)[:-2])
def StreamTerminate(self):
''' Send a stream terminator. '''
self._owner.send('</stream:stream>')
def _check_stream_start(self, ns, tag, attrs):
if ns<>NS_STREAMS or tag<>'stream':
raise ValueError('Incorrect stream start: (%s,%s). Terminating.' % (tag, ns))
@ -139,7 +145,7 @@ class Dispatcher(PlugIn):
return 0
except ExpatError:
self.DEBUG('Invalid XML received from server. Forcing disconnect.', 'error')
self._owner.Connection.pollend()
self._owner.Connection.disconnect()
return 0
if len(self._pendingExceptions) > 0:
_pendingException = self._pendingExceptions.pop()
@ -244,7 +250,7 @@ class Dispatcher(PlugIn):
def returnStanzaHandler(self,conn,stanza):
''' Return stanza back to the sender with <feature-not-implemennted/> error set. '''
if stanza.getType() in ['get','set']:
conn.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED))
conn._owner.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED))
def streamErrorHandler(self,conn,error):
name,text='error',error.getData()
@ -387,7 +393,7 @@ class Dispatcher(PlugIn):
''' Put stanza on the wire and wait for recipient's response to it. '''
if timeout is None:
timeout = DEFAULT_TIMEOUT_SECONDS
self._witid = self.send(stanza)
self._witid = self._owner.send(stanza)
if func:
self.on_responses[self._witid] = (func, args)
if timeout:
@ -401,11 +407,10 @@ class Dispatcher(PlugIn):
Additional callback arguments can be specified in args. '''
self.SendAndWaitForResponse(stanza, 0, func, args)
def send(self, stanza, is_message = False, now = False):
''' Serialise stanza and put it on the wire. Assign an unique ID to it before send.
Returns assigned ID.'''
def assign_id(self, stanza):
''' Assign an unique ID to stanza and return assigned ID.'''
if type(stanza) in [type(''), type(u'')]:
return self._owner.Connection.send(stanza, now = now)
return (None, stanza)
if not isinstance(stanza, Protocol):
_ID=None
elif not stanza.getID():
@ -417,23 +422,7 @@ class Dispatcher(PlugIn):
_ID=stanza.getID()
if self._owner._registered_name and not stanza.getAttr('from'):
stanza.setAttr('from', self._owner._registered_name)
if self._owner._component and stanza.getName() != 'bind':
to=self._owner.Server
if stanza.getTo() and stanza.getTo().getDomain():
to=stanza.getTo().getDomain()
frm=stanza.getFrom()
if frm.getDomain():
frm=frm.getDomain()
route=Protocol('route', to=to, frm=frm, payload=[stanza])
stanza=route
stanza.setNamespace(self._owner.Namespace)
stanza.setParent(self._metastream)
if is_message:
self._owner.Connection.send(stanza, True, now = now)
else:
self._owner.Connection.send(stanza, now = now)
return _ID
return (_ID, stanza)
def disconnect(self):
''' Send a stream terminator. '''
self._owner.Connection.send('</stream:stream>')

View File

@ -33,7 +33,7 @@ class IdleObject:
''' called on new write event (connect in sockets is a pollout) '''
pass
def read_timeout(self, fd):
def read_timeout(self):
''' called when timeout has happend '''
pass

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,270 +0,0 @@
from idlequeue import IdleObject
from client import PlugIn
import threading, socket, errno
import logging
log = logging.getLogger('gajim.c.x.transports_nb')
consoleloghandler = logging.StreamHandler()
consoleloghandler.setLevel(logging.DEBUG)
consoleloghandler.setFormatter(
logging.Formatter('%(levelname)s: %(message)s')
)
log.setLevel(logging.DEBUG)
log.addHandler(consoleloghandler)
log.propagate = False
'''
this module will replace transports_nb.py
For now, it can be run from test/test_nonblockingtcp.py
* set credentials in the testing script
'''
class NBgetaddrinfo(threading.Thread):
'''
Class for nonblocking call of getaddrinfo. Maybe unnecessary.
'''
def __init__(self, server, on_success, on_failure, timeout_sec):
'''
Call is started from constructor. It is not needed to hold reference on
created instance.
:param server: tuple (hostname, port) for DNS request
:param on_success: callback for successful DNS request
:param on_failure: called when DNS request couldn't be performed
:param timeout_sec: max seconds to wait for return from getaddrinfo. After
this time, on_failure is called with error message.
'''
threading.Thread.__init__(self)
self.on_success = on_success
self.on_failure = on_failure
self.server = server
self.lock = threading.Lock()
self.already_called = False
self.timer = threading.Timer(timeout_sec, self.on_timeout)
self.timer.start()
self.start()
def on_timeout(self):
'''
Called by timer. Means that getaddrinfo takes too long and will be
interrupted.
'''
self.do_call(False, 'NBgetaddrinfo timeout while looking up %s:%s' % self.server)
def do_call(self, success, data):
'''
Method called either on success and failure. In case of timeout it will be
called twice but only the first (failure) call will be performed.
:param success: True if getaddrinfo returned properly, False if there was an
error or on timeout.
:param data: error message if failure, list of address structures if success
'''
log.debug('NBgetaddrinfo::do_call(): %s' % repr(data))
self.timer.cancel()
self.lock.acquire()
if not self.already_called:
self.already_called = True
self.lock.release()
if success:
self.on_success(data)
else:
self.on_failure(data)
return
else:
self.lock.release()
return
def run(self):
try:
ips = socket.getaddrinfo(self.server[0],self.server[1],socket.AF_UNSPEC,
socket.SOCK_STREAM)
except socket.gaierror, e:
self.do_call(False, 'Lookup failure for %s: %s %s' %
(repr(self.server), e[0], e[1]))
except Exception, e:
self.do_call(False, 'Exception while DNS lookup of %s: %s' %
(repr(e), repr(self.server)))
else:
self.do_call(True, ips)
DISCONNECTED ='DISCONNECTED'
CONNECTING ='CONNECTING'
CONNECTED ='CONNECTED'
DISCONNECTING ='DISCONNECTING'
CONNECT_TIMEOUT_SECONDS = 5
'''timeout to connect to the server socket, it doesn't include auth'''
DISCONNECT_TIMEOUT_SECONDS = 10
'''how long to wait for a disconnect to complete'''
class NonBlockingTcp(PlugIn, IdleObject):
def __init__(self, on_xmpp_connect=None, on_xmpp_failure=None):
'''
Class constructor. All parameters can be reset in tcp_connect or xmpp_connect
calls.
'''
PlugIn.__init__(self)
IdleObject.__init__(self)
self.on_tcp_connect = None
self.on_tcp_failure = None
self.sock = None
self.idlequeue = None
self.DBG_LINE='socket'
self.state = DISCONNECTED
'''
CONNECTING - after non-blocking socket.connect() until TCP connection is estabilished
CONNECTED - after TCP connection is estabilished
DISCONNECTING -
DISCONNECTED
'''
self._exported_methods=[self.send, self.disconnect, self.onreceive, self.set_send_timeout,
self.start_disconnect, self.set_timeout, self.remove_timeout]
def connect(self, conn_5tuple, on_tcp_connect, on_tcp_failure, idlequeue):
'''
Creates and connects socket to server and port defined in conn_5tupe which
should be list item returned from getaddrinfo.
:param conn_5tuple: 5-tuple returned from getaddrinfo
:param on_tcp_connect: callback called on successful tcp connection
:param on_tcp_failure: callback called on failure when estabilishing tcp
connection
:param idlequeue: idlequeue for socket
'''
self.on_tcp_connect = on_tcp_connect
self.on_tcp_failure = on_tcp_failure
self.conn_5tuple = conn_5tuple
try:
self.sock = socket.socket(*conn_5tuple[:3])
except socket.error, (errnum, errstr):
on_tcp_failure('NonBlockingTcp: Error while creating socket: %s %s' % (errnum, errstr))
return
self.idlequeue = idlequeue
self.fd = self.sock.fileno()
self.idlequeue.plug_idle(self, True, False)
errnum = 0
''' variable for errno symbol that will be found from exception raised from connect() '''
# set timeout for TCP connecting - if nonblocking connect() fails, pollend
# is called. If if succeeds pollout is called.
self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS)
try:
self.sock.setblocking(False)
self.sock.connect(conn_5tuple[4])
except Exception, (errnum, errstr):
pass
if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
# connecting in progress
self.state = CONNECTING
log.debug('After nonblocking connect. "%s" raised => CONNECTING' % errstr)
# on_tcp_connect/failure will be called from self.pollin/self.pollout
return
elif errnum in (0, 10056, errno.EISCONN):
# already connected - this branch is very unlikely, nonblocking connect() will
# return EINPROGRESS exception in most cases. Anyway, we don't need timeout
# on connected descriptor
log.debug('After nonblocking connect. "%s" raised => CONNECTED' % errstr)
self._on_tcp_connect(self)
return
# if there was some other error, call failure callback and unplug transport
# which will also remove read_timeouts for descriptor
self._on_tcp_failure('Exception while connecting to %s: %s - %s' %
(conn_5tuple[4], errnum, errstr))
def _on_tcp_connect(self, data):
''' This method preceeds actual call of on_tcp_connect callback
'''
self.state = CONNECTED
self.idlequeue.remove_timeout(self.fd)
self.on_tcp_connect(data)
def _on_tcp_failure(self,err_msg):
''' This method preceeds actual call of on_tcp_failure callback
'''
self.state = DISCONNECTED
self.idlequeue.unplug_idle(self.fd)
self.on_tcp_failure(err_msg)
def pollin(self):
'''called when receive on plugged socket is possible '''
log.debug('pollin called, state == %s' % self.state)
def pollout(self):
'''called when send to plugged socket is possible'''
log.debug('pollout called, state == %s' % self.state)
if self.state==CONNECTING:
self._on_tcp_connect(self)
return
def pollend(self):
'''called when remote site closed connection'''
log.debug('pollend called, state == %s' % self.state)
if self.state==CONNECTING:
self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4])
def read_timeout(self):
'''
Implemntation of IdleObject function called on timeouts from IdleQueue.
'''
log.debug('read_timeout called, state == %s' % self.state)
if self.state==CONNECTING:
# if read_timeout is called during connecting, connect() didn't end yet
# thus we have to close the socket
try:
self.sock.close()
except socket.error, (errnum, errmsg):
log.error('Error while closing socket on connection timeout: %s %s'
% (errnum, errmsg))
self._on_tcp_failure('Error during connect to %s:%s' % self.conn_5tuple[4])
def disconnect(self, on_disconnect=None):
if self.state == DISCONNECTED:
return
self.idlequeue.unplug_idle(self.fd)
try:
self.sock.shutdown(socket.SHUT_RDWR)
except socket.error, (errnum, errstr):
log.error('Error while disconnecting: %s %s' % (errnum,errstr))
try:
self.sock.close()
except socket.error, (errnum, errmsg):
log.error('Error closing socket: %s %s' % (errnum,errstr))
if on_disconnect:
on_disconnect()
def send(self, data, now=False):
pass
def onreceive(self):
pass
def set_send_timeout(self):
pass
def set_timeout(self):
pass
def remove_timeout(self):
pass
def start_disconnect(self):
pass

View File

@ -1199,7 +1199,7 @@ class ManageProxiesWindow:
proxypass_entry.set_text(gajim.config.get_per('proxies', proxy,
'pass'))
proxytype = gajim.config.get_per('proxies', proxy, 'type')
types = ['http', 'socks5']
types = ['http', 'socks5', 'bosh']
self.proxytype_combobox.set_active(types.index(proxytype))
if gajim.config.get_per('proxies', proxy, 'user'):
useauth_checkbutton.set_active(True)
@ -1227,7 +1227,7 @@ class ManageProxiesWindow:
model.set_value(iter, 0, new_name)
def on_proxytype_combobox_changed(self, widget):
types = ['http', 'socks5']
types = ['http', 'socks5', 'bosh']
type_ = self.proxytype_combobox.get_active()
proxy = self.proxyname_entry.get_text().decode('utf-8')
gajim.config.set_per('proxies', proxy, 'type', types[type_])

View File

@ -22,7 +22,7 @@ xmpp_server_port = ('xmpp.example.org',5222)
Script will connect to the machine.
'''
credentials = ['login', 'pass', 'testclient']
credentials = ['loginn', 'passwo', 'testresour']
'''
[username, password, passphrase]
Script will autheticate itself with this credentials on above mentioned server.
@ -41,19 +41,6 @@ class TestNonBlockingClient(unittest.TestCase):
self.idlequeue_thread = IdleQueueThread()
self.connection = MockConnectionClass()
self.client = client_nb.NonBlockingClient(
server=xmpp_server_port[0],
port=xmpp_server_port[1],
on_connect=lambda *args: self.connection.on_connect(True, *args),
on_connect_failure=lambda *args: self.connection.on_connect(False, *args),
caller=self.connection
)
'''
NonBlockingClient instance with parameters from global variables and with
callbacks from dummy connection.
'''
self.client.set_idlequeue(self.idlequeue_thread.iq)
self.idlequeue_thread.start()
def tearDown(self):
@ -70,17 +57,33 @@ class TestNonBlockingClient(unittest.TestCase):
:param server_port: tuple of (hostname, port) for where the client should
connect.
'''
self.client.connect(server_port)
self.client = client_nb.NonBlockingClient(
hostname=server_port[0],
port=server_port[1],
caller=self.connection,
idlequeue=self.idlequeue_thread.iq,
)
'''
NonBlockingClient instance with parameters from global variables and with
callbacks from dummy connection.
'''
self.client.connect(
on_connect=lambda *args: self.connection.on_connect(True, *args),
on_connect_failure=lambda *args: self.connection.on_connect(False, *args),
secure=False
)
print 'waiting for callback from client constructor'
self.connection.wait()
# if on_connect was called, client has to be connected and vice versa
if self.connection.connect_succeeded:
self.assert_(self.client.isConnected())
self.assert_(self.client.get_connect_type())
else:
self.assert_(not self.client.isConnected())
self.assert_(not self.client.get_connect_type())
def client_auth(self, username, password, resource, sasl):
'''
@ -100,7 +103,9 @@ class TestNonBlockingClient(unittest.TestCase):
'''
Does disconnecting of connected client. Returns when TCP connection is closed.
'''
self.client.start_disconnect(None, on_disconnect=self.connection.set_event)
#self.client.start_disconnect(None, on_disconnect=self.connection.set_event)
self.client.RegisterDisconnectHandler(self.connection.set_event)
self.client.disconnect()
print 'waiting for disconnecting...'
self.connection.wait()
@ -113,7 +118,7 @@ class TestNonBlockingClient(unittest.TestCase):
self.open_stream(xmpp_server_port)
# if client is not connected, lets raise the AssertionError
self.assert_(self.client.isConnected())
self.assert_(self.client.get_connect_type())
# (client.disconnect() is already called from NBClient._on_connected_failure
# so there's need to call it in this case
@ -130,7 +135,7 @@ class TestNonBlockingClient(unittest.TestCase):
then disconnected.
'''
self.open_stream(xmpp_server_port)
self.assert_(self.client.isConnected())
self.assert_(self.client.get_connect_type())
self.client_auth(credentials[0], credentials[1], credentials[2], sasl=0)
self.assert_(self.connection.con)
self.assert_(self.connection.auth=='old_auth')
@ -141,7 +146,8 @@ class TestNonBlockingClient(unittest.TestCase):
Connect to nonexisting host. DNS request for A records should return nothing.
'''
self.open_stream(('fdsfsdf.fdsf.fss', 5222))
self.assert_(not self.client.isConnected())
print 'nonexthost: %s' % self.client.get_connect_type()
self.assert_(not self.client.get_connect_type())
def test_connect_to_wrong_port(self):
'''
@ -149,14 +155,14 @@ class TestNonBlockingClient(unittest.TestCase):
but there shouldn't be XMPP server running on specified port.
'''
self.open_stream((xmpp_server_port[0], 31337))
self.assert_(not self.client.isConnected())
self.assert_(not self.client.get_connect_type())
def test_connect_with_wrong_creds(self):
'''
Connecting with invalid password.
'''
self.open_stream(xmpp_server_port)
self.assert_(self.client.isConnected())
self.assert_(self.client.get_connect_type())
self.client_auth(credentials[0], "wrong pass", credentials[2], sasl=1)
self.assert_(self.connection.auth is None)
self.do_disconnect()
@ -168,9 +174,10 @@ class TestNonBlockingClient(unittest.TestCase):
if __name__ == '__main__':
#suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient)
suite = unittest.TestSuite()
suite.addTest(TestNonBlockingClient('test_proper_connect_sasl'))
suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingClient)
#suite = unittest.TestSuite()
#suite.addTest(TestNonBlockingClient('test_proper_connect_oldauth'))
#suite.addTest(TestNonBlockingClient('test_connect_to_nonexisting_host'))
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@ -12,7 +12,7 @@ gajim_root = os.path.join(os.path.abspath(os.path.dirname(__file__)), '..')
sys.path.append(gajim_root + '/src/common/xmpp')
sys.path.append(gajim_root + '/src/common')
import transports_new, debug
import transports_nb
from client import *
xmpp_server = ('xmpp.example.org',5222)
@ -21,60 +21,48 @@ xmpp_server = ('xmpp.example.org',5222)
Script will connect to the machine.
'''
dns_timeout = 10
'''
timeout for DNS A-request (for getaddrinfo() call)
'''
import socket
ips = socket.getaddrinfo(xmpp_server[0], xmpp_server[1], socket.AF_UNSPEC,socket.SOCK_STREAM)
# change xmpp_server on real values
ip = ips[0]
class MockClient(IdleMock):
def __init__(self, server, port):
def __init__(self, idlequeue):
self.idlequeue=idlequeue
self.debug_flags=['all', 'nodebuilder']
self._DEBUG = debug.Debug(['socket'])
self.DEBUG = self._DEBUG.Show
self.server = server
self.port = port
IdleMock.__init__(self)
self.tcp_connected = False
self.ip_addresses = []
self.socket = None
def do_dns_request(self):
transports_new.NBgetaddrinfo(
server=(self.server, self.port),
on_success=lambda *args:self.on_success('DNSrequest', *args),
on_failure=self.on_failure,
timeout_sec=dns_timeout
def do_connect(self):
self.socket=transports_nb.NonBlockingTcp(
on_disconnect=lambda: self.on_success(mode='SocketDisconnect')
)
self.socket.PlugIn(self)
self.socket.connect(
conn_5tuple=ip,
on_connect=lambda: self.on_success(mode='TCPconnect'),
on_connect_failure=self.on_failure
)
self.wait()
def try_next_ip(self, err_message=None):
if err_message:
print err_message
if self.ip_addresses == []:
self.on_failure('Run out of hosts')
return
current_ip = self.ip_addresses.pop(0)
self.NonBlockingTcp.connect(
conn_5tuple=current_ip,
on_tcp_connect=lambda *args: self.on_success('TCPconnect',*args),
on_tcp_failure=self.try_next_ip,
idlequeue=self.idlequeue
)
def do_disconnect(self):
self.socket.disconnect()
self.wait()
def set_idlequeue(self, idlequeue):
self.idlequeue=idlequeue
def on_failure(self, data):
print 'Error: %s' % data
self.set_event()
def on_success(self, mode, data):
if mode == "DNSrequest":
self.ip_addresses = data
elif mode == "TCPconnect":
def on_success(self, mode, data=None):
if mode == "TCPconnect":
pass
if mode == "SocketDisconnect":
pass
self.set_event()
@ -87,12 +75,10 @@ class MockClient(IdleMock):
class TestNonBlockingTcp(unittest.TestCase):
def setUp(self):
self.nbtcp = transports_new.NonBlockingTcp()
self.client = MockClient(*xmpp_server)
self.idlequeue_thread = IdleQueueThread()
self.idlequeue_thread.start()
self.client.set_idlequeue(self.idlequeue_thread.iq)
self.nbtcp.PlugIn(self.client)
self.client = MockClient(
idlequeue=self.idlequeue_thread.iq)
def tearDown(self):
self.idlequeue_thread.stop_thread()
@ -100,12 +86,12 @@ class TestNonBlockingTcp(unittest.TestCase):
def testSth(self):
self.client.do_dns_request()
if self.client.ip_addresses == []:
print 'No IP found for given hostname: %s' % self.client.server
return
else:
self.client.try_next_ip()
self.client.do_connect()
self.assert_(self.client.socket.state == 'CONNECTED')
self.client.do_disconnect()
self.assert_(self.client.socket.state == 'DISCONNECTED')