From 3d860f40a6dc99853684e7e9f1277b12207b40b9 Mon Sep 17 00:00:00 2001 From: tomk Date: Sun, 13 Jul 2008 22:22:58 +0000 Subject: [PATCH] BOSHClient transformed to NonBlockingBOSH transport - it's easier to maintain more connections from below, implemented handling of non-persistent HTTP connections - it runs with ejabberd, improved NonBlockingTransport interface, minor changes in BOSHDispatcher --- src/common/connection.py | 46 ++-- src/common/xmpp/bosh.py | 375 ++++++++++++++++--------------- src/common/xmpp/client.py | 2 +- src/common/xmpp/client_nb.py | 82 ++++--- src/common/xmpp/dispatcher_nb.py | 85 ++++--- src/common/xmpp/idlequeue.py | 11 +- src/common/xmpp/protocol.py | 14 +- src/common/xmpp/transports_nb.py | 181 +++++++++------ test/test_nonblockingtcp.py | 8 +- 9 files changed, 451 insertions(+), 353 deletions(-) diff --git a/src/common/connection.py b/src/common/connection.py index 649baf178..36e7719b6 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -521,24 +521,19 @@ class Connection(ConnectionHandlers): self.connection = None if self._current_type == 'ssl': + # SSL (force TLS on different port than plain) port = self._current_host['ssl_port'] - secur = 1 + secure = 'force' else: port = self._current_host['port'] if self._current_type == 'plain': - secur = 0 + # plain connection + secure = None else: - secur = None + # TLS (on the same port as plain) + secure = 'negotiate' - if self._proxy and self._proxy['type'] == 'bosh': - clientClass = common.xmpp.bosh.BOSHClient - else: - clientClass = common.xmpp.NonBlockingClient - - # there was: - # "if gajim.verbose:" - # here - con = clientClass( + con = common.xmpp.NonBlockingClient( domain=self._hostname, caller=self, idlequeue=gajim.idlequeue) @@ -550,11 +545,11 @@ class Connection(ConnectionHandlers): if self.on_connect_success == self._on_new_account: con.RegisterDisconnectHandler(self._on_new_account) - # FIXME: BOSH properties should be in proxy dictionary - loaded from - # config - if self._proxy and self._proxy['type'] == 'bosh': + # FIXME: BOSH properties should be loaded from config + if self._proxy and self._proxy['type'] == 'bosh': self._proxy['bosh_hold'] = '1' self._proxy['bosh_wait'] = '60' + self._proxy['bosh_content'] = 'text/xml; charset=utf-8' log.info('Connecting to %s: [%s:%d]', self.name, @@ -566,7 +561,7 @@ class Connection(ConnectionHandlers): on_proxy_failure=self.on_proxy_failure, on_connect_failure=self.connect_to_next_type, proxy=self._proxy, - secure = secur) + secure = secure) else: self.connect_to_next_host(retry) @@ -578,9 +573,11 @@ class Connection(ConnectionHandlers): 'connection_types').split() else: self._connection_types = ['tls', 'ssl', 'plain'] - + # FIXME: remove after tls and ssl will be degubbed - #self._connection_types = ['plain'] + self._connection_types = ['plain'] + + host = self.select_next_host(self._hosts) self._current_host = host self._hosts.remove(host) @@ -619,6 +616,8 @@ class Connection(ConnectionHandlers): if _con_type == 'tcp': _con_type = 'plain' if _con_type != self._current_type: + log.info('Connecting to next type beacuse desired is %s and returned is %s' + % (self._current_type, _con_type)) self.connect_to_next_type() return if _con_type == 'plain' and gajim.config.get_per('accounts', self.name, @@ -662,7 +661,12 @@ class Connection(ConnectionHandlers): (con.Connection.ssl_fingerprint_sha1,)) return True self._register_handlers(con, con_type) - con.auth(name, self.password, self.server_resource, 1, self.__on_auth) + con.auth( + user=name, + password=self.password, + resource=self.server_resource, + sasl=1, + on_auth=self.__on_auth) def ssl_certificate_accepted(self): name = gajim.config.get_per('accounts', self.name, 'name') @@ -997,7 +1001,7 @@ class Connection(ConnectionHandlers): self.time_to_reconnect = None self.connection.RegisterDisconnectHandler(self._on_disconnected) - self.connection.send(p) + self.connection.send(p, now=True) self.connection.StreamTerminate() #self.connection.start_disconnect(p, self._on_disconnected) else: @@ -1554,7 +1558,7 @@ class Connection(ConnectionHandlers): def gc_got_disconnected(self, room_jid): ''' A groupchat got disconnected. This can be or purpose or not. Save the time we quit to avoid duplicate logs AND be faster than get that - date from DB. Save it in mem AND in a small table (with fast access) + date from DB. Save it in mem AND in a small table (with fast access) ''' log_time = time_time() self.last_history_time[room_jid] = log_time diff --git a/src/common/xmpp/bosh.py b/src/common/xmpp/bosh.py index 851d7cc4c..3d53bdf40 100644 --- a/src/common/xmpp/bosh.py +++ b/src/common/xmpp/bosh.py @@ -1,235 +1,260 @@ -import protocol, locale, random, dispatcher_nb -from client_nb import NBCommonClient -import transports_nb -import logging +import locale, random +from transports_nb import NonBlockingTransport, NonBlockingHTTP, CONNECTED, CONNECTING, DISCONNECTED +from protocol import BOSHBody from simplexml import Node + +import logging log = logging.getLogger('gajim.c.x.bosh') -class BOSHClient(NBCommonClient): - ''' - Client class implementing BOSH. Extends common XMPP - ''' - def __init__(self, domain, idlequeue, caller=None): - '''Preceeds constructor of NBCommonClient and sets some of values that will - be used as attributes in tag''' - self.bosh_sid=None +FAKE_DESCRIPTOR = -1337 +'''Fake file descriptor - it's used for setting read_timeout in idlequeue for +BOSH Transport. Timeouts in queue are saved by socket descriptor. +In TCP-derived transports it is file descriptor of socket''' + + +class NonBlockingBOSH(NonBlockingTransport): + def __init__(self, raise_event, on_disconnect, idlequeue, xmpp_server, domain, + bosh_dict): + NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue) # with 50-bit random initial rid, session would have to go up # to 7881299347898368 messages to raise rid over 2**53 # (see http://www.xmpp.org/extensions/xep-0124.html#rids) r = random.Random() r.seed() + global FAKE_DESCRIPTOR + FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1 + self.fake_fd = FAKE_DESCRIPTOR self.bosh_rid = r.getrandbits(50) self.bosh_sid = None - if locale.getdefaultlocale()[0]: self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0] else: self.bosh_xml_lang = 'en' self.http_version = 'HTTP/1.1' + self.http_persistent = False + self.http_pipelining = False self.bosh_to = domain - #self.Namespace = protocol.NS_HTTP_BIND - #self.defaultNamespace = self.Namespace - self.bosh_session_on = False + self.route_host, self.route_port = xmpp_server - NBCommonClient.__init__(self, domain, idlequeue, caller) + self.bosh_wait = bosh_dict['bosh_wait'] + self.bosh_hold = bosh_dict['bosh_hold'] + self.bosh_host = bosh_dict['host'] + self.bosh_port = bosh_dict['port'] + self.bosh_content = bosh_dict['bosh_content'] + + self.http_socks = [] + self.stanzas_to_send = [] + self.prio_bosh_stanza = None + self.current_recv_handler = None + + # if proxy_host .. do sth about HTTP proxy etc. + + + def connect(self, conn_5tuple, on_connect, on_connect_failure): + NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure) + self.http_socks.append(self.get_http_socket()) + self.tcp_connection_started() + + # this connect() is not needed because sockets can be connected on send but + # we need to know if host is reachable in order to invoke callback for + # failure when connecting (it's different than callback for errors + # occurring after connection is etabilished) + + self.http_socks[0].connect( + conn_5tuple = conn_5tuple, + on_connect = lambda: self._on_connect(self.http_socks[0]), + on_connect_failure = self._on_connect_failure) - def connect(self, on_connect, on_connect_failure, proxy, hostname=None, port=5222, - on_proxy_failure=None, secure=None): - ''' - Open XMPP connection (open XML streams in both directions). - :param hostname: hostname of XMPP server from SRV request - :param port: port number of XMPP server - :param on_connect: called after stream is successfully opened - :param on_connect_failure: called when error occures during connection - :param on_proxy_failure: called if error occurres during TCP connection to - proxy server or during connection to the proxy - :param proxy: dictionary with bosh-related paramters. It should contain at - least values for keys 'host' and 'port' - connection details for proxy - server and optionally keys 'user' and 'pass' as proxy credentials - :param secure: if + def get_fd(self): + return self.fake_fd + + def on_http_request_possible(self): ''' - NBCommonClient.connect(self, on_connect, on_connect_failure, hostname, port, - on_proxy_failure, proxy, secure) + Called after HTTP response is received - another request is possible. + There should be always one pending request on BOSH CM. + ''' + log.info('on_http_req possible, stanzas in list: %s, state:\n%s' % + (self.stanzas_to_send, self.get_current_state())) + # if one of sockets is connecting, sth is about to be sent - we don't have to + # send request from here + for s in self.http_socks: + if s.state==CONNECTING or s.pending_requests>0: return + self.flush_stanzas() - if hostname: - self.route_host = hostname + + def flush_stanzas(self): + # another to-be-locked candidate + log.info('flushing stanzas') + if self.prio_bosh_stanza: + tmp = self.prio_bosh_stanza + self.prio_bosh_stanza = None else: - self.route_host = self.Server - - assert(proxy.has_key('type')) - assert(proxy['type']=='bosh') - - self.bosh_wait = proxy['bosh_wait'] - self.bosh_hold = proxy['bosh_hold'] - self.bosh_host = proxy['host'] - self.bosh_port = proxy['port'] - self.bosh_content = proxy['bosh_content'] - - # _on_tcp_failure is callback for errors which occur during name resolving or - # TCP connecting. - self._on_tcp_failure = self.on_proxy_failure + tmp = self.stanzas_to_send + self.stanzas_to_send = [] + self.send_http(tmp) - - # in BOSH, client connects to Connection Manager instead of directly to - # XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects - # to HTTP proxy and Connection Manager is specified at URI and Host header - # in HTTP message - - # tcp_host, tcp_port is hostname and port for socket connection - Connection - # Manager or HTTP proxy - if proxy.has_key('proxy_host') and proxy['proxy_host'] and \ - proxy.has_key('proxy_port') and proxy['proxy_port']: - - tcp_host=proxy['proxy_host'] - tcp_port=proxy['proxy_port'] + def send(self, stanza, now=False): + # body tags should be send only via send_http() + assert(not isinstance(stanza, BOSHBody)) + now = True + if now: + self.send_http([stanza]) + else: + self.stanzas_to_send.append(stanza) - # user and password for HTTP proxy - if proxy.has_key('user') and proxy['user'] and \ - proxy.has_key('pass') and proxy['pass']: - proxy_creds=(proxy['user'],proxy['pass']) + def send_http(self, payload): + # "Protocol" and string/unicode stanzas should be sent via send() + # (only initiating and terminating BOSH stanzas should be send via send_http) + assert(isinstance(payload, list) or isinstance(payload, BOSHBody)) + log.warn('send_http: stanzas: %s\n%s' % (payload, self.get_current_state())) + + if isinstance(payload, list): + bosh_stanza = self.boshify_stanzas(payload) + else: + # bodytag_payload is , we don't boshify, only add the rid + bosh_stanza = payload + picked_sock = self.pick_socket() + if picked_sock: + log.info('sending to socket %s' % id(picked_sock)) + bosh_stanza.setAttr('rid', self.get_rid()) + picked_sock.send(bosh_stanza) + else: + # no socket was picked but one is about to connect - save the stanza and + # return + if self.prio_bosh_stanza: + payload = self.merge_stanzas(payload, self.prio_bosh_stanza) + if payload is None: + log.error('Error in BOSH socket handling - unable to send %s because %s\ + is already about to be sent' % (payload, self.prio_bosh_stanza)) + self.disconnect() + self.prio_bosh_stanza = payload + + def merge_stanzas(self, s1, s2): + if isinstance(s1, BOSHBody): + if isinstance(s2, BOSHBody): + # both are boshbodies + return else: - proxy_creds=(None, None) - + s1.setPayload(s2, add=True) + return s1 + elif isinstance(s2, BOSHBody): + s2.setPayload(s1, add=True) + return s2 else: - tcp_host = transports_nb.urisplit(proxy['host'])[1] - tcp_port=proxy['port'] + #both are lists + s1.extend(s2) + return s1 - if tcp_host is None: - self._on_connect_failure("Invalid BOSH URI") + + def get_current_state(self): + t = '\t\tSOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n' + for s in self.http_socks: + t = '%s\t\t%s\t%s\t%s\n' % (t,id(s), s.state, s.pending_requests) + return t + + + def pick_socket(self): + # try to pick connected socket with no pending reqs + for s in self.http_socks: + if s.state == CONNECTED and s.pending_requests == 0: + return s + + # try to connect some disconnected socket + for s in self.http_socks: + if s.state==DISCONNECTED: + self.connect_and_flush(s) return - self.socket = self.get_socket() - - self._resolve_hostname( - hostname=tcp_host, - port=tcp_port, - on_success=self._try_next_ip, - on_failure=self._on_tcp_failure) - - def _on_stream_start(self): - ''' - Called after XMPP stream is opened. In BOSH, TLS is negotiated on socket - connect so success callback can be invoked after TCP connect. - (authentication is started from auth() method) - ''' - self.onreceive(None) - if self.connected == 'tcp': - self._on_connect() - - def get_socket(self): - tmp = transports_nb.NonBlockingHTTP( - raise_event=self.raise_event, - on_disconnect=self.on_http_disconnect, - http_uri = self.bosh_host, - http_port = self.bosh_port, - http_version = self.http_version - ) - tmp.PlugIn(self) - return tmp - - def on_http_disconnect(self): - log.info('HTTP socket disconnected') - #import traceback - #traceback.print_stack() - if self.bosh_session_on: - self.socket.connect( - conn_5tuple=self.current_ip, - on_connect=self.on_http_reconnect, - on_connect_failure=self.on_disconnect) - else: - self.on_disconnect() - - def on_http_reconnect(self): - self.socket._plug_idle() - log.info('Connected to BOSH CM again') - pass + # if there is any just-connecting socket, it will send the data in its + # connect callback + for s in self.http_socks: + if s.state==CONNECTING: + return + # being here means there are only CONNECTED scokets with pending requests. + # Lets create and connect another one + s = self.get_http_socket() + self.http_socks.append(s) + self.connect_and_flush(s) + return - def on_http_reconnect_fail(self): - log.error('Error when reconnecting to BOSH CM') - self.on_disconnect() - - def send(self, stanza, now = False): - (id, stanza_to_send) = self.Dispatcher.assign_id(stanza) + def connect_and_flush(self, socket): + socket.connect( + conn_5tuple = self.conn_5tuple, + on_connect = self.flush_stanzas, + on_connect_failure = self.disconnect) - self.socket.send( - self.boshify_stanza(stanza_to_send), - now = now) - return id - def get_rid(self): - # does this need a lock??" - self.bosh_rid = self.bosh_rid + 1 - return str(self.bosh_rid) + def boshify_stanzas(self, stanzas=[], body_attrs=None): + ''' wraps zero to many stanzas by body tag with xmlns and sid ''' + log.debug('boshify_staza - type is: %s, stanza is %s' % (type(stanzas), stanzas)) + tag = BOSHBody(attrs={'sid': self.bosh_sid}) + tag.setPayload(stanzas) + return tag - def get_bodytag(self): - # this should be called not until after session creation response so sid has - # to be initialized. - assert(hasattr(self, 'bosh_sid')) - return protocol.BOSHBody( - attrs={ 'rid': self.get_rid(), - 'sid': self.bosh_sid}) def get_initial_bodytag(self, after_SASL=False): - tag = protocol.BOSHBody( + return BOSHBody( attrs={'content': self.bosh_content, 'hold': str(self.bosh_hold), - 'route': '%s:%s' % (self.route_host, self.Port), + 'route': '%s:%s' % (self.route_host, self.route_port), 'to': self.bosh_to, 'wait': str(self.bosh_wait), - 'rid': self.get_rid(), 'xml:lang': self.bosh_xml_lang, 'xmpp:version': '1.0', 'ver': '1.6', 'xmlns:xmpp': 'urn:xmpp:xbosh'}) - if after_SASL: - tag.delAttr('content') - tag.delAttr('hold') - tag.delAttr('route') - tag.delAttr('wait') - tag.delAttr('ver') - # xmpp:restart attribute is essential for stream restart request - tag.setAttr('xmpp:restart','true') - tag.setAttr('sid',self.bosh_sid) - - return tag + def get_after_SASL_bodytag(self): + return BOSHBody( + attrs={ 'to': self.bosh_to, + 'sid': self.bosh_sid, + 'xml:lang': self.bosh_xml_lang, + 'xmpp:version': '1.0', + 'xmpp:restart': 'true', + 'xmlns:xmpp': 'urn:xmpp:xbosh'}) def get_closing_bodytag(self): - closing_bodytag = self.get_bodytag() - closing_bodytag.setAttr('type', 'terminate') - return closing_bodytag + return BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}) + + def get_rid(self): + self.bosh_rid = self.bosh_rid + 1 + return str(self.bosh_rid) - def boshify_stanza(self, stanza=None, body_attrs=None): - ''' wraps stanza by body tag with rid and sid ''' - #log.info('boshify_staza - type is: %s, stanza is %s' % (type(stanza), stanza)) - tag = self.get_bodytag() - tag.setPayload([stanza]) - return tag + def get_http_socket(self): + s = NonBlockingHTTP( + raise_event=self.raise_event, + on_disconnect=self.disconnect, + idlequeue = self.idlequeue, + on_http_request_possible = self.on_http_request_possible, + http_uri = self.bosh_host, + http_port = self.bosh_port, + http_version = self.http_version) + if self.current_recv_handler: + s.onreceive(self.current_recv_handler) + return s + def onreceive(self, recv_handler): + if recv_handler is None: + recv_handler = self._owner.Dispatcher.ProcessNonBlocking + self.current_recv_handler = recv_handler + for s in self.http_socks: + s.onreceive(recv_handler) - def on_bodytag_attrs(self, body_attrs): - #log.info('on_bodytag_attrs: %s' % body_attrs) - if body_attrs.has_key('type'): - if body_attrs['type']=='terminated': - # BOSH session terminated - self.bosh_session_on = False - elif body_attrs['type']=='error': - # recoverable error - pass - if not self.bosh_sid: - # initial response - when bosh_sid is set - self.bosh_session_on = True - self.bosh_sid = body_attrs['sid'] - self.Dispatcher.Stream._document_attrs['id']=body_attrs['authid'] + def disconnect(self, do_callback=True): + if self.state == DISCONNECTED: return + + for s in self.http_socks: + s.disconnect(do_callback=False) + NonBlockingTransport.disconnect(self, do_callback) diff --git a/src/common/xmpp/client.py b/src/common/xmpp/client.py index f9f200dac..52c6bc2f2 100644 --- a/src/common/xmpp/client.py +++ b/src/common/xmpp/client.py @@ -48,7 +48,7 @@ class PlugIn: else: owner.__dict__[self.__class__.__name__]=self - # following will not work for classes inheriting plugin() + # following commented line will not work for classes inheriting plugin() #if self.__class__.__dict__.has_key('plugin'): return self.plugin(owner) if hasattr(self,'plugin'): return self.plugin(owner) diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py index c21437de5..d22905580 100644 --- a/src/common/xmpp/client_nb.py +++ b/src/common/xmpp/client_nb.py @@ -23,7 +23,7 @@ These classes can be used for simple applications "AS IS" though. import socket -import transports_nb, tls_nb, dispatcher_nb, auth_nb, roster_nb, protocol +import transports_nb, tls_nb, dispatcher_nb, auth_nb, roster_nb, protocol, bosh from client import * import logging @@ -49,7 +49,7 @@ class NBCommonClient: self.Server = domain - # caller is who initiated this client, it is sed to register the EventDispatcher + # caller is who initiated this client, it is needed to register the EventDispatcher self._caller = caller self._owner = self self._registered_name = None @@ -92,16 +92,8 @@ class NBCommonClient: self.NonBlockingTCP.PlugOut() if self.__dict__.has_key('NonBlockingHTTP'): self.NonBlockingHTTP.PlugOut() - - - def send(self, stanza, now = False): - ''' interface for putting stanzas on wire. Puts ID to stanza if needed and - sends it via socket wrapper''' - (id, stanza_to_send) = self.Dispatcher.assign_id(stanza) - - self.Connection.send(stanza_to_send, now = now) - return id - + if self.__dict__.has_key('NonBlockingBOSH'): + self.NonBlockingBOSH.PlugOut() def connect(self, on_connect, on_connect_failure, hostname=None, port=5222, @@ -177,7 +169,7 @@ class NBCommonClient: started, and _on_connect_failure on failure. ''' #FIXME: use RegisterHandlerOnce instead of onreceive - log.info('========xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] )) + log.info('-------------xmpp_connect_machine() >> mode: %s, data: %s' % (mode,str(data)[:20] )) def on_next_receive(mode): log.info('setting %s on next receive' % mode) @@ -187,7 +179,8 @@ class NBCommonClient: self.onreceive(lambda _data:self._xmpp_connect_machine(mode, _data)) if not mode: - dispatcher_nb.Dispatcher().PlugIn(self) + # starting state + d=dispatcher_nb.Dispatcher().PlugIn(self) on_next_receive('RECEIVE_DOCUMENT_ATTRIBUTES') elif mode == 'FAILURE': @@ -205,7 +198,7 @@ class NBCommonClient: if not self.Dispatcher.Stream.features: on_next_receive('RECEIVE_STREAM_FEATURES') else: - log.info('got STREAM FEATURES in first read') + log.info('got STREAM FEATURES in first recv') self._xmpp_connect_machine(mode='STREAM_STARTED') else: @@ -222,7 +215,7 @@ class NBCommonClient: mode='FAILURE', data='Missing in 1.0 stream') else: - log.info('got STREAM FEATURES in second read') + log.info('got STREAM FEATURES in second recv') self._xmpp_connect_machine(mode='STREAM_STARTED') elif mode == 'STREAM_STARTED': @@ -244,7 +237,7 @@ class NBCommonClient: self.on_connect(self, self.connected) def raise_event(self, event_type, data): - log.info('raising event from transport: %s %s' % (event_type,data)) + log.info('raising event from transport: :::::%s::::\n_____________\n%s\n_____________\n' % (event_type,data)) if hasattr(self, 'Dispatcher'): self.Dispatcher.Event('', event_type, data) @@ -272,8 +265,9 @@ class NBCommonClient: ''' get the ip address of the account, from which is made connection to the server , (e.g. me). We will create listening socket on the same ip ''' - if hasattr(self, 'Connection'): - return self.Connection._sock.getsockname() + # FIXME: tuple (ip, port) is expected (and checked for) but port num is useless + if hasattr(self, 'socket'): + return self.socket.peerhost def auth(self, user, password, resource = '', sasl = 1, on_auth = None): @@ -364,6 +358,7 @@ class NonBlockingClient(NBCommonClient): def __init__(self, domain, idlequeue, caller=None): NBCommonClient.__init__(self, domain, idlequeue, caller) + self.protocol_type = 'XMPP' def connect(self, on_connect, on_connect_failure, hostname=None, port=5222, on_proxy_failure=None, proxy=None, secure=None): @@ -379,35 +374,33 @@ class NonBlockingClient(NBCommonClient): if proxy: # with proxies, client connects to proxy instead of directly to # XMPP server ((hostname, port)) - # tcp_host is machine used for socket connection - tcp_host=proxy['host'] - tcp_port=proxy['port'] + # tcp_host is hostname of machine used for socket connection + # (DNS request will be done for this hostname) + tcp_host, tcp_port, proxy_user, proxy_pass = \ + transports_nb.get_proxy_data_from_dict(proxy) + self._on_tcp_failure = self.on_proxy_failure - if proxy.has_key('type'): - assert(proxy['type']!='bosh') - if proxy.has_key('user') and proxy.has_key('pass'): - proxy_creds=(proxy['user'],proxy['pass']) - else: - proxy_creds=(None, None) - - type_ = proxy['type'] - if type_ == 'socks5': - # SOCKS5 proxy - self.socket = transports_nb.NBSOCKS5ProxySocket( + + if proxy['type'] == 'bosh': + self.socket = bosh.NonBlockingBOSH( on_disconnect=self.on_disconnect, - proxy_creds=proxy_creds, - xmpp_server=(xmpp_hostname, self.Port)) - elif type_ == 'http': - # HTTP CONNECT to proxy - self.socket = transports_nb.NBHTTPProxySocket( - on_disconnect=self.on_disconnect, - proxy_creds=proxy_creds, - xmpp_server=(xmpp_hostname, self.Port)) + raise_event = self.raise_event, + idlequeue = self.idlequeue, + xmpp_server=(xmpp_hostname, self.Port), + domain = self.Server, + bosh_dict = proxy) + self.protocol_type = 'BOSH' + else: - # HTTP CONNECT to proxy from environment variables - self.socket = transports_nb.NBHTTPProxySocket( + if proxy['type'] == 'socks5': + proxy_class = transports_nb.NBSOCKS5ProxySocket + elif proxy['type'] == 'http': + proxy_class = transports_nb.NBHTTPProxySocket + self.socket = proxy_class( on_disconnect=self.on_disconnect, - proxy_creds=(None, None), + raise_event = self.raise_event, + idlequeue = self.idlequeue, + proxy_creds=(proxy_user, proxy_pass), xmpp_server=(xmpp_hostname, self.Port)) else: self._on_tcp_failure = self._on_connect_failure @@ -415,6 +408,7 @@ class NonBlockingClient(NBCommonClient): tcp_port=self.Port self.socket = transports_nb.NonBlockingTCP( raise_event = self.raise_event, + idlequeue = self.idlequeue, on_disconnect = self.on_disconnect) self.socket.PlugIn(self) diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index 1c6f04a8b..818ae2790 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -42,8 +42,6 @@ XML_DECLARATION = '' # FIXME: ugly -from client_nb import NonBlockingClient -from bosh import BOSHClient class Dispatcher(): # Why is this here - I needed to redefine Dispatcher for BOSH and easiest way # was to inherit original Dispatcher (now renamed to XMPPDispatcher). Trouble @@ -53,9 +51,9 @@ class Dispatcher(): # If having two kinds of dispatcher will go well, I will rewrite the def PlugIn(self, client_obj, after_SASL=False): - if isinstance(client_obj, NonBlockingClient): + if client_obj.protocol_type == 'XMPP': XMPPDispatcher().PlugIn(client_obj) - elif isinstance(client_obj, BOSHClient): + elif client_obj.protocol_type == 'BOSH': BOSHDispatcher().PlugIn(client_obj, after_SASL) @@ -76,8 +74,8 @@ class XMPPDispatcher(PlugIn): self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \ self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \ self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \ - self.SendAndWaitForResponse, self.assign_id, self.StreamTerminate, \ - self.SendAndCallForResponse, self.getAnID, self.Event] + self.SendAndWaitForResponse, self.StreamTerminate, \ + self.SendAndCallForResponse, self.getAnID, self.Event, self.send] def getAnID(self): global ID @@ -112,10 +110,7 @@ class XMPPDispatcher(PlugIn): self._owner.lastErrNode = None self._owner.lastErr = None self._owner.lastErrCode = None - if hasattr(self._owner, 'StreamInit'): - self._owner.StreamInit() - else: - self.StreamInit() + self.StreamInit() def plugout(self): ''' Prepares instance to be destructed. ''' @@ -165,6 +160,7 @@ class XMPPDispatcher(PlugIn): self.Stream.Parse(data) # end stream:stream tag received if self.Stream and self.Stream.has_received_endtag(): + # FIXME call client method self._owner.Connection.disconnect() return 0 except ExpatError: @@ -414,25 +410,19 @@ class XMPPDispatcher(PlugIn): ''' Put stanza on the wire and call back when recipient replies. Additional callback arguments can be specified in args. ''' self.SendAndWaitForResponse(stanza, 0, func, args) - - def assign_id(self, stanza): - ''' Assign an unique ID to stanza and return assigned ID.''' - if type(stanza) in [type(''), type(u'')]: - return (None, stanza) - if not isinstance(stanza, Protocol): - _ID=None - elif not stanza.getID(): - global ID - ID+=1 - _ID=`ID` - stanza.setID(_ID) - else: - _ID=stanza.getID() - if self._owner._registered_name and not stanza.getAttr('from'): - stanza.setAttr('from', self._owner._registered_name) - stanza.setNamespace(self._owner.Namespace) - stanza.setParent(self._metastream) - return (_ID, stanza) + + def send(self, stanza, now=False): + id = None + if type(stanza) not in [type(''), type(u'')]: + if isinstance(stanza, Protocol): + id = stanza.getID() + if id is None: + stanza.setID(self.getAnID()) + id = stanza.getID() + if self._owner._registered_name and not stanza.getAttr('from'): + stanza.setAttr('from', self._owner._registered_name) + self._owner.Connection.send(stanza, now) + return id class BOSHDispatcher(XMPPDispatcher): @@ -458,12 +448,16 @@ class BOSHDispatcher(XMPPDispatcher): locale.getdefaultlocale()[0].split('_')[0]) self.restart = True - self._owner.Connection.send(self._owner.get_initial_bodytag(self.after_SASL)) + if self.after_SASL: + self._owner.Connection.send_http(self._owner.Connection.get_after_SASL_bodytag()) + else: + self._owner.Connection.send_http(self._owner.Connection.get_initial_bodytag()) + def StreamTerminate(self): ''' Send a stream terminator. ''' - self._owner.Connection.send(self._owner.get_closing_bodytag()) + self._owner.Connection.send_http(self._owner.Connection.get_closing_bodytag()) def ProcessNonBlocking(self, data=None): @@ -478,10 +472,31 @@ class BOSHDispatcher(XMPPDispatcher): def dispatch(self, stanza, session=None, direct=0): if stanza.getName()=='body' and stanza.getNamespace()==NS_HTTP_BIND: - self._owner.on_bodytag_attrs(stanza.getAttrs()) - #self._owner.send_empty_bodytag() - for child in stanza.getChildren(): - XMPPDispatcher.dispatch(self, child, session, direct) + + stanza_attrs = stanza.getAttrs() + + if stanza_attrs.has_key('authid'): + # should be only in init response + # auth module expects id of stream in document attributes + self.Stream._document_attrs['id'] = stanza_attrs['authid'] + + if stanza_attrs.has_key('sid'): + # session ID should be only in init response + self._owner.Connection.bosh_sid = stanza_attrs['sid'] + + if stanza_attrs.has_key('terminate'): + # staznas under body still should be passed to XMPP dispatcher + self._owner.on_disconnect() + + if stanza_attrs.has_key('error'): + # recoverable error + pass + + children = stanza.getChildren() + + if children: + for child in children: + XMPPDispatcher.dispatch(self, child, session, direct) else: XMPPDispatcher.dispatch(self, stanza, session, direct) diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py index 2ca1b0bd3..66b40299f 100644 --- a/src/common/xmpp/idlequeue.py +++ b/src/common/xmpp/idlequeue.py @@ -15,6 +15,7 @@ import select import logging log = logging.getLogger('gajim.c.x.idlequeue') +log.setLevel(logging.DEBUG) class IdleObject: ''' base class for all idle listeners, these are the methods, which are called from IdleQueue @@ -36,7 +37,7 @@ class IdleObject: pass def read_timeout(self): - ''' called when timeout has happend ''' + ''' called when timeout happened ''' pass class IdleQueue: @@ -55,7 +56,8 @@ class IdleQueue: self.selector = select.poll() def remove_timeout(self, fd): - log.debug('read timeout removed for fd %s' % fd) + #log.debug('read timeout removed for fd %s' % fd) + print 'read timeout removed for fd %s' % fd if self.read_timeouts.has_key(fd): del(self.read_timeouts[fd]) @@ -71,11 +73,13 @@ class IdleQueue: def set_read_timeout(self, fd, seconds): ''' set a new timeout, if it is not removed after 'seconds', then obj.read_timeout() will be called ''' - log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds)) + #log.debug('read timeout set for fd %s on %s seconds' % (fd, seconds)) + print 'read timeout set for fd %s on %s seconds' % (fd, seconds) timeout = self.current_time() + seconds self.read_timeouts[fd] = timeout def check_time_events(self): + print 'check time evs' current_time = self.current_time() for fd, timeout in self.read_timeouts.items(): if timeout > current_time: @@ -134,6 +138,7 @@ class IdleQueue: return False if flags & 3: # waiting read event + #print 'waiting read on %d, flags are %d' % (fd, flags) obj.pollin() return True diff --git a/src/common/xmpp/protocol.py b/src/common/xmpp/protocol.py index 797a8e9af..c80870a33 100644 --- a/src/common/xmpp/protocol.py +++ b/src/common/xmpp/protocol.py @@ -300,6 +300,13 @@ class JID: """ Produce hash of the JID, Allows to use JID objects as keys of the dictionary. """ return hash(self.__str__()) +class BOSHBody(Node): + ''' + tag that wraps usual XMPP stanzas in XMPP over BOSH + ''' + def __init__(self, attrs={}, payload=[], node=None): + Node.__init__(self, tag='body', attrs=attrs, payload=payload, node=node) + self.setNamespace(NS_HTTP_BIND) class Protocol(Node): @@ -400,13 +407,6 @@ class Protocol(Node): if item in ['to','from']: val=JID(val) return self.setAttr(item,val) -class BOSHBody(Protocol): - ''' - tag that wraps usual XMPP stanzas in BOSH - ''' - def __init__(self, to=None, frm=None, attrs={}, payload=[], node=None): - Protocol.__init__(self, name='body', to=to, frm=frm, attrs=attrs, - payload=payload, xmlns=NS_HTTP_BIND, node=node) class Message(Protocol): """ XMPP Message stanza - "push" mechanism.""" diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index a4e35656f..e30a8aa90 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -3,7 +3,7 @@ ## ## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov ## modified by Dimitur Kirov -## modified by Dimitur Kirov +## modified by Tomas Karasek ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -45,6 +45,34 @@ def urisplit(uri): proto, host, path = grouped[1], grouped[3], grouped[4] return proto, host, path +def get_proxy_data_from_dict(proxy): + type = proxy['type'] + # with http-connect/socks5 proxy, we do tcp connecting to the proxy machine + tcp_host, tcp_port = proxy['host'], proxy['port'] + if type == 'bosh': + # in ['host'] is whole URI + tcp_host = urisplit(proxy['host'])[1] + # in BOSH, client connects to Connection Manager instead of directly to + # XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects + # to HTTP proxy and Connection Manager is specified at URI and Host header + # in HTTP message + if proxy.has_key('proxy_host') and proxy.has_key('proxy_port'): + tcp_host, tcp_port = proxy['proxy_host'], proxy['proxy_port'] + + # user and pass for socks5/http_connect proxy. In case of BOSH, it's user and + # pass for http proxy - If there's no proxy_host they won't be used + if proxy.has_key('user'): + proxy_user = proxy['user'] + else: + proxy_user = None + if proxy.has_key('pass'): + proxy_pass = proxy['pass'] + else: + proxy_pass = None + return tcp_host, tcp_port, proxy_user, proxy_pass + + + # timeout to connect to the server socket, it doesn't include auth CONNECT_TIMEOUT_SECONDS = 30 @@ -63,62 +91,72 @@ DATA_SENT='DATA SENT' DISCONNECTED ='DISCONNECTED' CONNECTING ='CONNECTING' CONNECTED ='CONNECTED' -DISCONNECTING ='DISCONNECTING' - - +# transports have different constructor and same connect class NonBlockingTransport(PlugIn): - def __init__(self, raise_event, on_disconnect): + def __init__(self, raise_event, on_disconnect, idlequeue): PlugIn.__init__(self) self.raise_event = raise_event self.on_disconnect = on_disconnect self.on_connect = None self.on_connect_failure = None - self.idlequeue = None + self.idlequeue = idlequeue self.on_receive = None self.server = None self.port = None self.state = DISCONNECTED - self._exported_methods=[self.disconnect, self.onreceive] + self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout, + self.set_timeout, self.remove_timeout] + + # time to wait for SOME stanza to come and then send keepalive + self.sendtimeout = 0 + + # in case we want to something different than sending keepalives + self.on_timeout = None def plugin(self, owner): owner.Connection=self - self.idlequeue = owner.idlequeue - def plugout(self): self._owner.Connection = None self._owner = None def connect(self, conn_5tuple, on_connect, on_connect_failure): + ''' + connect method should have the same declaration in all derived transports + + ''' + assert(self.state == DISCONNECTED) self.on_connect = on_connect self.on_connect_failure = on_connect_failure (self.server, self.port) = conn_5tuple[4][:2] - log.info('NonBlocking Connect :: About tot connect to %s:%s' % (self.server, self.port)) + self.conn_5tuple = conn_5tuple + log.info('NonBlocking Connect :: About to connect to %s:%s' % (self.server, self.port)) def set_state(self, newstate): - assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING]) - if (self.state, newstate) in [(CONNECTING, DISCONNECTING), (DISCONNECTED, DISCONNECTING)]: - log.info('strange move: %s -> %s' % (self.state, newstate)) + assert(newstate in [DISCONNECTED, CONNECTING, CONNECTED]) self.state = newstate def _on_connect(self, data): ''' preceeds call of on_connect callback ''' + # data is reference to socket wrapper instance. We don't need it in client + # because + self.peerhost = data._sock.getsockname() self.set_state(CONNECTED) self.on_connect() def _on_connect_failure(self,err_message): ''' preceeds call of on_connect_failure callback ''' - # In case of error while connecting we need to close socket + # In case of error while connecting we need to disconnect transport # but we don't want to call DisconnectHandlers from client, # thus the do_callback=False self.disconnect(do_callback=False) self.on_connect_failure(err_message=err_message) def send(self, raw_data, now=False): - if self.state not in [CONNECTED, DISCONNECTING]: + if self.state not in [CONNECTED]: # FIXME better handling needed log.error('Trying to send %s when transport is %s.' % (raw_data, self.state)) @@ -139,24 +177,49 @@ class NonBlockingTransport(PlugIn): else: self.on_receive = None return - log.info('setting onreceive on %s' % recv_handler) self.on_receive = recv_handler def tcp_connection_started(self): self.set_state(CONNECTING) # on_connect/on_conn_failure will be called from self.pollin/self.pollout + def read_timeout(self): + if self.on_timeout: + self.on_timeout() + self.renew_send_timeout() + + def renew_send_timeout(self): + if self.on_timeout and self.sendtimeout > 0: + self.set_timeout(self.sendtimeout) + else: + self.remove_timeout() + + def set_timeout(self, timeout): + self.idlequeue.set_read_timeout(self.get_fd(), timeout) + + def get_fd(self): + pass + + def remove_timeout(self): + self.idlequeue.remove_timeout(self.get_fd()) + + def set_send_timeout(self, timeout, on_timeout): + self.sendtimeout = timeout + if self.sendtimeout > 0: + self.on_timeout = on_timeout + else: + self.on_timeout = None class NonBlockingTCP(NonBlockingTransport, IdleObject): ''' Non-blocking TCP socket wrapper ''' - def __init__(self, raise_event, on_disconnect): + def __init__(self, raise_event, on_disconnect, idlequeue): ''' Class constructor. ''' - NonBlockingTransport.__init__(self, raise_event, on_disconnect) + NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue) # writable, readable - keep state of the last pluged flags # This prevents replug of same object with the same flags self.writable = True @@ -165,23 +228,16 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): # queue with messages to be send self.sendqueue = [] - # time to wait for SOME stanza to come and then send keepalive - self.sendtimeout = 0 - - # in case we want to something different than sending keepalives - self.on_timeout = None - # bytes remained from the last send message self.sendbuff = '' - self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout, - self.set_timeout, self.remove_timeout] def get_fd(self): try: tmp = self._sock.fileno() return tmp - except: + except socket.error, (errnum, errstr): + log.error('Trying to get file descriptor of not-connected socket: %s' % errstr ) return 0 def connect(self, conn_5tuple, on_connect, on_connect_failure): @@ -205,6 +261,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._recv = self._sock.recv self.fd = self._sock.fileno() self.idlequeue.plug_idle(self, True, False) + self.peerhost = None errnum = 0 ''' variable for errno symbol that will be found from exception raised from connect() ''' @@ -221,11 +278,11 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): # connecting in progress - log.info('After connect. "%s" raised => CONNECTING' % errstr) + log.info('After NB connect() of %s. "%s" raised => CONNECTING' % (id(self),errstr)) self.tcp_connection_started() return elif errnum in (0, 10056, errno.EISCONN): - # already connected - this branch is very unlikely, nonblocking connect() will + # already connected - this branch is probably useless, nonblocking connect() will # return EINPROGRESS exception in most cases. When here, we don't need timeout # on connected descriptor and success callback can be called. log.info('After connect. "%s" raised => CONNECTED' % errstr) @@ -240,6 +297,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): def _on_connect(self, data): ''' with TCP socket, we have to remove send-timeout ''' self.idlequeue.remove_timeout(self.get_fd()) + NonBlockingTransport._on_connect(self, data) @@ -253,6 +311,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): log.info('pollout called, state == %s' % self.state) if self.state==CONNECTING: + log.info('%s socket wrapper connected' % id(self)) self._on_connect(self) return self._do_send() @@ -288,30 +347,17 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): self._on_connect_failure('Error during connect to %s:%s' % (self.server, self.port)) else: - if self.on_timeout: - self.on_timeout() - self.renew_send_timeout() + NonBlockingTransport.read_timeout(self) - def renew_send_timeout(self): - if self.on_timeout and self.sendtimeout > 0: - self.set_timeout(self.sendtimeout) - else: - self.remove_timeout() - def set_send_timeout(self, timeout, on_timeout): - self.sendtimeout = timeout - if self.sendtimeout > 0: - self.on_timeout = on_timeout - else: - self.on_timeout = None def set_timeout(self, timeout): if self.state in [CONNECTING, CONNECTED] and self.get_fd() > 0: - self.idlequeue.set_read_timeout(self.get_fd(), timeout) + NonBlockingTransport.set_timeout(self, timeout) def remove_timeout(self): if self.get_fd(): - self.idlequeue.remove_timeout(self.get_fd()) + NonBlockingTransport.remove_timeout(self) def send(self, raw_data, now=False): '''Append raw_data to the queue of messages to be send. @@ -415,46 +461,50 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): else: # This should never happen, so we need the debug. (If there is no handler # on receive spacified, data are passed to Dispatcher.ProcessNonBlocking) - log.error('SOCKET Unhandled data received: %s' % received) + log.error('SOCKET %s Unhandled data received: %s' % (id(self), received)) + import traceback + traceback.print_stack() self.disconnect() def _on_receive(self,data): - '''Preceeds passing received data to Client class. Gets rid of HTTP headers - and checks them.''' + ''' preceeds on_receive callback. It peels off and checks HTTP headers in + class, in here it just calls the callback.''' self.on_receive(data) - class NonBlockingHTTP(NonBlockingTCP): ''' Socket wrapper that cretes HTTP message out of sent data and peels-off HTTP headers from incoming messages ''' - def __init__(self, raise_event, on_disconnect, http_uri, http_port, http_version=None): + def __init__(self, raise_event, on_disconnect, idlequeue, on_http_request_possible, + http_uri, http_port, http_version='HTTP/1.1'): + self.http_protocol, self.http_host, self.http_path = urisplit(http_uri) if self.http_protocol is None: self.http_protocol = 'http' if self.http_path == '': http_path = '/' self.http_port = http_port - if http_version: - self.http_version = http_version - else: - self.http_version = 'HTTP/1.1' + self.http_version = http_version # buffer for partial responses self.recvbuff = '' self.expected_length = 0 - NonBlockingTCP.__init__(self, raise_event, on_disconnect) + self.pending_requests = 0 + self.on_http_request_possible = on_http_request_possible + NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue) def send(self, raw_data, now=False): NonBlockingTCP.send( self, self.build_http_message(raw_data), now) + self.pending_requests += 1 + def _on_receive(self,data): - '''Preceeds passing received data to Client class. Gets rid of HTTP headers + '''Preceeds passing received data to owner class. Gets rid of HTTP headers and checks them.''' if not self.recvbuff: # recvbuff empty - fresh HTTP message was received @@ -470,7 +520,8 @@ class NonBlockingHTTP(NonBlockingTCP): if self.expected_length > len(self.recvbuff): # If we haven't received the whole HTTP mess yet, let's end the thread. - # It will be finnished from one of following poll calls on plugged socket. + # It will be finnished from one of following polls (io_watch) on plugged socket. + log.info('not enough bytes - %d expected, %d got' % (self.expected_length, len(self.recvbuff))) return # FIXME the reassembling doesn't work - Connection Manager on jabbim.cz @@ -481,8 +532,13 @@ class NonBlockingHTTP(NonBlockingTCP): self.recvbuff='' self.expected_length=0 + self.pending_requests -= 1 + assert(self.pending_requests >= 0) + # not-persistent connections + self.disconnect(do_callback = False) self.on_receive(httpbody) - + self.on_http_request_possible() + def build_http_message(self, httpbody, method='POST'): ''' @@ -512,7 +568,7 @@ class NonBlockingHTTP(NonBlockingTCP): message = message.replace('\r','') (header, httpbody) = message.split('\n\n',1) header = header.split('\n') - statusline = header[0].split(' ') + statusline = header[0].split(' ',2) header = header[1:] headers = {} for dummy in header: @@ -521,16 +577,16 @@ class NonBlockingHTTP(NonBlockingTCP): return (statusline, headers, httpbody) - class NBProxySocket(NonBlockingTCP): ''' Interface for proxy socket wrappers - when tunnneling XMPP over proxies, some connecting process usually has to be done before opening stream. ''' - def __init__(self, raise_event, on_disconnect, xmpp_server, proxy_creds=(None,None)): + def __init__(self, raise_event, on_disconnect, idlequeue, xmpp_server, + proxy_creds=(None,None)): self.proxy_user, self.proxy_pass = proxy_creds self.xmpp_server = xmpp_server - NonBlockingTCP.__init__(self, raise_event, on_disconnect) + NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue) def connect(self, conn_5tuple, on_connect, on_connect_failure): @@ -552,7 +608,6 @@ class NBProxySocket(NonBlockingTCP): pass - class NBHTTPProxySocket(NBProxySocket): ''' This class can be used instead of NonBlockingTCP HTTP (CONNECT) proxy connection class. Allows to use HTTP proxies like squid with diff --git a/test/test_nonblockingtcp.py b/test/test_nonblockingtcp.py index 7987d3278..cca0b0a26 100644 --- a/test/test_nonblockingtcp.py +++ b/test/test_nonblockingtcp.py @@ -1,5 +1,5 @@ ''' -Unit test for NonBlockingTcp tranport. +Unit test for NonBlockingTCP tranport. ''' import unittest @@ -38,7 +38,7 @@ class MockClient(IdleMock): IdleMock.__init__(self) def do_connect(self): - self.socket=transports_nb.NonBlockingTcp( + self.socket=transports_nb.NonBlockingTCP( on_disconnect=lambda: self.on_success(mode='SocketDisconnect') ) @@ -73,7 +73,7 @@ class MockClient(IdleMock): -class TestNonBlockingTcp(unittest.TestCase): +class TestNonBlockingTCP(unittest.TestCase): def setUp(self): self.idlequeue_thread = IdleQueueThread() self.idlequeue_thread.start() @@ -100,6 +100,6 @@ class TestNonBlockingTcp(unittest.TestCase): if __name__ == '__main__': - suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTcp) + suite = unittest.TestLoader().loadTestsFromTestCase(TestNonBlockingTCP) unittest.TextTestRunner(verbosity=2).run(suite)