diff --git a/data/glade/manage_proxies_window.glade b/data/glade/manage_proxies_window.glade index b28beef97..9580b12e3 100644 --- a/data/glade/manage_proxies_window.glade +++ b/data/glade/manage_proxies_window.glade @@ -235,6 +235,7 @@ BOSH 0 True + False @@ -305,7 +306,7 @@ BOSH True False - 5 + 8 2 False 6 @@ -314,7 +315,7 @@ BOSH True - _Port: + Proxy _Port: True False GTK_JUSTIFY_LEFT @@ -333,8 +334,8 @@ BOSH 0 1 - 1 - 2 + 4 + 5 fill @@ -349,14 +350,15 @@ BOSH 0 True + False 1 2 - 0 - 1 + 3 + 4 @@ -370,14 +372,15 @@ BOSH 0 True + False 1 2 - 1 - 2 + 4 + 5 @@ -385,7 +388,7 @@ BOSH True - _Host: + Proxy _Host: True False GTK_JUSTIFY_LEFT @@ -404,8 +407,8 @@ BOSH 0 1 - 0 - 1 + 3 + 4 fill @@ -433,8 +436,8 @@ BOSH 0 1 - 4 - 5 + 7 + 8 fill @@ -462,8 +465,8 @@ BOSH 0 1 - 3 - 4 + 6 + 7 fill @@ -478,14 +481,15 @@ BOSH 0 True + False 1 2 - 4 - 5 + 7 + 8 @@ -499,14 +503,15 @@ BOSH 0 True + False 1 2 - 3 - 4 + 6 + 7 @@ -515,7 +520,7 @@ BOSH True True - Use authentication + Use proxy authentication True GTK_RELIEF_NORMAL True @@ -524,6 +529,80 @@ BOSH True + + 0 + 2 + 5 + 6 + fill + + + + + + + True + _BOSH URL: + True + False + GTK_JUSTIFY_LEFT + False + False + 0 + 0.5 + 0 + 0 + proxyhost_entry + PANGO_ELLIPSIZE_NONE + -1 + False + 0 + + + 0 + 1 + 0 + 1 + fill + + + + + + + True + True + True + True + 0 + + True + + False + + + + 1 + 2 + 0 + 1 + + + + + + + True + True + Use HTTP proxy + True + GTK_RELIEF_NORMAL + True + False + False + True + + 0 2 @@ -533,6 +612,57 @@ BOSH + + + + True + B_OSH Port: + True + False + GTK_JUSTIFY_LEFT + False + False + 0 + 0.5 + 0 + 0 + proxyhost_entry + PANGO_ELLIPSIZE_NONE + -1 + False + 0 + + + 0 + 1 + 1 + 2 + fill + + + + + + + True + True + True + True + 0 + + True + + False + + + + 1 + 2 + 1 + 2 + + + diff --git a/src/common/config.py b/src/common/config.py index 9b950705a..4a52f9eae 100644 --- a/src/common/config.py +++ b/src/common/config.py @@ -337,8 +337,17 @@ class Config: 'type': [ opt_str, 'http' ], 'host': [ opt_str, '' ], 'port': [ opt_int, 3128 ], + 'useauth': [ opt_bool, False ], 'user': [ opt_str, '' ], 'pass': [ opt_str, '' ], + 'bosh_uri': [ opt_str, '' ], + 'bosh_port': [ opt_int, 80 ], + 'bosh_useproxy': [ opt_bool, False ], + 'bosh_wait': [ opt_int, 30 ], + 'bosh_hold': [ opt_int, 2 ], + 'bosh_content': [ opt_str, 'text/xml; charset=utf-8' ], + 'bosh_http_pipelining': [ opt_bool, False ], + 'bosh_wait_for_restart_response': [ opt_bool, False ], }, {}), 'themes': ({ 'accounttextcolor': [ opt_color, 'black', '', True ], diff --git a/src/common/connection.py b/src/common/connection.py index 08d7bcaae..7df8b4a71 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -428,11 +428,11 @@ class Connection(ConnectionHandlers): # create connection if it doesn't already exist self.connected = 1 if p and p in gajim.config.get_per('proxies'): - proxy = {'host': gajim.config.get_per('proxies', p, 'host')} - proxy['port'] = gajim.config.get_per('proxies', p, 'port') - proxy['user'] = gajim.config.get_per('proxies', p, 'user') - proxy['password'] = gajim.config.get_per('proxies', p, 'pass') - proxy['type'] = gajim.config.get_per('proxies', p, 'type') + proxy = {} + proxyptr = gajim.config.get_per('proxies',p) + for key in proxyptr.keys(): proxy[key]=proxyptr[key][1] + print proxy + elif gajim.config.get_per('accounts', self.name, 'use_env_http_proxy'): try: try: @@ -546,11 +546,11 @@ class Connection(ConnectionHandlers): con.RegisterDisconnectHandler(self._on_new_account) # FIXME: BOSH properties should be loaded from config - if self._proxy and self._proxy['type'] == 'bosh': - self._proxy['bosh_hold'] = '1' - self._proxy['bosh_wait'] = '60' - self._proxy['bosh_content'] = 'text/xml; charset=utf-8' - self._proxy['wait_for_restart_response'] = False + #if self._proxy and self._proxy['type'] == 'bosh': + # self._proxy['bosh_hold'] = '2' + # self._proxy['bosh_wait'] = '10' + # self._proxy['bosh_content'] = 'text/xml; charset=utf-8' + # self._proxy['wait_for_restart_response'] = False log.info('Connecting to %s: [%s:%d]', self.name, @@ -1003,7 +1003,7 @@ class Connection(ConnectionHandlers): self.connection.RegisterDisconnectHandler(self._on_disconnected) self.connection.send(p, now=True) - self.connection.StreamTerminate() + self.connection.start_disconnect() #self.connection.start_disconnect(p, self._on_disconnected) else: self.time_to_reconnect = None diff --git a/src/common/xmpp/bosh.py b/src/common/xmpp/bosh.py index 4ce71cd68..c29fa7294 100644 --- a/src/common/xmpp/bosh.py +++ b/src/common/xmpp/bosh.py @@ -1,16 +1,20 @@ import locale, random -from transports_nb import NonBlockingTransport, NonBlockingHTTP, CONNECTED, CONNECTING, DISCONNECTED +from transports_nb import NonBlockingTransport, NonBlockingHTTPBOSH,\ + CONNECTED, CONNECTING, DISCONNECTED, DISCONNECTING,\ + urisplit from protocol import BOSHBody from simplexml import Node +import sha import logging log = logging.getLogger('gajim.c.x.bosh') +KEY_COUNT = 10 FAKE_DESCRIPTOR = -1337 '''Fake file descriptor - it's used for setting read_timeout in idlequeue for -BOSH Transport. Timeouts in queue are saved by socket descriptor. +BOSH Transport. In TCP-derived transports it is file descriptor of socket''' @@ -19,12 +23,6 @@ class NonBlockingBOSH(NonBlockingTransport): bosh_dict): NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue) - # with 50-bit random initial rid, session would have to go up - # to 7881299347898368 messages to raise rid over 2**53 - # (see http://www.xmpp.org/extensions/xep-0124.html#rids) - r = random.Random() - r.seed() - self.bosh_rid = r.getrandbits(50) self.bosh_sid = None if locale.getdefaultlocale()[0]: self.bosh_xml_lang = locale.getdefaultlocale()[0].split('_')[0] @@ -33,25 +31,30 @@ class NonBlockingBOSH(NonBlockingTransport): self.http_version = 'HTTP/1.1' self.http_persistent = True - self.http_pipelining = False + self.http_pipelining = bosh_dict['bosh_http_pipelining'] self.bosh_to = domain self.route_host, self.route_port = xmpp_server self.bosh_wait = bosh_dict['bosh_wait'] self.bosh_hold = bosh_dict['bosh_hold'] - self.bosh_host = bosh_dict['host'] - self.bosh_port = bosh_dict['port'] + self.bosh_requests = self.bosh_hold + self.bosh_uri = bosh_dict['bosh_uri'] + self.bosh_port = bosh_dict['bosh_port'] self.bosh_content = bosh_dict['bosh_content'] + self.wait_cb_time = None self.http_socks = [] - self.stanzas_to_send = [] - self.prio_bosh_stanza = None + self.stanza_buffer = [] + self.prio_bosh_stanzas = [] self.current_recv_handler = None + self.current_recv_socket = None + self.key_stack = None + self.ack_checker = None + self.after_init = False # if proxy_host .. do sth about HTTP proxy etc. - def connect(self, conn_5tuple, on_connect, on_connect_failure): NonBlockingTransport.connect(self, conn_5tuple, on_connect, on_connect_failure) @@ -59,14 +62,20 @@ class NonBlockingBOSH(NonBlockingTransport): FAKE_DESCRIPTOR = FAKE_DESCRIPTOR - 1 self.fd = FAKE_DESCRIPTOR - self.http_persistent = True - self.http_socks.append(self.get_http_socket()) + self.stanza_buffer = [] + self.prio_bosh_stanzas = [] + + self.key_stack = KeyStack(KEY_COUNT) + self.ack_checker = AckChecker() + self.after_init = True + + self.http_socks.append(self.get_new_http_socket()) self.tcp_connection_started() - # this connect() is not needed because sockets can be connected on send but - # we need to know if host is reachable in order to invoke callback for - # connecting failurei eventually (it's different than callback for errors - # occurring after connection is etabilished) + # following connect() is not necessary because sockets can be connected on + # send but we need to know if host is reachable in order to invoke callback + # for connecting failure eventually (the callback is different than callback + # for errors occurring after connection is etabilished) self.http_socks[0].connect( conn_5tuple = conn_5tuple, on_connect = lambda: self._on_connect(self.http_socks[0]), @@ -83,121 +92,209 @@ class NonBlockingBOSH(NonBlockingTransport): Called after HTTP response is received - another request is possible. There should be always one pending request on BOSH CM. ''' - log.info('on_http_req possible state:\n%s' % self.get_current_state()) - # if one of sockets is connecting, sth is about to be sent - # if there is a pending request, we shouldn't send another one + log.info('on_http_req possible, state:\n%s' % self.get_current_state()) + if self.state == DISCONNECTING: + self.disconnect() + return + self.send_BOSH(None) + + + def get_socket_in(self, state): for s in self.http_socks: - if s.state==CONNECTING or s.pending_requests>0: return - self.flush_stanzas() + if s.state==state: return s + return None - - def flush_stanzas(self): - # another to-be-locked candidate - log.info('flushing stanzas') - if self.prio_bosh_stanza: - tmp = self.prio_bosh_stanza - self.prio_bosh_stanza = None + def get_free_socket(self): + if self.http_pipelining: + assert( len(self.http_socks) == 1 ) + return self.get_socket_in(CONNECTED) else: - if self.stanzas_to_send: - tmp = self.stanzas_to_send.pop(0) - else: - tmp = [] - self.send_http(tmp) + last_recv_time, tmpsock = 0, None + for s in self.http_socks: + # we're interested only into CONNECTED socket with no req pending + if s.state==CONNECTED and s.pending_requests==0: + # if there's more of them, we want the one with less recent data receive + # (lowest last_recv_time) + if (last_recv_time==0) or (s.last_recv_time < last_recv_time): + last_recv_time = s.last_recv_time + tmpsock = s + if tmpsock: + return tmpsock + else: + return None + + + def send_BOSH(self, payload): + total_pending_reqs = sum([s.pending_requests for s in self.http_socks]) + + # when called after HTTP response when there are some pending requests and + # no data to send, we do nothing and disccard the payload + if payload is None and \ + total_pending_reqs > 0 and \ + self.stanza_buffer == [] and \ + self.prio_bosh_stanzas == [] or \ + self.state==DISCONNECTED: + return + + # now the payload is put to buffer and will be sent at some point + self.append_stanza(payload) + + # if we're about to make more requests than allowed, we don't send - stanzas will be + # sent after HTTP response from CM, exception is when we're disconnecting - then we + # send anyway + if total_pending_reqs >= self.bosh_requests and self.state!=DISCONNECTING: + log.warn('attemp to make more requests than allowed by Connection Manager:\n%s' % + self.get_current_state()) + return + + # when there's free CONNECTED socket, we flush the data + if self.get_free_socket(): + self.plug_socket() + return + + # if there is a connecting socket, we just wait for when it connects, + # payload will be sent in a sec when the socket connects + if self.get_socket_in(CONNECTING): return + + # being here means there are either DISCONNECTED sockets or all sockets are + # CONNECTED with too many pending requests + s = self.get_socket_in(DISCONNECTED) + + # if we have DISCONNECTED socket, lets connect it and ... + if s: + self.connect_and_flush(s) + else: + if len(self.http_socks) > 1: return + ss = self.get_new_http_socket() + self.http_socks.append(ss) + self.connect_and_flush(ss) + return + + def plug_socket(self): + stanza = None + s = self.get_free_socket() + if s: + s._plug_idle(writable=True, readable=True) + else: + log.error('=====!!!!!!!!====> Couldnt get free socket in plug_socket())') + + def build_stanza(self, socket): + if self.prio_bosh_stanzas: + stanza, add_payload = self.prio_bosh_stanzas.pop(0) + if add_payload: + stanza.setPayload(self.stanza_buffer) + self.stanza_buffer = [] + else: + stanza = self.boshify_stanzas(self.stanza_buffer) + self.stanza_buffer = [] + + stanza = self.ack_checker.backup_stanza(stanza, socket) + + key, newkey = self.key_stack.get() + if key: + stanza.setAttr('key', key) + if newkey: + stanza.setAttr('newkey', newkey) + + + log.info('sending msg with rid=%s to sock %s' % (stanza.getAttr('rid'), id(socket))) + socket.send(stanza) + self.renew_bosh_wait_timeout() + return stanza + + + def on_bosh_wait_timeout(self): + log.error('Connection Manager didn\'t respond within % seconds --> forcing \ + disconnect' % self.bosh_wait) + self.disconnect() + + + def renew_bosh_wait_timeout(self): + if self.wait_cb_time is not None: + self.remove_bosh_wait_timeout() + sched_time = self.idlequeue.set_alarm(self.on_bosh_wait_timeout, self.bosh_wait+10) + self.wait_cb_time = sched_time + + def remove_bosh_wait_timeout(self): + self.idlequeue.remove_alarm( + self.on_bosh_wait_timeout, + self.wait_cb_time) + + def on_persistent_fallback(self): + log.warn('Fallback to nonpersistent HTTP (no pipelining as well)') + self.http_persistent = False + self.http_pipelining = False + + + def handle_body_attrs(self, stanza_attrs): + self.remove_bosh_wait_timeout() + + if self.after_init: + self.after_init = False + if stanza_attrs.has_key('sid'): + # session ID should be only in init response + self.bosh_sid = stanza_attrs['sid'] + + if stanza_attrs.has_key('requests'): + #self.bosh_requests = int(stanza_attrs['requests']) + self.bosh_requests = int(stanza_attrs['wait']) + + if stanza_attrs.has_key('wait'): + self.bosh_wait = int(stanza_attrs['wait']) + + ack = None + if stanza_attrs.has_key('ack'): + ack = stanza_attrs['ack'] + self.ack_checker.process_incoming_ack(ack=ack, + socket=self.current_recv_socket) + + if stanza_attrs.has_key('type'): + if stanza_attrs['type'] in ['terminate', 'terminal']: + condition = 'n/a' + if stanza_attrs.has_key('condition'): + condition = stanza_attrs['condition'] + log.error('Received terminating stanza: %s - %s' % (condition, bosh_errors[condition])) + self.set_state(DISCONNECTING) + + if stanza_attrs['type'] == 'error': + # recoverable error + pass + return + + + def append_stanza(self, stanza): + if stanza: + if isinstance(stanza, tuple): + # tuple of BOSH stanza and True/False for whether to add payload + self.prio_bosh_stanzas.append(stanza) + else: + self.stanza_buffer.append(stanza) + def send(self, stanza, now=False): - # body tags should be send only via send_http() + # body tags should be send only via send_BOSH() assert(not isinstance(stanza, BOSHBody)) - self.send_http([stanza]) + self.send_BOSH(stanza) - def send_http(self, payload): - # "Protocol" and string/unicode stanzas should be sent via send() - # (only initiating and terminating BOSH stanzas should be send via send_http) - assert(isinstance(payload, list) or isinstance(payload, BOSHBody)) - log.warn('send_http: stanzas: %s\n%s' % (payload, self.get_current_state())) - - if isinstance(payload, list): - bosh_stanza = self.boshify_stanzas(payload) - else: - # bodytag_payload is , we don't boshify, only add the rid - bosh_stanza = payload - picked_sock = self.pick_socket() - if picked_sock: - log.info('sending to socket %s' % id(picked_sock)) - bosh_stanza.setAttr('rid', self.get_rid()) - picked_sock.send(bosh_stanza) - else: - # no socket was picked but one is about to connect - save the stanza and - # return - log.info('send_http: no free socket:\n%s' % self.get_current_state()) - if self.prio_bosh_stanza: - payload = self.merge_stanzas(payload, self.prio_bosh_stanza) - if payload is None: - # if we cant merge the stanzas (both are BOSH ), add the current to - # queue to be sent later - self.stanzas_to_send.append(bosh_stanza) - log.warn('in BOSH send_http - unable to send %s because %s\ - is already about to be sent' % (str(payload), str(self.prio_bosh_stanza))) - return - self.prio_bosh_stanza = payload - - def merge_stanzas(self, s1, s2): - if isinstance(s1, BOSHBody): - if isinstance(s2, BOSHBody): - # both are boshbodies - return - else: - s1.setPayload(s2, add=True) - return s1 - elif isinstance(s2, BOSHBody): - s2.setPayload(s1, add=True) - return s2 - else: - #both are lists - s1.extend(s2) - return s1 - def get_current_state(self): t = '------ SOCKET_ID\tSOCKET_STATE\tPENDING_REQS\n' for s in self.http_socks: t = '%s------ %s\t%s\t%s\n' % (t,id(s), s.state, s.pending_requests) - t = '%s------ prio stanza to send: %s, queued stanzas: %s' \ - % (t, self.prio_bosh_stanza, self.stanzas_to_send) + t = '%s------ prio stanzas: %s, queued XMPP stanzas: %s, not_acked stanzas: %s' \ + % (t, self.prio_bosh_stanzas, self.stanza_buffer, + self.ack_checker.get_not_acked_rids()) return t - def pick_socket(self): - # try to pick connected socket with no pending reqs - for s in self.http_socks: - if s.state == CONNECTED and s.pending_requests == 0: - return s - - # try to connect some disconnected socket - for s in self.http_socks: - if s.state==DISCONNECTED: - self.connect_and_flush(s) - return - - # if there is any just-connecting socket, it will send the data in its - # connect callback - for s in self.http_socks: - if s.state==CONNECTING: - return - # being here means there are only CONNECTED scokets with pending requests. - # Lets create and connect another one - if len(self.http_socks) < 2: - s = self.get_http_socket() - self.http_socks.append(s) - self.connect_and_flush(s) - return def connect_and_flush(self, socket): socket.connect( conn_5tuple = self.conn_5tuple, - on_connect = self.flush_stanzas, + on_connect = lambda :self.send_BOSH(None), on_connect_failure = self.disconnect) @@ -209,65 +306,163 @@ class NonBlockingBOSH(NonBlockingTransport): return tag - def get_initial_bodytag(self, after_SASL=False): - return BOSHBody( - attrs={'content': self.bosh_content, - 'hold': str(self.bosh_hold), - 'route': '%s:%s' % (self.route_host, self.route_port), - 'to': self.bosh_to, - 'wait': str(self.bosh_wait), - 'xml:lang': self.bosh_xml_lang, - 'xmpp:version': '1.0', - 'ver': '1.6', - 'xmlns:xmpp': 'urn:xmpp:xbosh'}) + def send_init(self, after_SASL=False): + if after_SASL: + t = BOSHBody( + attrs={ 'to': self.bosh_to, + 'sid': self.bosh_sid, + 'xml:lang': self.bosh_xml_lang, + 'xmpp:restart': 'true', + 'xmlns:xmpp': 'urn:xmpp:xbosh'}) + else: + t = BOSHBody( + attrs={ 'content': self.bosh_content, + 'hold': str(self.bosh_hold), + 'route': '%s:%s' % (self.route_host, self.route_port), + 'to': self.bosh_to, + 'wait': str(self.bosh_wait), + 'xml:lang': self.bosh_xml_lang, + 'xmpp:version': '1.0', + 'ver': '1.6', + 'xmlns:xmpp': 'urn:xmpp:xbosh'}) + self.send_BOSH((t,True)) - def get_after_SASL_bodytag(self): - return BOSHBody( - attrs={ 'to': self.bosh_to, - 'sid': self.bosh_sid, - 'xml:lang': self.bosh_xml_lang, - 'xmpp:restart': 'true', - 'xmlns:xmpp': 'urn:xmpp:xbosh'}) - - def get_closing_bodytag(self): - return BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}) - - def get_rid(self): - self.bosh_rid = self.bosh_rid + 1 - return str(self.bosh_rid) + def start_disconnect(self): + NonBlockingTransport.start_disconnect(self) + self.send_BOSH( + (BOSHBody(attrs={'sid': self.bosh_sid, 'type': 'terminate'}), True)) - def get_http_socket(self): - s = NonBlockingHTTP( + def get_new_http_socket(self): + s = NonBlockingHTTPBOSH( raise_event=self.raise_event, on_disconnect=self.disconnect, idlequeue = self.idlequeue, on_http_request_possible = self.on_http_request_possible, - http_uri = self.bosh_host, + http_uri = self.bosh_uri, http_port = self.bosh_port, http_version = self.http_version, - http_persistent = self.http_persistent) - if self.current_recv_handler: - s.onreceive(self.current_recv_handler) + http_persistent = self.http_persistent, + on_persistent_fallback = self.on_persistent_fallback) + s.onreceive(self.on_received_http) + s.set_stanza_build_cb(self.build_stanza) return s + def onreceive(self, recv_handler): if recv_handler is None: recv_handler = self._owner.Dispatcher.ProcessNonBlocking self.current_recv_handler = recv_handler - for s in self.http_socks: - s.onreceive(recv_handler) - def http_socket_disconnect(self, socket): - if self.http_persistent: - self.disconnect() + def on_received_http(self, data, socket): + self.current_recv_socket = socket + self.current_recv_handler(data) def disconnect(self, do_callback=True): + self.remove_bosh_wait_timeout() if self.state == DISCONNECTED: return self.fd = -1 for s in self.http_socks: s.disconnect(do_callback=False) NonBlockingTransport.disconnect(self, do_callback) + +def get_rand_number(): + # with 50-bit random initial rid, session would have to go up + # to 7881299347898368 messages to raise rid over 2**53 + # (see http://www.xmpp.org/extensions/xep-0124.html#rids) + # it's also used for sequence key initialization + r = random.Random() + r.seed() + return r.getrandbits(50) + + + +class AckChecker(): + def __init__(self): + self.rid = get_rand_number() + self.ack = 1 + self.last_rids = {} + self.not_acked = [] + + + def get_not_acked_rids(self): return [rid for rid, st in self.not_acked] + + def backup_stanza(self, stanza, socket): + socket.pending_requests += 1 + rid = self.get_rid() + self.not_acked.append((rid, stanza)) + stanza.setAttr('rid', str(rid)) + self.last_rids[socket]=rid + + if self.rid != self.ack + 1: + stanza.setAttr('ack', str(self.ack)) + return stanza + + def process_incoming_ack(self, socket, ack=None): + socket.pending_requests -= 1 + if ack: + ack = int(ack) + else: + ack = self.last_rids[socket] + + i = len([rid for rid, st in self.not_acked if ack >= rid]) + self.not_acked = self.not_acked[i:] + + self.ack = ack + + + def get_rid(self): + self.rid = self.rid + 1 + return self.rid + + + + + +class KeyStack(): + def __init__(self, count): + self.count = count + self.keys = [] + self.reset() + self.first_call = True + + def reset(self): + seed = str(get_rand_number()) + self.keys = [sha.new(seed).hexdigest()] + for i in range(self.count-1): + curr_seed = self.keys[i] + self.keys.append(sha.new(curr_seed).hexdigest()) + + def get(self): + if self.first_call: + self.first_call = False + return (None, self.keys.pop()) + + if len(self.keys)>1: + return (self.keys.pop(), None) + else: + last_key = self.keys.pop() + self.reset() + new_key = self.keys.pop() + return (last_key, new_key) + +# http://www.xmpp.org/extensions/xep-0124.html#errorstatus-terminal +bosh_errors = { + 'n/a': 'none or unknown condition in terminating body stanza', + 'bad-request': 'The format of an HTTP header or binding element received from the client is unacceptable (e.g., syntax error), or Script Syntax is not supported.', + 'host-gone': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is no longer serviced by the connection manager.', + 'host-unknown': 'The target domain specified in the "to" attribute or the target host or port specified in the "route" attribute is unknown to the connection manager.', + 'improper-addressing': 'The initialization element lacks a "to" or "route" attribute (or the attribute has no value) but the connection manager requires one.', + 'internal-server-error': 'The connection manager has experienced an internal error that prevents it from servicing the request.', + 'item-not-found': '(1) "sid" is not valid, (2) "stream" is not valid, (3) "rid" is larger than the upper limit of the expected window, (4) connection manager is unable to resend response, (5) "key" sequence is invalid', + 'other-request': 'Another request being processed at the same time as this request caused the session to terminate.', + 'policy-violation': 'The client has broken the session rules (polling too frequently, requesting too frequently, too many simultaneous requests).', + 'remote-connection-failed': 'The connection manager was unable to connect to, or unable to connect securely to, or has lost its connection to, the server.', + 'remote-stream-error': 'Encapsulates an error in the protocol being transported.', + 'see-other-uri': 'The connection manager does not operate at this URI (e.g., the connection manager accepts only SSL or TLS connections at some https: URI rather than the http: URI requested by the client). The client may try POSTing to the URI in the content of the child element.', + 'system-shutdown': 'The connection manager is being shut down. All active HTTP sessions are being terminated. No new sessions can be created.', + 'undefined-condition': 'The error is not one of those defined herein; the connection manager SHOULD include application-specific information in the content of the wrapper.' +} diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py index 693f5e534..eb44d4665 100644 --- a/src/common/xmpp/client_nb.py +++ b/src/common/xmpp/client_nb.py @@ -128,8 +128,8 @@ class NBCommonClient: self.ip_addresses = socket.getaddrinfo(hostname,port, socket.AF_UNSPEC,socket.SOCK_STREAM) except socket.gaierror, (errnum, errstr): - on_failure(err_message='Lookup failure for %s:%s - %s %s' % - (self.Server, self.Port, errnum, errstr)) + on_failure('Lookup failure for %s:%s, hostname: %s - %s' % + (self.Server, self.Port, hostname, errstr)) else: on_success() @@ -385,7 +385,7 @@ class NonBlockingClient(NBCommonClient): # with proxies, client connects to proxy instead of directly to # XMPP server ((hostname, port)) # tcp_host is hostname of machine used for socket connection - # (DNS request will be done for this hostname) + # (DNS request will be done for proxy or BOSH CM hostname) tcp_host, tcp_port, proxy_user, proxy_pass = \ transports_nb.get_proxy_data_from_dict(proxy) @@ -400,7 +400,7 @@ class NonBlockingClient(NBCommonClient): domain = self.Server, bosh_dict = proxy) self.protocol_type = 'BOSH' - self.wait_for_restart_response = proxy['wait_for_restart_response'] + self.wait_for_restart_response = proxy['bosh_wait_for_restart_response'] else: if proxy['type'] == 'socks5': diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py index 09e6a2616..5b1eb0be4 100644 --- a/src/common/xmpp/dispatcher_nb.py +++ b/src/common/xmpp/dispatcher_nb.py @@ -35,7 +35,6 @@ log.setLevel(logging.INFO) DEFAULT_TIMEOUT_SECONDS = 25 ID = 0 -STREAM_TERMINATOR = '' XML_DECLARATION = '' # FIXME: ugly @@ -46,7 +45,8 @@ class Dispatcher(): # named by __class__.__name__ of the dispatcher instance .. long story short: # I wrote following to avoid changing each client.Dispatcher.whatever() in xmpp/ -# If having two kinds of dispatcher will go well, I will rewrite the +# If having two kinds of dispatcher will go well, I will rewrite the dispatcher +# references in other scripts def PlugIn(self, client_obj, after_SASL=False, old_features=None): if client_obj.protocol_type == 'XMPP': XMPPDispatcher().PlugIn(client_obj) @@ -71,7 +71,7 @@ class XMPPDispatcher(PlugIn): self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \ self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \ self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \ - self.SendAndWaitForResponse, self.StreamTerminate, \ + self.SendAndWaitForResponse, \ self.SendAndCallForResponse, self.getAnID, self.Event, self.send] def getAnID(self): @@ -134,9 +134,6 @@ class XMPPDispatcher(PlugIn): locale.getdefaultlocale()[0].split('_')[0]) self._owner.send("%s%s>" % (XML_DECLARATION,str(self._metastream)[:-2])) - def StreamTerminate(self): - ''' Send a stream terminator. ''' - self._owner.send(STREAM_TERMINATOR) def _check_stream_start(self, ns, tag, attrs): if ns<>NS_STREAMS or tag<>'stream': @@ -445,16 +442,12 @@ class BOSHDispatcher(XMPPDispatcher): locale.getdefaultlocale()[0].split('_')[0]) self.restart = True - if self.after_SASL: - self._owner.Connection.send_http(self._owner.Connection.get_after_SASL_bodytag()) - else: - self._owner.Connection.send_http(self._owner.Connection.get_initial_bodytag()) - + self._owner.Connection.send_init(after_SASL = self.after_SASL) def StreamTerminate(self): ''' Send a stream terminator. ''' - self._owner.Connection.send_http(self._owner.Connection.get_closing_bodytag()) + self._owner.Connection.send_terminator() def ProcessNonBlocking(self, data=None): @@ -472,22 +465,13 @@ class BOSHDispatcher(XMPPDispatcher): if stanza.getName()=='body' and stanza.getNamespace()==NS_HTTP_BIND: stanza_attrs = stanza.getAttrs() - if stanza_attrs.has_key('authid'): # should be only in init response # auth module expects id of stream in document attributes self.Stream._document_attrs['id'] = stanza_attrs['authid'] - if stanza_attrs.has_key('sid'): - # session ID should be only in init response - self._owner.Connection.bosh_sid = stanza_attrs['sid'] + self._owner.Connection.handle_body_attrs(stanza_attrs) - if stanza_attrs.has_key('terminate'): - self._owner.disconnect() - - if stanza_attrs.has_key('error'): - # recoverable error - pass children = stanza.getChildren() diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py index f358e45f5..765063167 100644 --- a/src/common/xmpp/idlequeue.py +++ b/src/common/xmpp/idlequeue.py @@ -15,7 +15,6 @@ import select import logging log = logging.getLogger('gajim.c.x.idlequeue') -log.setLevel(logging.DEBUG) class IdleObject: ''' base class for all idle listeners, these are the methods, which are called from IdleQueue @@ -68,6 +67,25 @@ class IdleQueue: self.alarms[alarm_time].append(alarm_cb) else: self.alarms[alarm_time] = [alarm_cb] + return alarm_time + + + + def remove_alarm(self, alarm_cb, alarm_time): + ''' removes alarm callback alarm_cb scheduled on alarm_time''' + if not self.alarms.has_key(alarm_time): return False + i = -1 + for i in range(len(self.alarms[alarm_time])): + # let's not modify the list inside the loop + if self.alarms[alarm_time][i] is alarm_cb: break + if i != -1: + del self.alarms[alarm_time][i] + if self.alarms[alarm_time] == []: + del self.alarms[alarm_time] + return True + else: + return False + def set_read_timeout(self, fd, seconds): ''' set a new timeout, if it is not removed after 'seconds', @@ -91,9 +109,10 @@ class IdleQueue: for alarm_time in times: if alarm_time > current_time: break - for cb in self.alarms[alarm_time]: - cb() - del(self.alarms[alarm_time]) + if self.alarms.has_key(alarm_time): + for cb in self.alarms[alarm_time]: + cb() + del(self.alarms[alarm_time]) def plug_idle(self, obj, writable = True, readable = True): if obj.fd == -1: diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py index 9a60cb9c5..f110e4371 100644 --- a/src/common/xmpp/transports_nb.py +++ b/src/common/xmpp/transports_nb.py @@ -46,25 +46,17 @@ def urisplit(uri): return proto, host, path def get_proxy_data_from_dict(proxy): + tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None type = proxy['type'] - # with http-connect/socks5 proxy, we do tcp connecting to the proxy machine - tcp_host, tcp_port = proxy['host'], proxy['port'] - if type == 'bosh': - # in ['host'] is whole URI - tcp_host = urisplit(proxy['host'])[1] - # in BOSH, client connects to Connection Manager instead of directly to - # XMPP server ((hostname, port)). If HTTP Proxy is specified, client connects - # to HTTP proxy and Connection Manager is specified at URI and Host header - # in HTTP message - if proxy.has_key('proxy_host') and proxy.has_key('proxy_port'): - tcp_host, tcp_port = proxy['proxy_host'], proxy['proxy_port'] - - # user and pass for socks5/http_connect proxy. In case of BOSH, it's user and - # pass for http proxy - If there's no proxy_host they won't be used - if proxy.has_key('user'): proxy_user = proxy['user'] - else: proxy_user = None - if proxy.has_key('pass'): proxy_pass = proxy['pass'] - else: proxy_pass = None + if type == 'bosh' and not proxy['bosh_useproxy']: + # with BOSH not over proxy we have to parse the hostname from BOSH URI + tcp_host, tcp_port = urisplit(proxy['bosh_uri'])[1], proxy['bosh_port'] + else: + # with proxy!=bosh or with bosh over HTTP proxy we're connecting to proxy + # machine + tcp_host, tcp_port = proxy['host'], proxy['port'] + if proxy['useauth']: + proxy_user, proxy_pass = proxy['user'], proxy['pass'] return tcp_host, tcp_port, proxy_user, proxy_pass @@ -104,7 +96,7 @@ class NonBlockingTransport(PlugIn): self.port = None self.state = DISCONNECTED self._exported_methods=[self.disconnect, self.onreceive, self.set_send_timeout, - self.set_timeout, self.remove_timeout] + self.set_timeout, self.remove_timeout, self.start_disconnect] # time to wait for SOME stanza to come and then send keepalive self.sendtimeout = 0 @@ -152,10 +144,10 @@ class NonBlockingTransport(PlugIn): self.on_connect_failure(err_message=err_message) def send(self, raw_data, now=False): - if self.state != CONNECTED: - log.error('Trying to send %s when state is %s.' % + if self.state not in [CONNECTED]: + log.error('Unable to send %s \n because state is %s.' % (raw_data, self.state)) - return + def disconnect(self, do_callback=True): self.set_state(DISCONNECTED) @@ -205,6 +197,10 @@ class NonBlockingTransport(PlugIn): else: self.on_timeout = None + def start_disconnect(self): + self.set_state(DISCONNECTING) + + class NonBlockingTCP(NonBlockingTransport, IdleObject): ''' @@ -221,8 +217,12 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): # bytes remained from the last send message self.sendbuff = '' - + self.terminator = '' + + def start_disconnect(self): + self.send('') + NonBlockingTransport.start_disconnect(self) def connect(self, conn_5tuple, on_connect, on_connect_failure): ''' @@ -316,7 +316,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): def disconnect(self, do_callback=True): if self.state == DISCONNECTED: return - self.set_state(DISCONNECTING) + self.set_state(DISCONNECTED) self.idlequeue.unplug_idle(self.fd) try: self._sock.shutdown(socket.SHUT_RDWR) @@ -342,7 +342,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): def set_timeout(self, timeout): - if self.state in [CONNECTING, CONNECTED] and self.fd != -1: + if self.state != DISCONNECTED and self.fd != -1: NonBlockingTransport.set_timeout(self, timeout) else: log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % (self.state, self.fd)) @@ -394,6 +394,9 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): if not self.sendbuff: if not self.sendqueue: log.warn('calling send on empty buffer and queue') + self._plug_idle( + writable= ((self.sendqueue!=[]) or (self.sendbuff!='')), + readable=True) return None self.sendbuff = self.sendqueue.pop(0) try: @@ -402,7 +405,7 @@ class NonBlockingTCP(NonBlockingTransport, IdleObject): sent_data = self.sendbuff[:send_count] self.sendbuff = self.sendbuff[send_count:] self._plug_idle( - writable=self.sendqueue or self.sendbuff, + writable= ((self.sendqueue!=[]) or (self.sendbuff!='')), readable=True) self.raise_event(DATA_SENT, sent_data) @@ -477,7 +480,8 @@ class NonBlockingHTTP(NonBlockingTCP): ''' def __init__(self, raise_event, on_disconnect, idlequeue, on_http_request_possible, - http_uri, http_port, http_version='HTTP/1.1', http_persistent=False): + http_uri, http_port, on_persistent_fallback, http_version='HTTP/1.1', + http_persistent=False): NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue) @@ -493,22 +497,29 @@ class NonBlockingHTTP(NonBlockingTCP): self.recvbuff = '' self.expected_length = 0 self.pending_requests = 0 + self.on_persistent_fallback = on_persistent_fallback self.on_http_request_possible = on_http_request_possible + self.just_responed = False + self.last_recv_time = 0 def send(self, raw_data, now=False): NonBlockingTCP.send( self, self.build_http_message(raw_data), now) - self.pending_requests += 1 def on_remote_disconnect(self): + log.warn('on_remote_disconnect called, http_persistent = %s' % self.http_persistent) if self.http_persistent: self.http_persistent = False + self.on_persistent_fallback() self.disconnect(do_callback=False) self.connect( conn_5tuple = self.conn_5tuple, + # after connect, the socket will be plugged as writable - pollout will be + # called, and since there are still data in sendbuff, _do_send will be + # called and sendbuff will be flushed on_connect = lambda: self._plug_idle(writable=True, readable=True), on_connect_failure = self.disconnect) @@ -534,20 +545,21 @@ class NonBlockingHTTP(NonBlockingTCP): if self.expected_length > len(self.recvbuff): # If we haven't received the whole HTTP mess yet, let's end the thread. # It will be finnished from one of following polls (io_watch) on plugged socket. - log.info('not enough bytes - %d expected, %d got' % (self.expected_length, len(self.recvbuff))) + log.info('not enough bytes in HTTP response - %d expected, %d got' % + (self.expected_length, len(self.recvbuff))) return - # all was received, now call the on_receive callback + # everything was received httpbody = self.recvbuff self.recvbuff='' self.expected_length=0 - self.pending_requests -= 1 - assert(self.pending_requests >= 0) + if not self.http_persistent: # not-persistent connections disconnect after response self.disconnect(do_callback = False) - self.on_receive(httpbody) + self.last_recv_time = time.time() + self.on_receive(data=httpbody, socket=self) self.on_http_request_possible() @@ -563,6 +575,9 @@ class NonBlockingHTTP(NonBlockingTCP): 'Host: %s:%s' % (self.http_host, self.http_port), 'Content-Type: text/xml; charset=utf-8', 'Content-Length: %s' % len(str(httpbody)), + 'Proxy-Connection: keep-alive', + 'Pragma: no-cache', + 'Accept-Encoding: gzip, deflate', '\r\n'] headers = '\r\n'.join(headers) return('%s%s\r\n' % (headers, httpbody)) @@ -587,6 +602,35 @@ class NonBlockingHTTP(NonBlockingTCP): headers[row[0][:-1]] = row[1] return (statusline, headers, httpbody) +class NonBlockingHTTPBOSH(NonBlockingHTTP): + + + def set_stanza_build_cb(self, build_cb): + self.build_cb = build_cb + + def _do_send(self): + if not self.sendbuff: + stanza = self.build_cb(socket=self) + stanza = self.build_http_message(httpbody=stanza) + if isinstance(stanza, unicode): + stanza = stanza.encode('utf-8') + elif not isinstance(stanza, str): + stanza = ustr(stanza).encode('utf-8') + self.sendbuff = stanza + try: + send_count = self._send(self.sendbuff) + if send_count: + sent_data = self.sendbuff[:send_count] + self.sendbuff = self.sendbuff[send_count:] + self._plug_idle(writable = self.sendbuff != '', readable = True) + self.raise_event(DATA_SENT, sent_data) + + except socket.error, e: + log.error('_do_send:', exc_info=True) + traceback.print_exc() + self.disconnect() + + class NBProxySocket(NonBlockingTCP): ''' @@ -663,7 +707,6 @@ class NBHTTPProxySocket(NBProxySocket): self.after_proxy_connect() #self.onreceive(self._on_proxy_auth) - # FIXME: find out what it this method for def _on_proxy_auth(self, reply): if self.reply.find('\n\n') == -1: if reply is None: diff --git a/src/config.py b/src/config.py index c678599e8..98b24a0c2 100644 --- a/src/config.py +++ b/src/config.py @@ -1101,9 +1101,30 @@ class ManageProxiesWindow: self.proxies_treeview = self.xml.get_widget('proxies_treeview') self.proxyname_entry = self.xml.get_widget('proxyname_entry') self.proxytype_combobox = self.xml.get_widget('proxytype_combobox') + self.init_list() self.xml.signal_autoconnect(self) self.window.show_all() + # hide the BOSH fields by default + self.show_bosh_fields() + + def show_bosh_fields(self, show=True): + if show: + self.xml.get_widget('boshuri_entry').show() + self.xml.get_widget('boshport_entry').show() + self.xml.get_widget('boshuri_label').show() + self.xml.get_widget('boshport_label').show() + self.xml.get_widget('boshuseproxy_checkbutton').show() + else: + cb = self.xml.get_widget('boshuseproxy_checkbutton') + cb.hide() + cb.set_active(True) + self.on_boshuseproxy_checkbutton_toggled(cb) + self.xml.get_widget('boshuri_entry').hide() + self.xml.get_widget('boshport_entry').hide() + self.xml.get_widget('boshuri_label').hide() + self.xml.get_widget('boshport_label').hide() + def fill_proxies_treeview(self): model = self.proxies_treeview.get_model() @@ -1158,9 +1179,18 @@ class ManageProxiesWindow: def on_useauth_checkbutton_toggled(self, widget): act = widget.get_active() + proxy = self.proxyname_entry.get_text().decode('utf-8') + gajim.config.set_per('proxies', proxy, 'useauth', act) self.xml.get_widget('proxyuser_entry').set_sensitive(act) self.xml.get_widget('proxypass_entry').set_sensitive(act) + def on_boshuseproxy_checkbutton_toggled(self, widget): + act = widget.get_active() + proxy = self.proxyname_entry.get_text().decode('utf-8') + gajim.config.set_per('proxies', proxy, 'bosh_useproxy', act) + self.xml.get_widget('proxyhost_entry').set_sensitive(act) + self.xml.get_widget('proxyport_entry').set_sensitive(act) + def on_proxies_treeview_cursor_changed(self, widget): #FIXME: check if off proxy settings are correct (see # http://trac.gajim.org/changeset/1921#file2 line 1221 @@ -1173,19 +1203,33 @@ class ManageProxiesWindow: proxyport_entry = self.xml.get_widget('proxyport_entry') proxyuser_entry = self.xml.get_widget('proxyuser_entry') proxypass_entry = self.xml.get_widget('proxypass_entry') + boshuri_entry = self.xml.get_widget('boshuri_entry') + boshport_entry = self.xml.get_widget('boshport_entry') useauth_checkbutton = self.xml.get_widget('useauth_checkbutton') + boshuseproxy_checkbutton = self.xml.get_widget('boshuseproxy_checkbutton') proxyhost_entry.set_text('') proxyport_entry.set_text('') proxyuser_entry.set_text('') proxypass_entry.set_text('') - useauth_checkbutton.set_active(False) - self.on_useauth_checkbutton_toggled(useauth_checkbutton) + boshuri_entry.set_text('') + + #boshuseproxy_checkbutton.set_active(False) + #self.on_boshuseproxy_checkbutton_toggled(boshuseproxy_checkbutton) + + #useauth_checkbutton.set_active(False) + #self.on_useauth_checkbutton_toggled(useauth_checkbutton) + if proxy == _('None'): # special proxy None + self.show_bosh_fields(False) self.proxyname_entry.set_editable(False) self.xml.get_widget('remove_proxy_button').set_sensitive(False) self.xml.get_widget('proxytype_combobox').set_sensitive(False) self.xml.get_widget('proxy_table').set_sensitive(False) else: + proxytype = gajim.config.get_per('proxies', proxy, 'type') + + self.show_bosh_fields(proxytype=='bosh') + self.proxyname_entry.set_editable(True) self.xml.get_widget('remove_proxy_button').set_sensitive(True) self.xml.get_widget('proxytype_combobox').set_sensitive(True) @@ -1198,11 +1242,16 @@ class ManageProxiesWindow: 'user')) proxypass_entry.set_text(gajim.config.get_per('proxies', proxy, 'pass')) - proxytype = gajim.config.get_per('proxies', proxy, 'type') + boshuri_entry.set_text(gajim.config.get_per('proxies', proxy, + 'bosh_uri')) + boshport_entry.set_text(unicode(gajim.config.get_per('proxies', proxy, + 'bosh_port'))) types = ['http', 'socks5', 'bosh'] self.proxytype_combobox.set_active(types.index(proxytype)) - if gajim.config.get_per('proxies', proxy, 'user'): - useauth_checkbutton.set_active(True) + boshuseproxy_checkbutton.set_active( + gajim.config.get_per('proxies', proxy, 'bosh_useproxy')) + useauth_checkbutton.set_active( + gajim.config.get_per('proxies', proxy, 'useauth')) def on_proxies_treeview_key_press_event(self, widget, event): if event.keyval == gtk.keysyms.Delete: @@ -1229,6 +1278,7 @@ class ManageProxiesWindow: def on_proxytype_combobox_changed(self, widget): types = ['http', 'socks5', 'bosh'] type_ = self.proxytype_combobox.get_active() + self.show_bosh_fields(types[type_]=='bosh') proxy = self.proxyname_entry.get_text().decode('utf-8') gajim.config.set_per('proxies', proxy, 'type', types[type_]) @@ -1247,6 +1297,16 @@ class ManageProxiesWindow: proxy = self.proxyname_entry.get_text().decode('utf-8') gajim.config.set_per('proxies', proxy, 'user', value) + def on_boshuri_entry_changed(self, widget): + value = widget.get_text().decode('utf-8') + proxy = self.proxyname_entry.get_text().decode('utf-8') + gajim.config.set_per('proxies', proxy, 'bosh_uri', value) + + def on_boshport_entry_changed(self, widget): + value = widget.get_text().decode('utf-8') + proxy = self.proxyname_entry.get_text().decode('utf-8') + gajim.config.set_per('proxies', proxy, 'bosh_port', value) + def on_proxypass_entry_changed(self, widget): value = widget.get_text().decode('utf-8') proxy = self.proxyname_entry.get_text().decode('utf-8') diff --git a/src/gajim.py b/src/gajim.py index fcb069016..748810a3b 100755 --- a/src/gajim.py +++ b/src/gajim.py @@ -50,7 +50,7 @@ import logging consoleloghandler = logging.StreamHandler() consoleloghandler.setLevel(1) consoleloghandler.setFormatter( - logging.Formatter('%(name)s: %(levelname)s: %(message)s') + logging.Formatter('%(asctime)s %(name)s: %(levelname)s: %(message)s') ) log = logging.getLogger('gajim') log.setLevel(logging.WARNING)