diff --git a/src/common/connection.py b/src/common/connection.py index 6972f2892..b58464725 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -35,7 +35,6 @@ import os import time import sre import traceback -import threading import select import socket import random @@ -58,19 +57,6 @@ USE_GPG = GnuPG.USE_GPG from common import i18n _ = i18n._ -# determine which DNS resolution library is available -HAS_DNSPYTHON = False -HAS_PYDNS = False -try: - import dns.resolver # http://dnspython.org/ - HAS_DNSPYTHON = True -except ImportError: - try: - import DNS # http://pydns.sf.net/ - HAS_PYDNS = True - except ImportError: - gajim.log.debug("Could not load one of the supported DNS libraries (dnspython or pydns). SRV records will not be queried and you may need to set custom hostname/port for some servers to be accessible.") - HAS_IDLE = True try: import common.idle as idle # when we launch gajim from sources @@ -174,6 +160,8 @@ class Connection: self.vcard_shas = {} # sha of contacts self.status = '' self.old_show = '' + # increase/decrease default timeout for server responses + self.try_connecting_for_foo_secs = 45 # holds the actual hostname to which we are connected self.connected_hostname = None self.time_to_reconnect = None @@ -181,12 +169,15 @@ class Connection: self.bookmarks = [] self.on_purpose = False self.last_io = time.time() - self.to_be_sent = [] self.last_sent = [] self.files_props = {} self.last_history_line = {} self.password = gajim.config.get_per('accounts', name, 'password') self.server_resource = gajim.config.get_per('accounts', name, 'resource') + if gajim.config.get_per('accounts', self.name, 'keep_alives_enabled'): + self.keepalives = gajim.config.get_per('accounts', self.name,'keep_alive_every_foo_secs') + else: + self.keepalives = 0 self.privacy_rules_supported = False # Do we continue connection when we get roster (send presence,get vcard...) 
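# Hedged sketch of the keepalive wiring that this hunk prepares: __init__ now caches the
# interval once (0 meaning "disabled"), and later in the patch (_init_roster) that interval
# plus a callback are handed to the connection's send timeout, whose job is to emit a single
# space when nothing has been written for that long.  FakeConnection and plug_keepalive are
# illustrative stand-ins, not Gajim or xmpppy API; only the overall flow mirrors the patch.
class FakeConnection(object):
    def __init__(self):
        self.sent = []
        self._timeout = None
    def set_send_timeout(self, seconds, callback):
        # the real non-blocking transport arms an idlequeue timer here
        self._timeout = (seconds, callback)
    def send(self, data):
        self.sent.append(data)

def plug_keepalive(connection, keep_alives_enabled, every_foo_secs):
    if keep_alives_enabled:
        keepalives = every_foo_secs
    else:
        keepalives = 0          # 0 disables keepalives entirely
    if keepalives:
        connection.set_send_timeout(keepalives, lambda: connection.send(' '))
    return keepalives

con = FakeConnection()
assert plug_keepalive(con, True, 55) == 55
con._timeout[1]()               # simulate the idle timer firing
assert con.sent == [' ']        # a lone space keeps the stream (and NATs) alive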
self.continue_connect_info = None @@ -205,7 +196,7 @@ class Connection: idle.init() except: HAS_IDLE = False - + self.on_connect_success = None self.retrycount = 0 # END __init__ @@ -219,13 +210,12 @@ class Connection: return gajim.get_jid_without_resource(jid) def put_event(self, ev): - if gajim.events_for_ui.has_key(self.name): - gajim.events_for_ui[self.name].append(ev) + if gajim.handlers.has_key(ev[0]): + gajim.handlers[ev[0]](self.name, ev[1]) def dispatch(self, event, data): '''always passes account name as first param''' - gajim.mutex_events_for_ui.lock(self.put_event, [event, data]) - gajim.mutex_events_for_ui.unlock() + self.put_event((event, data)) def add_sha(self, p): c = p.setTag('x', namespace = common.xmpp.NS_VCARD_UPDATE) @@ -240,7 +230,7 @@ class Connection: iq = common.xmpp.Iq(typ = 'get', to = jid, queryNS = ns) if node: iq.setQuerynode(node) - self.to_be_sent.append(iq) + self.connection.send(iq) def discoverItems(self, jid, node = None): '''According to JEP-0030: jid is mandatory, @@ -338,7 +328,7 @@ class Connection: p = common.xmpp.Presence(typ = None, priority = prio, show = sshow, status = self.status) p = self.add_sha(p) - self.to_be_sent.append(p) + self.connection.send(p) else: self.dispatch('VCARD', vcard) @@ -354,7 +344,7 @@ class Connection: iq.setAttr('id', '13') query = iq.setTag('query') query.setNamespace(common.xmpp.NS_GMAILNOTIFY) - self.to_be_sent.append(iq) + self.connection.send(iq) raise common.xmpp.NodeProcessed def _gMailQueryCB(self, con, gm): @@ -553,7 +543,7 @@ class Connection: if self.connection: p = common.xmpp.Presence(who, 'subscribed') p = self.add_sha(p) - self.to_be_sent.append(p) + self.connection.send(p) if who.find("@") <= 0: self.dispatch('NOTIFY', (jid_stripped, 'offline', 'offline', resource, prio, keyID)) @@ -614,14 +604,12 @@ class Connection: def _reconnect(self): # Do not try to reco while we are already trying self.time_to_reconnect = None - t = threading.Thread(target=self._reconnect2) - t.start() - - def _reconnect2(self): gajim.log.debug('reconnect') self.retrycount += 1 signed = self.get_signed_msg(self.status) + self.on_connect_auth = self._init_roster self.connect_and_init(self.old_show, self.status, signed) + if self.connected < 2: #connection failed if self.retrycount > 10: self.connected = 0 @@ -632,11 +620,12 @@ class Connection: self.retrycount = 0 return if self.retrycount > 5: - self.time_to_reconnect = time.time() + 20 + self.time_to_reconnect = 20 else: - self.time_to_reconnect = time.time() + 10 + self.time_to_reconnect = 10 + gajim.idlequeue.set_alarm(self._reconnect_alarm, self.time_to_reconnect) else: - #reconnect succeeded + # reconnect succeeded self.time_to_reconnect = None self.retrycount = 0 @@ -653,7 +642,8 @@ class Connection: if gajim.config.get_per('accounts', self.name, 'autoreconnect'): self.connected = 1 self.dispatch('STATUS', 'connecting') - self.time_to_reconnect = time.time() + 10 + self.time_to_reconnect = 10 + gajim.idlequeue.set_alarm(self._reconnect_alarm, 10) else: self.dispatch('ERROR', (_('Connection with account "%s" has been lost') % self.name, @@ -731,7 +721,7 @@ class Connection: query.setNamespace(common.xmpp.NS_BYTESTREAM) stream_tag = query.setTag('streamhost-used') stream_tag.setAttr('jid', streamhost['jid']) - self.to_be_sent.append(iq) + self.connection.send(iq) def _connect_error(self, to, _id, sid, code = 404): msg_dict = { @@ -747,7 +737,7 @@ class Connection: err = iq.setTag('error') err.setAttr('code', unicode(code)) err.setData(msg) - self.to_be_sent.append(iq) + 
self.connection.send(iq) if code == 404: file_props = gajim.socks5queue.get_file_props(self.name, sid) if file_props is not None: @@ -890,7 +880,7 @@ class Connection: activate = query.setTag('activate') activate.setData(file_props['proxy_receiver']) iq.setID(auth_id) - self.to_be_sent.append(iq) + self.connection.send(iq) def _discoGetCB(self, con, iq_obj): ''' get disco info ''' @@ -914,7 +904,7 @@ class Connection: feature.setAttr('var', common.xmpp.NS_FILE) query.addChild(node=feature) - self.to_be_sent.append(iq) + self.connection.send(iq) raise common.xmpp.NodeProcessed def _siResultCB(self, con, iq_obj): @@ -973,7 +963,7 @@ class Connection: query.setNamespace(common.xmpp.NS_BYTESTREAM) # FIXME bad logic - this should be somewhere else! # this line should be put somewhere else - # self.to_be_sent.append(iq) + # self.connection.send(iq) # ensure that we don;t return empty vars if None not in (host, port, jid) or '' not in (host, port, jid): return (host, port, jid) @@ -1051,7 +1041,7 @@ class Connection: # don't add the proxy child tag for streamhosts, which are proxies # proxy = streamhost.setTag('proxy') # proxy.setNamespace(common.xmpp.NS_STREAM) - self.to_be_sent.append(iq) + self.connection.send(iq) def _siSetCB(self, con, iq_obj): gajim.log.debug('_siSetCB') @@ -1112,7 +1102,7 @@ class Connection: err = common.xmpp.ErrorNode(code = '406', typ = 'auth', name = 'not-acceptable') iq.addChild(node=err) - self.to_be_sent.append(iq) + self.connection.send(iq) def send_file_approval(self, file_props): ''' comfirm that we want to download the file ''' @@ -1132,8 +1122,8 @@ class Connection: feature.addChild(node=_feature) field = _feature.setField('stream-method') field.delAttr('type') - field.setValue('http://jabber.org/protocol/bytestreams') - self.to_be_sent.append(iq) + field.setValue(common.xmpp.NS_BYTESTREAM) + self.connection.send(iq) def send_file_request(self, file_props): our_jid = gajim.get_jid_from_account(self.name) @@ -1164,7 +1154,7 @@ class Connection: field = _feature.setField('stream-method') field.setAttr('type', 'list-single') field.addOption(common.xmpp.NS_BYTESTREAM) - self.to_be_sent.append(iq) + self.connection.send(iq) def _rosterSetCB(self, con, iq_obj): gajim.log.debug('rosterSetCB') @@ -1247,7 +1237,7 @@ class Connection: send_os = gajim.config.get('send_os_info') if send_os: qp.setTagData('os', get_os_info()) - self.to_be_sent.append(iq_obj) + self.connection.send(iq_obj) raise common.xmpp.NodeProcessed def _IdleCB(self, con, iq_obj): @@ -1259,7 +1249,7 @@ class Connection: else: qp.attrs['seconds'] = idle.getIdleSec() - self.to_be_sent.append(iq_obj) + self.connection.send(iq_obj) raise common.xmpp.NodeProcessed def _VersionResultCB(self, con, iq_obj): @@ -1377,8 +1367,10 @@ class Connection: def _getRosterCB(self, con, iq_obj): if not self.connection: return - raw_roster = self.connection.getRoster().getRaw() - + self.connection.getRoster(self._on_roster_set) + + def _on_roster_set(self, roster): + raw_roster = roster.getRaw() roster = {} for jid in raw_roster: try: @@ -1438,13 +1430,13 @@ class Connection: query.setNamespace(common.xmpp.NS_GTALKSETTING) query = query.setTag('mailnotifications') query.setAttr('value', 'true') - self.to_be_sent.append(iq) + self.connection.send(iq) # Ask how many messages there are now iq = common.xmpp.Iq(typ = 'get') iq.setAttr('id', '13') query = iq.setTag('query') query.setNamespace(common.xmpp.NS_GMAILNOTIFY) - self.to_be_sent.append(iq) + self.connection.send(iq) #Inform GUI we just signed in 
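# Hedged sketch of the pattern applied throughout this hunk, most visibly in the roster
# change above: a call that used to block until the server answered is split into "send the
# request now" and "run this callback when the reply arrives", so the GUI thread never waits.
# ToyClient and its method names are invented for illustration only.
class ToyClient(object):
    def __init__(self):
        self._pending = []
    def get_roster(self, on_roster):
        # non-blocking: remember the callback and return immediately
        self._pending.append(on_roster)
    def _deliver(self, roster):
        # called by the read loop once the <iq type='result'/> has been parsed
        for callback in self._pending:
            callback(roster)
        self._pending = []

received = []
client = ToyClient()
client.get_roster(received.append)     # returns at once
client._deliver({'friend@example.org': {'name': 'Friend', 'groups': []}})
assert received and 'friend@example.org' in received[0]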
self.dispatch('SIGNED_IN', ()) @@ -1516,7 +1508,7 @@ class Connection: elif answer == 'no': iq = iq_obj.buildReply('error') iq.setError('not-authorized', 401) - self.to_be_sent.append(iq) + self.connection.send(iq) def _HttpAuthCB(self, con, iq_obj): gajim.log.debug('HttpAuthCB') @@ -1581,7 +1573,7 @@ class Connection: p = common.xmpp.Presence(typ = None, priority = prio, show = sshow, status = self.status) p = self.add_sha(p) - self.to_be_sent.append(p) + self.connection.send(p) elif iq_obj.getType() == 'error': self.dispatch('VCARD_NOT_PUBLISHED', ()) elif self.awaiting_answers[id][0] == VCARD_ARRIVED: @@ -1601,7 +1593,7 @@ class Connection: def _event_dispatcher(self, realm, event, data): if realm == common.xmpp.NS_REGISTER: - if event == common.xmpp.features.REGISTER_DATA_RECEIVED: + if event == common.xmpp.features_nb.REGISTER_DATA_RECEIVED: # data is (agent, DataFrom, is_form) if self.new_account_info and\ self.new_account_info['hostname'] == data[0]: @@ -1609,22 +1601,23 @@ class Connection: req = data[1].asDict() req['username'] = self.new_account_info['name'] req['password'] = self.new_account_info['password'] - if not common.xmpp.features.register(self.connection, data[0], - req): - self.dispatch('ACC_NOT_OK', (self.connection.lastErr)) - return - self.connected = 0 - self.password = self.new_account_info['password'] - if USE_GPG: - self.gpg = GnuPG.GnuPG() - gajim.config.set('usegpg', True) - else: - gajim.config.set('usegpg', False) - gajim.connections[self.name] = self - self.dispatch('ACC_OK', (self.new_account_info)) - self.new_account_info = None - self.connection = None - return + def _on_register_result(result): + if not result: + self.dispatch('ACC_NOT_OK', (self.connection.lastErr)) + return + self.connected = 0 + self.password = self.new_account_info['password'] + if USE_GPG: + self.gpg = GnuPG.GnuPG() + gajim.config.set('usegpg', True) + else: + gajim.config.set('usegpg', False) + gajim.connections[self.name] = self + self.dispatch('ACC_OK', (self.new_account_info)) + self.new_account_info = None + self.connection = None + common.xmpp.features_nb.register(self.connection, data[0], + req, _on_register_result) is_form = data[2] if is_form: conf = self.parse_data_form(data[1]) @@ -1661,7 +1654,7 @@ class Connection: return h def connect(self, data = None): - '''Connect and authenticate to the Jabber server + ''' Start a connection to the Jabber server. 
Returns connection, and connection type ('tls', 'ssl', 'tcp', '') data MUST contain name, hostname, resource, usessl, proxy, use_custom_host, custom_host (if use_custom_host), custom_port (if @@ -1674,7 +1667,7 @@ class Connection: hostname = data['hostname'] resource = data['resource'] usessl = data['usessl'] - try_connecting_for_foo_secs = 45 + self.try_connecting_for_foo_secs = 45 p = data['proxy'] use_srv = True use_custom = data['use_custom_host'] @@ -1686,7 +1679,7 @@ class Connection: hostname = gajim.config.get_per('accounts', self.name, 'hostname') resource = gajim.config.get_per('accounts', self.name, 'resource') usessl = gajim.config.get_per('accounts', self.name, 'usessl') - try_connecting_for_foo_secs = gajim.config.get_per('accounts', + self.try_connecting_for_foo_secs = gajim.config.get_per('accounts', self.name, 'try_connecting_for_foo_secs') p = gajim.config.get_per('accounts', self.name, 'proxy') use_srv = gajim.config.get_per('accounts', self.name, 'use_srv') @@ -1720,85 +1713,71 @@ class Connection: hosts = [] # SRV resolver - if use_srv and (HAS_DNSPYTHON or HAS_PYDNS): - # query should not be Unicode instance or dnspython TBs! - query = '_xmpp-client._tcp.' + h.encode('utf-8') - try: - if HAS_DNSPYTHON: - try: - answers = [x for x in dns.resolver.query(query, 'SRV')] - if answers: - for a in answers: - target = dns.name.from_text(str(a.target)) - # target is f.e. talk.google.com. remove last dot - target = target.to_text(omit_final_dot = True) - hosts.append({'host': target, - 'port': int(a.port), - 'prio': int(a.priority), - 'weight': int(a.weight)}) - except: - pass - elif HAS_PYDNS: - # ensure we haven't cached an old configuration - DNS.ParseResolvConf() - response = DNS.Request().req(query, qtype = 'SRV') - answers = response.answers - if len(answers) > 0: - # ignore the priority and weight for now - for a in answers: - prio, weight, port, host = a['data'] - hosts.append({'host': host, - 'port': port, - 'prio': prio, - 'weight': weight}) - except: - gajim.log.debug('An error occurred while looking up %s:' % query) - try: - traceback.print_exc() - except IOError: - pass - # end of SRV resolver - - if len(hosts) == 0: # SRV fails or misconfigred on the server - hosts = [ {'host': h, 'port': p, 'prio': 10, 'weight': 10} ] - - con_type = None - while len(hosts) and not con_type: + self._proxy = proxy + self._secure = secur + self._hosts = [ {'host': h, 'port': p, 'prio': 10, 'weight': 10} ] + self._hostname = hostname + if use_srv: + # add request for srv query to the resolve, on result '_on_resolve' will be called + gajim.resolver.resolve('_xmpp-client._tcp.' 
+ h.encode('utf-8'), self._on_resolve) + else: + self._on_resolve('', []) + + def _on_resolve(self, host, result_array): + # SRV query returned at least one valid result, we put it in hosts dict + if len(result_array) != 0: + self._hosts = [i for i in result_array] + self.connect_to_next_host() + + def connect_to_next_host(self): + if len(self._hosts): if gajim.verbose: - con = common.xmpp.Client(hostname, caller = self) + con = common.xmpp.NonBlockingClient(self._hostname, caller = self, + on_connect = self.on_connect_success, + on_connect_failure = self.connect_to_next_host) else: - con = common.xmpp.Client(hostname, debug = [], caller = self) - common.xmpp.dispatcher.DefaultTimeout = try_connecting_for_foo_secs - con.UnregisterDisconnectHandler(con.DisconnectHandler) + con = common.xmpp.NonBlockingClient(self._hostname, debug = [], caller = self, + on_connect = self.on_connect_success, + on_connect_failure = self.connect_to_next_host) + # increase default timeout for server responses + common.xmpp.dispatcher_nb.DEFAULT_TIMEOUT_SECONDS = self.try_connecting_for_foo_secs + con.set_idlequeue(gajim.idlequeue) + host = self.select_next_host(self._hosts) + self._current_host = host + self._hosts.remove(host) + res = con.connect((host['host'], host['port']), proxy = self._proxy, + secure = self._secure) + return + else: + self._connect_failure(None) - host = self.select_next_host(hosts) - hosts.remove(host) - con_type = con.connect((host['host'], host['port']), proxy = proxy, - secure = secur) - if not self.connected: # We went offline during connecting process - return None, '' - if not con_type: - gajim.log.debug('Could not connect to %s:%s' % (host['host'], - host['port'])) + def _connect_failure(self, con_type): if not con_type: - if not self.retrycount: + # we are not retrying, and not conecting + if not self.retrycount and self.connected != 0: self.connected = 0 self.dispatch('STATUS', 'offline') - self.dispatch('ERROR', (_('Could not connect to "%s"') % h, + self.dispatch('ERROR', (_('Could not connect to "%s"') % self._hostname, _('Check your connection or try again later.'))) + + def _connect_success(self, con, con_type): + if not self.connected: # We went offline during connecting process return None, '' - self.connected_hostname = host['host'] - con.RegisterDisconnectHandler(self._disconnectedReconnCB) - gajim.log.debug(_('Connected to server %s:%s with %s') % (host['host'], - host['port'], con_type)) - return con, con_type - - def connect_and_auth(self): - con, con_type = self.connect() + self.hosts = [] if not con_type: - return None - self.peerhost = con.get_peerhost() + gajim.log.debug('Could not connect to %s:%s' % (self._current_host['host'], + self._current_host['port'])) + self.connected_hostname = self._current_host['host'] + con.RegisterDisconnectHandler(self._disconnectedReconnCB) + gajim.log.debug(_('Connected to server %s:%s with %s') % (self._current_host['host'], + self._current_host['port'], con_type)) + # Ask meta_contacts before roster + self.get_meta_contacts() + self._register_handlers(con, con_type) + return True + def _register_handlers(self, con, con_type): + self.peerhost = con.get_peerhost() # notify the gui about con_type self.dispatch('CON_TYPE', con_type) @@ -1861,33 +1840,44 @@ class Connection: name = gajim.config.get_per('accounts', self.name, 'name') hostname = gajim.config.get_per('accounts', self.name, 'hostname') resource = gajim.config.get_per('accounts', self.name, 'resource') - - try: - auth = con.auth(name, self.password, resource, 1) - except 
IOError: #probably a timeout + self.connection = con + con.auth(name, self.password, resource, 1, self.__on_auth) + + def __on_auth(self, con, auth): + if not con: self.connected = 0 self.dispatch('STATUS', 'offline') - self.dispatch('ERROR', (_('Could not connect to "%s"') % hostname, + self.dispatch('ERROR', (_('Could not connect to "%s"') % self._hostname, _('Check your connection or try again later'))) - return None + if self.on_connect_auth: + self.on_connect_auth(None) + self.on_connect_auth = None + return if not self.connected: # We went offline during connecting process - return None + if self.on_connect_auth: + self.on_connect_auth(None) + self.on_connect_auth = None + return if hasattr(con, 'Resource'): self.server_resource = con.Resource if auth: self.last_io = time.time() self.connected = 2 - return con # return connection + if self.on_connect_auth: + self.on_connect_auth(con) + self.on_connect_auth = None else: # Forget password if needed if not gajim.config.get_per('accounts', self.name, 'savepass'): self.password = None - gajim.log.debug("Couldn't authenticate to %s" % hostname) + gajim.log.debug("Couldn't authenticate to %s" % self._hostname) self.connected = 0 self.dispatch('STATUS', 'offline') - self.dispatch('ERROR', (_('Authentication failed with "%s"') % hostname, + self.dispatch('ERROR', (_('Authentication failed with "%s"') % self._hostname, _('Please check your login and password for correctness.'))) - return None + if self.on_connect_auth: + self.on_connect_auth(None) + self.on_connect_auth = None # END connect def quit(self, kill_core): @@ -1895,6 +1885,7 @@ class Connection: if self.connected > 1: self.connected = 0 self.connection.disconnect() + self.time_to_reconnect = None return def build_privacy_rule(self, name, action): @@ -1974,20 +1965,25 @@ class Connection: self.dispatch('BAD_PASSPHRASE', ()) return signed + def connect_and_auth(self): + self.on_connect_success = self._connect_success + self.connect() + def connect_and_init(self, show, msg, signed): self.continue_connect_info = [show, msg, signed] - self.connection = self.connect_and_auth() + self.on_connect_auth = self._init_roster + self.connect_and_auth() + + def _init_roster(self, con): + self.connection = con if self.connection: + con.set_send_timeout(self.keepalives, self.send_keepalive) + self.connection.onreceive(None) # Ask meta_contacts before roster self.get_meta_contacts() - + def change_status(self, show, msg, sync = False, auto = False): - if sync: - self.change_status2(show, msg, auto) - else: - t = threading.Thread(target=self.change_status2, args = (show, msg, - auto)) - t.start() + self.change_status2(show, msg, auto) def change_status2(self, show, msg, auto = False): if not show in STATUS_LIST: @@ -2018,14 +2014,12 @@ class Connection: if msg: p.setStatus(msg) self.remove_all_transfers() - if self.connection: - self.connection.send(p) - try: - self.connection.disconnect() - except: - pass - self.dispatch('STATUS', 'offline') - self.connection = None + self.time_to_reconnect = None + self.connection.start_disconnect(p, self._on_disconnected) + else: + self.time_to_reconnect = None + self._on_disconnected() + elif show != 'offline' and self.connected: was_invisible = self.connected == STATUS_LIST.index('invisible') self.connected = STATUS_LIST.index(show) @@ -2047,6 +2041,11 @@ class Connection: if self.connection: self.connection.send(p) self.dispatch('STATUS', show) + + def _on_disconnected(self): + ''' called when a disconnect request has completed successfully''' + 
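# Hedged sketch of the callback chain this version of the connect code builds: resolve SRV,
# try one host, fall through to the next host on failure, authenticate on success, and hand
# the live connection to on_connect_auth (here _init_roster).  The hosts and the
# try_host/auth helpers are invented; only the control flow mirrors the patch.
def connect_chain(hosts, try_host, auth, on_ready, on_failure):
    remaining = list(hosts)

    def connect_to_next_host():
        if not remaining:
            on_failure()
            return
        host = remaining.pop(0)
        # on_connect / on_connect_failure, as NonBlockingClient takes them
        try_host(host, on_connect=on_connected,
                 on_connect_failure=connect_to_next_host)

    def on_connected(connection):
        auth(connection, on_auth=on_ready)

    connect_to_next_host()

# toy drivers: the first host is down, the second one works
def try_host(host, on_connect, on_connect_failure):
    if host == 'backup.example.org':
        on_connect('connection-to-%s' % host)
    else:
        on_connect_failure()

def auth(connection, on_auth):
    on_auth(connection)              # pretend authentication succeeded

result = []
connect_chain(['dead.example.org', 'backup.example.org'],
              try_host, auth, result.append, lambda: result.append(None))
assert result == ['connection-to-backup.example.org']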
self.dispatch('STATUS', 'offline') + self.connection = None def get_status(self): return STATUS_LIST[self.connected] @@ -2055,7 +2054,7 @@ class Connection: if not self.connection: return msg_iq = common.xmpp.Message(to = jid, body = msg, subject = subject) - self.to_be_sent.append(msg_iq) + self.connection.send(msg_iq) def send_message(self, jid, msg, keyID, type = 'chat', subject='', chatstate = None): if not self.connection: @@ -2092,7 +2091,7 @@ class Connection: msg_iq.setTag(chatstate, {}, namespace = 'http://jabber.org/protocol/chatstates') - self.to_be_sent.append(msg_iq) + self.connection.send(msg_iq) no_log_for = gajim.config.get_per('accounts', self.name, 'no_log_for') ji = gajim.get_jid_without_resource(jid) if self.name not in no_log_for and ji not in no_log_for: @@ -2111,21 +2110,21 @@ class Connection: ''' send a stanza untouched ''' if not self.connection: return - self.to_be_sent.append(stanza) + self.connection.send(stanza) def ack_subscribed(self, jid): if not self.connection: return gajim.log.debug('ack\'ing subscription complete for %s' % jid) p = common.xmpp.Presence(jid, 'subscribe') - self.to_be_sent.append(p) + self.connection.send(p) def ack_unsubscribed(self, jid): if not self.connection: return gajim.log.debug('ack\'ing unsubscription complete for %s' % jid) p = common.xmpp.Presence(jid, 'unsubscribe') - self.to_be_sent.append(p) + self.connection.send(p) def request_subscription(self, jid, msg): if not self.connection: @@ -2136,21 +2135,21 @@ class Connection: if not msg: msg = _('I would like to add you to my roster.') p.setStatus(msg) - self.to_be_sent.append(p) + self.connection.send(p) def send_authorization(self, jid): if not self.connection: return p = common.xmpp.Presence(jid, 'subscribed') p = self.add_sha(p) - self.to_be_sent.append(p) + self.connection.send(p) def refuse_authorization(self, jid): if not self.connection: return p = common.xmpp.Presence(jid, 'unsubscribed') p = self.add_sha(p) - self.to_be_sent.append(p) + self.connection.send(p) def unsubscribe(self, jid, remove_auth = True): if not self.connection: @@ -2161,6 +2160,9 @@ class Connection: self.connection.getRoster().Unsubscribe(jid) def _continue_unsubscribe(self, con, iq_obj, agent): + if iq_obj.getTag('error'): + # error, probably not a real agent + return self.connection.getRoster().delItem(agent) def unsubscribe_agent(self, agent): @@ -2178,7 +2180,7 @@ class Connection: groups = groups) def _ReceivedRegInfo(self, con, resp, agent): - common.xmpp.features._ReceivedRegInfo(con, resp, agent) + common.xmpp.features_nb._ReceivedRegInfo(con, resp, agent) self._IqCB(con, resp) def request_register_agent_info(self, agent): @@ -2201,33 +2203,32 @@ class Connection: iq = common.xmpp.Iq('set', common.xmpp.NS_REGISTER, to = agent) query = iq.getTag('query') self.build_data_from_dict(query, info) - self.to_be_sent.append(iq) + self.connection.send(iq) else: - # FIXME: Blocking - common.xmpp.features.register(self.connection, agent, info) + # fixed: blocking + common.xmpp.features_nb.register(self.connection, agent, info, None) def new_account(self, name, config, sync = False): - if sync: - self.new_account2(name, config) - else: - t = threading.Thread(target=self.new_account2, args = (name, config)) - t.start() + self.new_account2(name, config) def new_account2(self, name, config): # If a connection already exist we cannot create a new account if self.connection: return - con, con_type = self.connect(config) + self._hostname = config['hostname'] + self.new_account_info = config + self.name = 
name + self.on_connect_success = self._on_new_account + self.connect(config) + + def _on_new_account(self,con, con_type): if not con_type: self.dispatch('ACC_NOT_OK', - (_('Could not connect to "%s"') % config['hostname'])) + (_('Could not connect to "%s"') % self._hostname)) return - - self.new_account_info = config self.connection = con - self.name = name - common.xmpp.features.getRegInfo(con, config['hostname']) - + common.xmpp.features_nb.getRegInfo(con, self._hostname) + def account_changed(self, new_name): self.name = new_name @@ -2239,7 +2240,7 @@ class Connection: to_whom_jid += '/' + resource iq = common.xmpp.Iq(to=to_whom_jid, typ = 'get', queryNS =\ common.xmpp.NS_VERSION) - self.to_be_sent.append(iq) + self.connection.send(iq) def get_cached_vcard(self, fjid): '''return the vcard as a dict @@ -2280,7 +2281,7 @@ class Connection: id = self.connection.getAnID() iq.setID(id) self.awaiting_answers[id] = (VCARD_ARRIVED, jid) - self.to_be_sent.append(iq) + self.connection.send(iq) #('VCARD', {entry1: data, entry2: {entry21: data, ...}, ...}) def send_vcard(self, vcard): @@ -2305,7 +2306,7 @@ class Connection: id = self.connection.getAnID() iq.setID(id) - self.to_be_sent.append(iq) + self.connection.send(iq) # Add the sha of the avatar if vcard.has_key('PHOTO') and isinstance(vcard['PHOTO'], dict) and \ @@ -2324,7 +2325,7 @@ class Connection: iq = common.xmpp.Iq(typ='get') iq2 = iq.addChild(name='query', namespace='jabber:iq:private') iq3 = iq2.addChild(name='gajim', namespace='gajim:prefs') - self.to_be_sent.append(iq) + self.connection.send(iq) def get_bookmarks(self): '''Get Bookmarks from storage as described in JEP 0048''' @@ -2334,7 +2335,7 @@ class Connection: iq = common.xmpp.Iq(typ='get') iq2 = iq.addChild(name='query', namespace='jabber:iq:private') iq2.addChild(name='storage', namespace='storage:bookmarks') - self.to_be_sent.append(iq) + self.connection.send(iq) def store_bookmarks(self): ''' Send bookmarks to the storage namespace ''' @@ -2355,7 +2356,7 @@ class Connection: iq5 = iq4.setTagData('nick', bm['nick']) if bm['password']: iq5 = iq4.setTagData('password', bm['password']) - self.to_be_sent.append(iq) + self.connection.send(iq) def get_meta_contacts(self): '''Get meta_contacts list from storage as described in JEP 0049''' @@ -2364,7 +2365,7 @@ class Connection: iq = common.xmpp.Iq(typ='get') iq2 = iq.addChild(name='query', namespace='jabber:iq:private') iq2.addChild(name='gajim', namespace='gajim:metacontacts') - self.to_be_sent.append(iq) + self.connection.send(iq) def store_meta_contacts(self, children_list): ''' Send meta contacts to the storage namespace ''' @@ -2377,14 +2378,14 @@ class Connection: parent_tag = iq3.addChild(name='parent', attrs = {'name': parent_jid}) for child_jid in children_list[parent_jid]: parent_tag.addChild(name='child', attrs = {'name': child_jid}) - self.to_be_sent.append(iq) + self.connection.send(iq) def send_agent_status(self, agent, ptype): if not self.connection: return p = common.xmpp.Presence(to = agent, typ = ptype) p = self.add_sha(p) - self.to_be_sent.append(p) + self.connection.send(p) def join_gc(self, nick, room, server, password): if not self.connection: @@ -2397,7 +2398,7 @@ class Connection: t = p.setTag(common.xmpp.NS_MUC + ' x') if password: t.setTagData('password', password) - self.to_be_sent.append(p) + self.connection.send(p) #last date/time in history to avoid duplicate # FIXME: This JID needs to be normalized; see #1364 jid='%s@%s' % (room, server) @@ -2410,26 +2411,26 @@ class Connection: if not 
self.connection: return msg_iq = common.xmpp.Message(jid, msg, typ = 'groupchat') - self.to_be_sent.append(msg_iq) + self.connection.send(msg_iq) self.dispatch('MSGSENT', (jid, msg)) def send_gc_subject(self, jid, subject): if not self.connection: return msg_iq = common.xmpp.Message(jid,typ = 'groupchat', subject = subject) - self.to_be_sent.append(msg_iq) + self.connection.send(msg_iq) def request_gc_config(self, room_jid): iq = common.xmpp.Iq(typ = 'get', queryNS = common.xmpp.NS_MUC_OWNER, to = room_jid) - self.to_be_sent.append(iq) + self.connection.send(iq) def change_gc_nick(self, room_jid, nick): if not self.connection: return p = common.xmpp.Presence(to = '%s/%s' % (room_jid, nick)) p = self.add_sha(p) - self.to_be_sent.append(p) + self.connection.send(p) def send_gc_status(self, nick, jid, show, status): if not self.connection: @@ -2457,7 +2458,7 @@ class Connection: item.setAttr('role', role) if reason: item.addChild(name = 'reason', payload = reason) - self.to_be_sent.append(iq) + self.connection.send(iq) def gc_set_affiliation(self, room_jid, jid, affiliation, reason = ''): '''affiliation is for all the life of the room so it's based on jid''' @@ -2470,7 +2471,7 @@ class Connection: item.setAttr('affiliation', affiliation) if reason: item.addChild(name = 'reason', payload = reason) - self.to_be_sent.append(iq) + self.connection.send(iq) def send_gc_affiliation_list(self, room_jid, list): if not self.connection: @@ -2483,7 +2484,7 @@ class Connection: 'affiliation': list[jid]['affiliation']}) if list[jid].has_key('reason') and list[jid]['reason']: item_tag.setTagData('reason', list[jid]['reason']) - self.to_be_sent.append(iq) + self.connection.send(iq) def get_affiliation_list(self, room_jid, affiliation): if not self.connection: @@ -2492,7 +2493,7 @@ class Connection: common.xmpp.NS_MUC_ADMIN) item = iq.getTag('query').setTag('item') item.setAttr('affiliation', affiliation) - self.to_be_sent.append(iq) + self.connection.send(iq) def build_data_from_dict(self, query, config): x = query.setTag(common.xmpp.NS_DATA + ' x', attrs = {'type': 'submit'}) @@ -2522,7 +2523,7 @@ class Connection: common.xmpp.NS_MUC_OWNER) query = iq.getTag('query') self.build_data_from_dict(query, config) - self.to_be_sent.append(iq) + self.connection.send(iq) def gpg_passphrase(self, passphrase): if USE_GPG: @@ -2553,19 +2554,27 @@ class Connection: q = iq.setTag(common.xmpp.NS_REGISTER + ' query') q.setTagData('username',username) q.setTagData('password',password) - self.to_be_sent.append(iq) + self.connection.send(iq) - def unregister_account(self): + def unregister_account(self, on_remove_success): + # no need to write this as a class method and keep the value of on_remove_success + # as a class property as pass it as an argument + def _on_unregister_account_connect(con): + self.on_connect_auth = None + if self.connected > 1: + hostname = gajim.config.get_per('accounts', self.name, 'hostname') + iq = common.xmpp.Iq(typ = 'set', to = hostname) + q = iq.setTag(common.xmpp.NS_REGISTER + ' query').setTag('remove') + con.send(iq) + on_remove_success(True) + return + on_remove_success(False) if self.connected == 0: - self.connection = self.connect_and_auth() - if self.connected > 1: - hostname = gajim.config.get_per('accounts', self.name, 'hostname') - iq = common.xmpp.Iq(typ = 'set', to = hostname) - q = iq.setTag(common.xmpp.NS_REGISTER + ' query').setTag('remove') - self.connection.send(iq) - return True - return False - + self.on_connect_auth = _on_unregister_account_connect + self.connect_and_auth() + 
else: + _on_unregister_account_connect(self.connection) + def send_invite(self, room, to, reason=''): '''sends invitation''' message=common.xmpp.Message(to = room) @@ -2573,74 +2582,18 @@ class Connection: c = c.addChild(name = 'invite', attrs={'to' : to}) if reason != '': c.setTagData('reason', reason) - self.to_be_sent.append(message) + self.connection.send(message) def send_keepalive(self): # nothing received for the last foo seconds (60 secs by default) - self.to_be_sent.append(' ') - - def process(self, timeout): - # Check if a timeout append - if len(self.awaiting_timeouts): - first_tim = self.awaiting_timeouts.keys()[0] - if time.time() > first_tim: - self.dispatch('INFORMATION', (_('Timeout'), - self.awaiting_timeouts[first_tim][1])) - del self.awaiting_timeouts[first_tim] + if self.connection: + self.connection.send(' ') + def _reconnect_alarm(self): if self.time_to_reconnect: if self.connected < 2: - if time.time() > self.time_to_reconnect: - self._reconnect() + self._reconnect() else: self.time_to_reconnect = None - if not self.connection: - return - if self.connected: - now = time.time() - l = [] - for t in self.last_sent: - if (now - t) < 1: - l.append(t) - self.last_sent = l - t_limit = time.time() + timeout - while time.time() < t_limit and len(self.to_be_sent) and \ - len(self.last_sent) < gajim.config.get_per('accounts', - self.name, 'max_stanza_per_sec'): - tosend = self.to_be_sent.pop(0) - - self.connection.send(tosend) - t = time.time() - self.last_io = t - self.last_sent.append(t) - try: - # do we want keepalives? - if gajim.config.get_per('accounts', self.name, 'keep_alives_enabled'): - t = gajim.config.get_per('accounts', self.name, - 'keep_alive_every_foo_secs') - # should we send keepalive? - if time.time() > (self.last_io + t): - self.send_keepalive() - - if self.connection: - self.connection.Process(timeout) - except: - gajim.log.debug(_('A protocol error has occured:')) - try: - traceback.print_exc() - except IOError: - pass - self.connected = 0 - self.dispatch('STATUS', 'offline') - if not self.connection: - return - try: - self.connection.disconnect() - except: - gajim.log.debug(_('A protocol error has occured:')) - try: - traceback.print_exc() - except IOError: - pass - self.connection = None + # END Connection diff --git a/src/common/gajim.py b/src/common/gajim.py index 33ba4511f..3e0535404 100644 --- a/src/common/gajim.py +++ b/src/common/gajim.py @@ -26,7 +26,6 @@ import os import sys import logging -import mutex import config from contacts import Contacts @@ -111,10 +110,6 @@ sleeper_state = {} # whether we pass auto away / xa or not #'autoaway': autoaway and use sleeper #'autoxa': autoxa and use sleeper status_before_autoaway = {} -#queues of events from connections... -events_for_ui = {} -#... 
and its mutex -mutex_events_for_ui = mutex.mutex() SHOW_LIST = ['offline', 'connecting', 'online', 'chat', 'away', 'xa', 'dnd', 'invisible'] diff --git a/src/common/nslookup.py b/src/common/nslookup.py new file mode 100644 index 000000000..5c7ccc8af --- /dev/null +++ b/src/common/nslookup.py @@ -0,0 +1,317 @@ +## common/nslookup.py +## +## Contributors for this file: +## - Dimitur Kirov +## +## Copyright (C) 2003-2004 Yann Le Boulanger +## Vincent Hanquez +## Copyright (C) 2006 Yann Le Boulanger +## Vincent Hanquez +## Nikos Kouremenos +## Dimitur Kirov +## Travis Shirk +## Norman Rasmussen +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published +## by the Free Software Foundation; version 2 only. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## + +import sys, os, sre + +from xmpp.idlequeue import IdleObject, IdleQueue +if os.name == 'nt': + from subprocess import * +elif os.name == 'posix': + import fcntl +# it is good to check validity of arguments, when calling system commands +ns_type_pattern = sre.compile('^[a-z]+$') + +# match srv host_name +host_pattern = sre.compile('^[a-z0-9\-._]*[a-z0-9]\.[a-z]{2,}$') + +class Resolver: + def __init__(self, idlequeue): + self.idlequeue = idlequeue + # dict {host : list of srv records} + self.resolved_hosts = {} + # dict {host : list of callbacks} + self.handlers = {} + + def parse_srv_result(self, fqdn, result): + ''' parse the output of nslookup command and return list of + properties: 'host', 'port','weight', 'priority' corresponding to the found + srv hosts ''' + if os.name == 'nt': + return self._parse_srv_result_nt(fqdn, result) + elif os.name == 'posix': + return self._parse_srv_result_posix(fqdn, result) + + def _parse_srv_result_nt(self, fqdn, result): + # output from win32 nslookup command + if not result: + return [] + hosts = [] + lines = result.replace('\r','').split('\n') + current_host = None + for line in lines: + line = line.lstrip() + if line == '': + continue + if line.startswith(fqdn): + rest = line[len(fqdn):] + if rest.find('service') > -1: + current_host = {} + elif isinstance(current_host, dict): + res = line.strip().split('=') + if len(res) != 2: + if len(current_host) == 4: + hosts.append(current_host) + current_host = None + continue + prop_type = res[0].strip() + prop_value = res[1].strip() + if prop_type.find('prio') > -1: + try: + current_host['prio'] = int(prop_value) + except ValueError: + continue + elif prop_type.find('weight') > -1: + try: + current_host['weight'] = int(prop_value) + except ValueError: + continue + elif prop_type.find('port') > -1: + try: + current_host['port'] = int(prop_value) + except ValueError: + continue + elif prop_type.find('host') > -1: + current_host['host'] = prop_value + if len(current_host) == 4: + hosts.append(current_host) + current_host = None + return hosts + + def _parse_srv_result_posix(self, fqdn, result): + # typical output of bind-tools nslookup command + if not result: + return [] + hosts = [] + lines = result.split('\n') + for line in lines: + if line == '': + continue + if line.startswith(fqdn): + rest = line[len(fqdn):].split('=') + if len(rest) != 2: + continue + answer_type, props_str = rest + if answer_type.strip() != 'service': + continue + props = props_str.strip().split(' 
') + if len(props) < 4: + continue + prio, weight, port, host = props[-4:] + if host[-1] == '.': + host = host[:-1] + try: + prio = int(prio) + weight = int(weight) + port = int(port) + except ValueError: + continue + hosts.append({'host': host, 'port': port,'weight': weight, + 'prio': prio}) + return hosts + + def _on_ready(self, host, result): + # nslookup finished, parse the result and call the handlers + result_list = self.parse_srv_result(host, result) + + # practically it is impossible to be the opposite, but who knows :) + if not self.resolved_hosts.has_key(host): + self.resolved_hosts[host] = result_list + if self.handlers.has_key(host): + for callback in self.handlers[host]: + callback(host, result_list) + del(self.handlers[host]) + + def start_resolve(self, host): + ''' spawn new nslookup process and start waiting for results ''' + ns = NsLookup(self._on_ready, host) + ns.set_idlequeue(self.idlequeue) + ns.commandtimeout = 10 + ns.start() + + def resolve(self, host, on_ready): + if not host: + # empty host, return empty list of srv records + on_ready([]) + return + if self.resolved_hosts.has_key(host): + # host is already resolved, return cached values + on_ready(host, self.resolved_hosts[host]) + return + if self.handlers.has_key(host): + # host is about to be resolved by another connection, + # attach our callback + self.handlers[host].append(on_ready) + else: + # host has never been resolved, start now + self.handlers[host] = [on_ready] + self.start_resolve(host) + +class IdleCommand(IdleObject): + def __init__(self, on_result): + # how long (sec.) to wait for result ( 0 - forever ) + # it is a class var, instead of a constant and we can override it. + self.commandtimeout = 0 + # when we have some kind of result (valid, ot not) we call this handler + self.result_handler = on_result + # if it is True, we can safetely execute the command + self.canexecute = True + self.idlequeue = None + self.result ='' + + def set_idlequeue(self, idlequeue): + self.idlequeue = idlequeue + + def _return_result(self): + if self.result_handler: + self.result_handler(self.result) + self.result_handler = None + + def _compose_command_args(self): + return ['echo', 'da'] + + def _compose_command_line(self): + ''' return one line representation of command and its arguments ''' + return reduce(lambda left, right: left + ' ' + right, self._compose_command_args()) + + def wait_child(self): + if self.pipe.poll() is None: + # result timeout + if self.endtime < self.idlequeue.current_time(): + self._return_result() + self.pipe.stdout.close() + else: + # child is still active, continue to wait + self.idlequeue.set_alarm(self.wait_child, 0.1) + else: + # child has quit + self.result = self.pipe.stdout.read() + self._return_result() + self.pipe.stdout.close() + + def start(self): + if os.name == 'nt': + self._start_nt() + elif os.name == 'posix': + self._start_posix() + + def _start_nt(self): + if not self.canexecute: + self.result = '' + self._return_result() + return + self.pipe = Popen(self._compose_command_args(), stdout=PIPE, + bufsize = 1024, shell = True, stderr = STDOUT, stdin = None) + if self.commandtimeout >= 0: + self.endtime = self.idlequeue.current_time() + self.idlequeue.set_alarm(self.wait_child, 0.1) + + def _start_posix(self): + self.pipe = os.popen(self._compose_command_line()) + self.fd = self.pipe.fileno() + fcntl.fcntl(self.pipe, fcntl.F_SETFL, os.O_NONBLOCK) + self.idlequeue.plug_idle(self, False, True) + if self.commandtimeout >= 0: + self.idlequeue.set_read_timeout(self.fd, 
self.commandtimeout) + + def end(self): + self.idlequeue.unplug_idle(self.fd) + try: + self.pipe.close() + except: + pass + + def pollend(self): + self.idlequeue.remove_timeout(self.fd) + self.end() + self._return_result() + + def pollin(self): + try: + res = self.pipe.read() + except Exception, e: + res = '' + if res == '': + return self.pollend() + else: + self.result += res + + def read_timeout(self): + self.end() + self._return_result() + +class NsLookup(IdleCommand): + def __init__(self, on_result, host='_xmpp-client', type = 'srv'): + IdleCommand.__init__(self, on_result) + self.commandtimeout = 30 + self.host = host.lower() + self.type = type.lower() + if not host_pattern.match(self.host): + # invalid host name + # TODO: notify user about this. Maybe message to stderr will be enough + self.host = None + self.canexecute = False + return + if not ns_type_pattern.match(self.type): + self.type = None + self.host = None + self.canexecute = False + return + + def _compose_command_args(self): + return ['nslookup', '-type=' + self.type , self.host] + + def _return_result(self): + if self.result_handler: + self.result_handler(self.host, self.result) + self.result_handler = None + +if __name__ == '__main__': + if os.name != 'posix': + sys.exit() + # testing Resolver class + import gobject + import gtk + idlequeue = IdleQueue() + resolver = Resolver(idlequeue) + + def clicked(widget): + global resolver + host = text_view.get_text() + def on_result(host, result_array): + print 'Result:\n' + repr(result_array) + resolver.resolve(host, on_result) + win = gtk.Window() + win.set_border_width(6) + text_view = gtk.Entry() + text_view.set_text('_xmpp-client._tcp.jabber.org') + hbox = gtk.HBox() + hbox.set_spacing(3) + but = gtk.Button(' Lookup SRV ') + hbox.pack_start(text_view, 5) + hbox.pack_start(but, 0) + but.connect('clicked', clicked) + win.add(hbox) + win.show_all() + gobject.timeout_add(200, idlequeue.process) + gtk.main() diff --git a/src/common/socks5.py b/src/common/socks5.py index ff8e0a42c..6c0157c9b 100644 --- a/src/common/socks5.py +++ b/src/common/socks5.py @@ -817,7 +817,7 @@ class Socks5Receiver(Socks5): def connect(self): ''' create the socket and start the connect loop ''' self._sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.settimeout(50) + # this will not block the GUI self._sock.setblocking(False) self.state = 0 # about to be connected diff --git a/src/common/xmpp/__init__.py b/src/common/xmpp/__init__.py index ad03b2881..aa06c9c4a 100644 --- a/src/common/xmpp/__init__.py +++ b/src/common/xmpp/__init__.py @@ -26,6 +26,7 @@ and use only methods for access all values you should not have any problems. 
""" -import simplexml,protocol,debug,auth,transports,roster,dispatcher,features,browser,filetransfer,commands +import simplexml,protocol,debug,auth_nb,auth,transports,transports_nb,roster_nb,roster,dispatcher_nb,dispatcher,features_nb,features,browser,filetransfer,commands, idlequeue +from client_nb import * from client import * from protocol import * diff --git a/src/common/xmpp/auth_nb.py b/src/common/xmpp/auth_nb.py new file mode 100644 index 000000000..a0914e7f9 --- /dev/null +++ b/src/common/xmpp/auth_nb.py @@ -0,0 +1,366 @@ +## auth_nb.py +## based on auth.py +## +## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov +## modified by Dimitur Kirov +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +''' +Provides library with all Non-SASL and SASL authentication mechanisms. +Can be used both for client and transport authentication. +''' + +from protocol import * +from auth import * +from client import PlugIn +import sha,base64,random,dispatcher_nb + +class SASL(PlugIn): + ''' Implements SASL authentication. ''' + def __init__(self,username,password, on_sasl): + PlugIn.__init__(self) + self.username=username + self.password=password + self.on_sasl = on_sasl + + def plugin(self,owner): + if not self._owner.Dispatcher.Stream._document_attrs.has_key('version'): + self.startsasl='not-supported' + elif self._owner.Dispatcher.Stream.features: + try: + self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features) + except NodeProcessed: + pass + else: self.startsasl=None + + def auth(self): + ''' Start authentication. Result can be obtained via "SASL.startsasl" attribute and will be + either "success" or "failure". Note that successfull auth will take at least + two Dispatcher.Process() calls. ''' + if self.startsasl: + pass + elif self._owner.Dispatcher.Stream.features: + try: + self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features) + except NodeProcessed: + pass + else: self._owner.RegisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS) + + def plugout(self): + ''' Remove SASL handlers from owner's dispatcher. Used internally. ''' + self._owner.UnregisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS) + self._owner.UnregisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL) + self._owner.UnregisterHandler('failure', self.SASLHandler, xmlns=NS_SASL) + self._owner.UnregisterHandler('success', self.SASLHandler, xmlns=NS_SASL) + + def FeaturesHandler(self, conn, feats): + ''' Used to determine if server supports SASL auth. Used internally. 
''' + if not feats.getTag('mechanisms', namespace=NS_SASL): + self.startsasl='not-supported' + self.DEBUG('SASL not supported by server', 'error') + return + mecs=[] + for mec in feats.getTag('mechanisms', namespace=NS_SASL).getTags('mechanism'): + mecs.append(mec.getData()) + self._owner.RegisterHandler('challenge', self.SASLHandler, xmlns=NS_SASL) + self._owner.RegisterHandler('failure', self.SASLHandler, xmlns=NS_SASL) + self._owner.RegisterHandler('success', self.SASLHandler, xmlns=NS_SASL) + if "DIGEST-MD5" in mecs: + node=Node('auth',attrs={'xmlns':NS_SASL,'mechanism':'DIGEST-MD5'}) + elif "PLAIN" in mecs: + sasl_data='%s\x00%s\x00%s' % (self.username+'@' + self._owner.Server, + self.username, self.password) + node=Node('auth', attrs={'xmlns':NS_SASL,'mechanism':'PLAIN'}, + payload=[base64.encodestring(sasl_data)]) + else: + self.startsasl='failure' + self.DEBUG('I can only use DIGEST-MD5 and PLAIN mecanisms.', 'error') + return + self.startsasl='in-process' + self._owner.send(node.__str__()) + raise NodeProcessed + + def SASLHandler(self, conn, challenge): + ''' Perform next SASL auth step. Used internally. ''' + if challenge.getNamespace() <> NS_SASL: + return + if challenge.getName() == 'failure': + self.startsasl = 'failure' + try: + reason = challenge.getChildren()[0] + except: + reason = challenge + self.DEBUG('Failed SASL authentification: %s' % reason, 'error') + if self.on_sasl : + self.on_sasl () + raise NodeProcessed + elif challenge.getName() == 'success': + self.startsasl='success' + self.DEBUG('Successfully authenticated with remote server.', 'ok') + handlers=self._owner.Dispatcher.dumpHandlers() + self._owner.Dispatcher.PlugOut() + dispatcher_nb.Dispatcher().PlugIn(self._owner) + self._owner.Dispatcher.restoreHandlers(handlers) + self._owner.User = self.username + if self.on_sasl : + self.on_sasl () + raise NodeProcessed +########################################3333 + incoming_data = challenge.getData() + chal={} + data=base64.decodestring(incoming_data) + self.DEBUG('Got challenge:'+data,'ok') + for pair in data.split(','): + key, value = pair.split('=', 1) + if value[:1] == '"' and value[-1:] == '"': + value=value[1:-1] + chal[key]=value + if chal.has_key('qop') and chal['qop']=='auth': + resp={} + resp['username']=self.username + resp['realm']=self._owner.Server + resp['nonce']=chal['nonce'] + cnonce='' + for i in range(7): + cnonce+=hex(int(random.random()*65536*4096))[2:] + resp['cnonce']=cnonce + resp['nc']=('00000001') + resp['qop']='auth' + resp['digest-uri']='xmpp/'+self._owner.Server + A1=C([H(C([resp['username'], resp['realm'], self.password])), resp['nonce'], resp['cnonce']]) + A2=C(['AUTHENTICATE',resp['digest-uri']]) + response= HH(C([HH(A1),resp['nonce'],resp['nc'],resp['cnonce'],resp['qop'],HH(A2)])) + resp['response']=response + resp['charset']='utf-8' + sasl_data='' + for key in ['charset', 'username', 'realm', 'nonce', 'nc', 'cnonce', 'digest-uri', 'response', 'qop']: + if key in ['nc','qop','response','charset']: + sasl_data += "%s=%s," % (key,resp[key]) + else: + sasl_data += '%s="%s",' % (key,resp[key]) +########################################3333 + node=Node('response', attrs={'xmlns':NS_SASL}, + payload=[base64.encodestring(sasl_data[:-1]).replace('\r','').replace('\n','')]) + self._owner.send(node.__str__()) + elif chal.has_key('rspauth'): + self._owner.send(Node('response', attrs={'xmlns':NS_SASL}).__str__()) + else: + self.startsasl='failure' + self.DEBUG('Failed SASL authentification: unknown challenge', 'error') + if self.on_sasl : + 
self.on_sasl () + raise NodeProcessed + +class NonBlockingNonSASL(PlugIn): + ''' Implements old Non-SASL (JEP-0078) authentication used + in jabberd1.4 and transport authentication. + ''' + def __init__(self, user, password, resource, on_auth): + ''' Caches username, password and resource for auth. ''' + PlugIn.__init__(self) + self.DBG_LINE ='gen_auth' + self.user = user + self.password= password + self.resource = resource + self.on_auth = on_auth + + def plugin(self, owner): + ''' Determine the best auth method (digest/0k/plain) and use it for auth. + Returns used method name on success. Used internally. ''' + if not self.resource: + return self.authComponent(owner) + self.DEBUG('Querying server about possible auth methods', 'start') + self.owner = owner + + resp = owner.Dispatcher.SendAndWaitForResponse( + Iq('get', NS_AUTH, payload=[Node('username', payload=[self.user])]), func=self._on_username + ) + + def _on_username(self, resp): + if not isResultNode(resp): + self.DEBUG('No result node arrived! Aborting...','error') + return self.on_auth(None) + iq=Iq(typ='set',node=resp) + query=iq.getTag('query') + query.setTagData('username',self.user) + query.setTagData('resource',self.resource) + + if query.getTag('digest'): + self.DEBUG("Performing digest authentication",'ok') + query.setTagData('digest', + sha.new(self.owner.Dispatcher.Stream._document_attrs['id']+self.password).hexdigest()) + if query.getTag('password'): + query.delChild('password') + self._method='digest' + elif query.getTag('token'): + token=query.getTagData('token') + seq=query.getTagData('sequence') + self.DEBUG("Performing zero-k authentication",'ok') + hash = sha.new(sha.new(self.password).hexdigest()+token).hexdigest() + for foo in xrange(int(seq)): + hash = sha.new(hash).hexdigest() + query.setTagData('hash',hash) + self._method='0k' + else: + self.DEBUG("Sequre methods unsupported, performing plain text authentication",'warn') + query.setTagData('password',self.password) + self._method='plain' + resp=self.owner.Dispatcher.SendAndWaitForResponse(iq, func=self._on_auth) + + def _on_auth(self, resp): + if isResultNode(resp): + self.DEBUG('Sucessfully authenticated with remove host.','ok') + self.owner.User=self.user + self.owner.Resource=self.resource + self.owner._registered_name=self.owner.User+'@'+self.owner.Server+'/'+self.owner.Resource + return self.on_auth(self._method) + self.DEBUG('Authentication failed!','error') + return self.on_auth(None) + + def authComponent(self,owner): + ''' Authenticate component. Send handshake stanza and wait for result. Returns "ok" on success. ''' + self.handshake=0 + owner.send(Node(NS_COMPONENT_ACCEPT+' handshake', + payload=[sha.new(owner.Dispatcher.Stream._document_attrs['id']+self.password).hexdigest()])) + owner.RegisterHandler('handshake', self.handshakeHandler, xmlns=NS_COMPONENT_ACCEPT) + self._owner.onreceive(self._on_auth_component) + + def _on_auth_component(self, data): + ''' called when we receive some response, after we send the handshake ''' + if data: + self.Dispatcher.ProcessNonBlocking(data) + if not self.handshake: + self.DEBUG('waiting on handshake', 'notify') + return + self._owner.onreceive(None) + owner._registered_name=self.user + if self.handshake+1: + return self.on_auth('ok') + self.on_auth(None) + + def handshakeHandler(self,disp,stanza): + ''' Handler for registering in dispatcher for accepting transport authentication. 
''' + if stanza.getName() == 'handshake': + self.handshake=1 + else: + self.handshake=-1 + +class NonBlockingBind(Bind): + ''' Bind some JID to the current connection to allow router know of our location.''' + def plugin(self, owner): + ''' Start resource binding, if allowed at this time. Used internally. ''' + if self._owner.Dispatcher.Stream.features: + try: + self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features) + except NodeProcessed: + pass + else: self._owner.RegisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS) + + def plugout(self): + ''' Remove Bind handler from owner's dispatcher. Used internally. ''' + self._owner.UnregisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS) + + def NonBlockingBind(self, resource=None, on_bound=None): + ''' Perform binding. Use provided resource name or random (if not provided). ''' + self.on_bound = on_bound + self._resource = resource + if self._resource: + self._resource = [Node('resource', payload=[self._resource])] + else: + self._resource = [] + + self._owner.onreceive(None) + resp=self._owner.Dispatcher.SendAndWaitForResponse( + Protocol('iq',typ='set', + payload=[Node('bind', attrs={'xmlns':NS_BIND}, payload=self._resource)]), + func=self._on_bound) + def _on_bound(self, resp): + if isResultNode(resp): + self.bound.append(resp.getTag('bind').getTagData('jid')) + self.DEBUG('Successfully bound %s.'%self.bound[-1],'ok') + jid=JID(resp.getTag('bind').getTagData('jid')) + self._owner.User=jid.getNode() + self._owner.Resource=jid.getResource() + self._owner.SendAndWaitForResponse(Protocol('iq', typ='set', + payload=[Node('session', attrs={'xmlns':NS_SESSION})]), func=self._on_session) + elif resp: + self.DEBUG('Binding failed: %s.' % resp.getTag('error'),'error') + self.on_bound(None) + else: + self.DEBUG('Binding failed: timeout expired.', 'error') + self.on_bound(None) + + def _on_session(self, resp): + self._owner.onreceive(None) + if isResultNode(resp): + self.DEBUG('Successfully opened session.', 'ok') + self.session = 1 + self.on_bound('ok') + else: + self.DEBUG('Session open failed.', 'error') + self.session = 0 + self.on_bound(None) + self._owner.onreceive(None) + if isResultNode(resp): + self.DEBUG('Successfully opened session.', 'ok') + self.session = 1 + self.on_bound('ok') + else: + self.DEBUG('Session open failed.', 'error') + self.session = 0 + self.on_bound(None) + +class NBComponentBind(ComponentBind): + ''' ComponentBind some JID to the current connection to allow + router know of our location. + ''' + def plugin(self,owner): + ''' Start resource binding, if allowed at this time. Used internally. ''' + if self._owner.Dispatcher.Stream.features: + try: + self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features) + except NodeProcessed: + pass + else: + self._owner.RegisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS) + self.needsUnregister = 1 + + def plugout(self): + ''' Remove ComponentBind handler from owner's dispatcher. Used internally. ''' + if self.needsUnregister: + self._owner.UnregisterHandler('features', self.FeaturesHandler, xmlns=NS_STREAMS) + + def Bind(self, domain = None, on_bind = None): + ''' Perform binding. Use provided domain name (if not provided). 
''' + self._owner.onreceive(self._on_bound) + self.on_bind = on_bind + + def _on_bound(self, resp): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if self.bound is None: + return + self._owner.onreceive(None) + self._owner.SendAndWaitForResponse( + Protocol('bind', attrs={'name':domain}, xmlns=NS_COMPONENT_1), + func=self._on_bind_reponse) + + def _on_bind_reponse(self, res): + if resp and resp.getAttr('error'): + self.DEBUG('Binding failed: %s.' % resp.getAttr('error'), 'error') + elif resp: + self.DEBUG('Successfully bound.', 'ok') + if self.on_bind: + self.on_bind('ok') + else: + self.DEBUG('Binding failed: timeout expired.', 'error') + if self.on_bind: + self.on_bind(None) diff --git a/src/common/xmpp/client_nb.py b/src/common/xmpp/client_nb.py new file mode 100644 index 000000000..c3d0bfc33 --- /dev/null +++ b/src/common/xmpp/client_nb.py @@ -0,0 +1,389 @@ +## client_nb.py +## based on client.py +## +## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov +## modified by Dimitur Kirov +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + +# $Id: client.py,v 1.52 2006/01/02 19:40:55 normanr Exp $ + +''' +Provides PlugIn class functionality to develop extentions for xmpppy. +Also provides Client and Component classes implementations as the +examples of xmpppy structures usage. +These classes can be used for simple applications "AS IS" though. +''' + +import socket +import debug + +import transports_nb, dispatcher_nb, auth_nb, roster_nb +from client import * + +class NBCommonClient(CommonClient): + ''' Base for Client and Component classes.''' + def __init__(self, server, port=5222, debug=['always', 'nodebuilder'], caller=None, + on_connect=None, on_connect_failure=None): + ''' Caches server name and (optionally) port to connect to. "debug" parameter specifies + the debug IDs that will go into debug output. You can either specifiy an "include" + or "exclude" list. The latter is done via adding "always" pseudo-ID to the list. + Full list: ['nodebuilder', 'dispatcher', 'gen_auth', 'SASL_auth', 'bind', 'socket', + 'CONNECTproxy', 'TLS', 'roster', 'browser', 'ibb'] . 
''' + + if self.__class__.__name__ == 'NonBlockingClient': + self.Namespace, self.DBG = 'jabber:client', DBG_CLIENT + elif self.__class__.__name__ == 'NBCommonClient': + self.Namespace, self.DBG = dispatcher_nb.NS_COMPONENT_ACCEPT, DBG_COMPONENT + + self.defaultNamespace = self.Namespace + self.disconnect_handlers = [] + self.Server = server + self.Port = port + + # Who initiated this client + # Used to register the EventDispatcher + self._caller = caller + if debug and type(debug) != list: + debug = ['always', 'nodebuilder'] + self._DEBUG = Debug.Debug(debug) + self.DEBUG = self._DEBUG.Show + self.debug_flags = self._DEBUG.debug_flags + self.debug_flags.append(self.DBG) + self._owner = self + self._registered_name = None + self.connected = '' + self._component=0 + self.idlequeue = None + self.socket = None + self.on_connect = on_connect + self.on_connect_failure = on_connect_failure + #~ self.RegisterDisconnectHandler(self.DisconnectHandler) + + def set_idlequeue(self, idlequeue): + self.idlequeue = idlequeue + + def disconnected(self): + ''' Called on disconnection. Calls disconnect handlers and cleans things up. ''' + self.connected='' + self.DEBUG(self.DBG,'Disconnect detected','stop') + self.disconnect_handlers.reverse() + for i in self.disconnect_handlers: + i() + self.disconnect_handlers.reverse() + if self.__dict__.has_key('NonBlockingTLS'): + self.NonBlockingTLS.PlugOut() + + def reconnectAndReauth(self): + ''' Example of reconnection method. In fact, it can be used to batch connection and auth as well. ''' + handlerssave=self.Dispatcher.dumpHandlers() + self.Dispatcher.PlugOut() + if self.__dict__.has_key('NonBlockingNonSASL'): + self.NonBlockingNonSASL.PlugOut() + if self.__dict__.has_key('SASL'): + self.SASL.PlugOut() + if self.__dict__.has_key('NonBlockingTLS'): + self.NonBlockingTLS.PlugOut() + if self.__dict__.has_key('NBHTTPPROXYsocket'): + self.NBHTTPPROXYsocket.PlugOut() + if self.__dict__.has_key('NonBlockingTcp'): + self.NonBlockingTcp.PlugOut() + if not self.connect(server=self._Server, proxy=self._Proxy): + return + if not self.auth(self._User, self._Password, self._Resource): + return + self.Dispatcher.restoreHandlers(handlerssave) + return self.connected + + def connect(self,server=None,proxy=None, ssl=None, on_stream_start = None): + ''' Make a tcp/ip connection, protect it with tls/ssl if possible and start XMPP stream. 
''' + if not server: + server = (self.Server, self.Port) + self._Server, self._Proxy, self._Ssl = server , proxy, ssl + self.on_stream_start = on_stream_start + if proxy: + self.socket = transports_nb.NBHTTPPROXYsocket(self._on_connected, + self._on_connected_failure, proxy, server) + else: + self.connected = 'tcp' + self.socket = transports_nb.NonBlockingTcp(self._on_connected, + self._on_connected_failure, server) + self.socket.PlugIn(self) + return True + + def get_attrs(self, on_stream_start): + self.on_stream_start = on_stream_start + self.onreceive(self._on_receive_document_attrs) + + def _on_connected_failure(self): + if self.socket: + self.socket.PlugOut() + if self.on_connect_failure: + self.on_connect_failure() + + def _on_connected(self): + self.connected = 'tcp' + if self._Ssl or self.Connection.getPort() in (5223, 443): + try: + transports_nb.NonBlockingTLS().PlugIn(self, now=1) + self.connected = 'ssl' + except socket.sslerror: + return + self.onreceive(self._on_receive_document_attrs) + dispatcher_nb.Dispatcher().PlugIn(self) + + def _on_receive_document_attrs(self, data): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if not hasattr(self, 'Dispatcher') or \ + self.Dispatcher.Stream._document_attrs is None: + return + self.onreceive(None) + if self.Dispatcher.Stream._document_attrs.has_key('version') and \ + self.Dispatcher.Stream._document_attrs['version'] == '1.0': + self.onreceive(self._on_receive_stream_features) + return + if self.on_stream_start: + self.on_stream_start() + self.on_stream_start = None + return True + + def _on_receive_stream_features(self, data): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if not self.Dispatcher.Stream.features: + return + # pass # If we get version 1.0 stream the features tag MUST BE presented + self.onreceive(None) + if self.on_stream_start: + self.on_stream_start() + self.on_stream_start = None + return True + +class NonBlockingClient(NBCommonClient): + ''' Example client class, based on CommonClient. ''' + def connect(self,server=None,proxy=None,secure=None,use_srv=True): + ''' Connect to jabber server. If you want to specify different ip/port to connect to you can + pass it as tuple as first parameter. If there is HTTP proxy between you and server + specify it's address and credentials (if needed) in the second argument. + If you want ssl/tls support to be discovered and enable automatically - leave third argument as None. (ssl will be autodetected only if port is 5223 or 443) + If you want to force SSL start (i.e. if port 5223 or 443 is remapped to some non-standard port) then set it to 1. + If you want to disable tls/ssl support completely, set it to 0. 
+ Example: connect(('192.168.5.5',5222),{'host':'proxy.my.net','port':8080,'user':'me','password':'secret'}) + Returns '' or 'tcp' or 'tls', depending on the result.''' + self.__secure = secure + self.Connection = None + NBCommonClient.connect(self, server = server, proxy = proxy, + on_stream_start = self._on_tcp_stream_start) + return self.connected + + + def _is_connected(self): + self.onreceive(None) + if self.on_connect: + self.on_connect(self, self.connected) + self.on_connect = None + + def _on_tcp_stream_start(self): + if not self.connected or self.__secure is not None and not self.__secure: + self._is_connected() + return True + self.isplugged = True + self.onreceive(None) + transports_nb.NonBlockingTLS().PlugIn(self) + if not self.Dispatcher.Stream._document_attrs.has_key('version') or \ + not self.Dispatcher.Stream._document_attrs['version']=='1.0': + self._is_connected() + return + if not self.Dispatcher.Stream.features.getTag('starttls'): + self._is_connected() + return + self.onreceive(self._on_receive_starttls) + + def _on_receive_starttls(self, data): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if not self.NonBlockingTLS.starttls: + return + self.onreceive(None) + if not hasattr(self, 'NonBlockingTLS') or self.NonBlockingTLS.starttls != 'success': + self.event('tls_failed') + self._is_connected() + return + self.connected = 'tls' + self.onreceive(None) + self._is_connected() + return True + + def auth(self, user, password, resource = '', sasl = 1, on_auth = None): + ''' Authenticate connnection and bind resource. If resource is not provided + random one or library name used. ''' + self._User, self._Password, self._Resource, self._sasl = user, password, resource, sasl + self.on_auth = on_auth + self.get_attrs(self._on_doc_attrs) + return + + def _on_old_auth(self, res): + if res: + self.connected += '+old_auth' + self.on_auth(self, 'old_auth') + else: + self.on_auth(self, None) + + def _on_doc_attrs(self): + if self._sasl: + auth_nb.SASL(self._User, self._Password, self._on_start_sasl).PlugIn(self) + if not self._sasl or self.SASL.startsasl == 'not-supported': + if not self._Resource: + self._Resource = 'xmpppy' + auth_nb.NonBlockingNonSASL(self._User, self._Password, self._Resource, self._on_old_auth).PlugIn(self) + return + self.onreceive(self._on_start_sasl) + self.SASL.auth() + return True + + def _on_start_sasl(self, data=None): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if not self.__dict__.has_key('SASL'): + # SASL is pluged out, possible disconnect + return + if self.SASL.startsasl == 'in-process': + return + self.onreceive(None) + if self.SASL.startsasl == 'failure': + # wrong user/pass, stop auth + self.connected = None + self._on_sasl_auth(None) + self.SASL.PlugOut() + elif self.SASL.startsasl == 'success': + auth_nb.NonBlockingBind().PlugIn(self) + self.onreceive(self._on_auth_bind) + return True + + def _on_auth_bind(self, data): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if self.NonBlockingBind.bound is None: + return + self.NonBlockingBind.NonBlockingBind(self._Resource, self._on_sasl_auth) + return True + + def _on_sasl_auth(self, res): + self.onreceive(None) + if res: + self.connected += '+sasl' + self.on_auth(self, 'sasl') + else: + self.on_auth(self, None) + + def initRoster(self): + ''' Plug in the roster. 
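A rough usage sketch of the callback chain above (connect, then SASL or old-style auth, then bind), in the spirit of how src/common/connection.py drives it. The host, credentials, resource and the minimal caller object are placeholders, the import path assumes the layout of this patch, and the busy loop only stands in for a real main loop.

from common.xmpp import client_nb, idlequeue

class Caller:
    ''' the Dispatcher forwards library events here, see RegisterEventHandler '''
    def _event_dispatcher(self, realm, event, data):
        pass

def on_auth(con, auth_method):
    # auth_method is 'sasl', 'old_auth' or None on failure
    print 'authenticated via', auth_method

def on_connect(con, con_type):
    # con_type is 'tcp', 'ssl' or 'tls'
    con.auth('user', 'secret', 'gajim', sasl=1, on_auth=on_auth)

queue = idlequeue.IdleQueue()
client = client_nb.NonBlockingClient('example.org', 5222, caller=Caller(),
    on_connect=on_connect, on_connect_failure=lambda: None)
client.set_idlequeue(queue)
client.connect()
while True:
    queue.process()   # a real application pumps the queue from its main loop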
''' + if not self.__dict__.has_key('NonBlockingRoster'): + roster_nb.NonBlockingRoster().PlugIn(self) + + def getRoster(self, on_ready = None): + ''' Return the Roster instance, previously plugging it in and + requesting roster from server if needed. ''' + if self.__dict__.has_key('NonBlockingRoster'): + return self.NonBlockingRoster.getRoster(on_ready) + return None + + def sendPresence(self, jid=None, typ=None, requestRoster=0): + ''' Send some specific presence state. + Can also request roster from server if according agrument is set.''' + if requestRoster: roster_nb.NonBlockingRoster().PlugIn(self) + self.send(dispatcher_nb.Presence(to=jid, typ=typ)) + +class Component(NBCommonClient): + ''' Component class. The only difference from CommonClient is ability to perform component authentication. ''' + def __init__(self, server, port=5347, typ=None, debug=['always', 'nodebuilder'], + domains=None, component=0, on_connect = None, on_connect_failure = None): + ''' Init function for Components. + As components use a different auth mechanism which includes the namespace of the component. + Jabberd1.4 and Ejabberd use the default namespace then for all client messages. + Jabberd2 uses jabber:client. + 'server' argument is a server name that you are connecting to (f.e. "localhost"). + 'port' can be specified if 'server' resolves to correct IP. If it is not then you'll need to specify IP + and port while calling "connect()".''' + NBCommonClient.__init__(self, server, port=port, debug=debug) + self.typ = typ + self.component=component + if domains: + self.domains=domains + else: + self.domains=[server] + self.on_connect_component = on_connect + self.on_connect_failure = on_connect_failure + + def connect(self, server=None, proxy=None): + ''' This will connect to the server, and if the features tag is found then set + the namespace to be jabber:client as that is required for jabberd2. 
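Continuing the previous sketch: once authenticated, the roster and initial presence go through the same non-blocking pattern. getItems() and getSubscription() are assumed to come from the base Roster class in roster.py, which this patch only subclasses.

def on_roster_ready(roster):
    # roster is the NonBlockingRoster plug-in, handed back once the reply is parsed
    for jid in roster.getItems():
        print jid, roster.getSubscription(jid)

client.initRoster()                  # plugs NonBlockingRoster and requests the roster
client.getRoster(on_roster_ready)    # on_roster_ready fires when the result arrives
client.sendPresence()                # announce availability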
+ 'server' and 'proxy' arguments have the same meaning as in xmpp.Client.connect() ''' + if self.component: + self.Namespace=auth.NS_COMPONENT_1 + self.Server=server[0] + NBCommonClient.connect(self, server=server, proxy=proxy, + on_connect = self._on_connect, on_connect_failure = self.on_connect_failure) + + def _on_connect(self): + if self.typ=='jabberd2' or not self.typ and self.Dispatcher.Stream.features != None: + self.defaultNamespace=auth.NS_CLIENT + self.Dispatcher.RegisterNamespace(self.defaultNamespace) + self.Dispatcher.RegisterProtocol('iq',dispatcher.Iq) + self.Dispatcher.RegisterProtocol('message',dispatcher_nb.Message) + self.Dispatcher.RegisterProtocol('presence',dispatcher_nb.Presence) + self.on_connect(self.connected) + + def auth(self, name, password, dup=None, sasl=0): + ''' Authenticate component "name" with password "password".''' + self._User, self._Password, self._Resource=name, password,'' + try: + if self.component: + sasl=1 + if sasl: + auth.SASL(name,password).PlugIn(self) + if not sasl or self.SASL.startsasl=='not-supported': + if auth.NonSASL(name,password,'').PlugIn(self): + self.connected+='+old_auth' + return 'old_auth' + return + self.SASL.auth() + self.onreceive(self._on_auth_component) + except: + self.DEBUG(self.DBG,"Failed to authenticate %s" % name,'error') + + def _on_auth_component(self, data): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if self.SASL.startsasl == 'in-process': + return + if self.SASL.startsasl =='success': + if self.component: + self._component = self.component + auth.NBComponentBind().PlugIn(self) + self.onreceive(_on_component_bind) + self.connected += '+sasl' + else: + raise auth.NotAuthorized(self.SASL.startsasl) + + def _on_component_bind(self, data): + if data: + self.Dispatcher.ProcessNonBlocking(data) + if self.NBComponentBind.bound is None: + return + + for domain in self.domains: + self.NBComponentBind.Bind(domain, _on_component_bound) + + def _on_component_bound(self, resp): + self.NBComponentBind.PlugOut() + diff --git a/src/common/xmpp/dispatcher_nb.py b/src/common/xmpp/dispatcher_nb.py new file mode 100644 index 000000000..70f2cf668 --- /dev/null +++ b/src/common/xmpp/dispatcher_nb.py @@ -0,0 +1,410 @@ +## dispatcher_nb.py +## based on dispatcher.py +## +## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov +## modified by Dimitur Kirov +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + +# $Id: dispatcher.py,v 1.40 2006/01/18 19:26:43 normanr Exp $ + +''' +Main xmpppy mechanism. Provides library with methods to assign different handlers +to different XMPP stanzas. +Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines time that +Dispatcher.SendAndWaitForResponce method will wait for reply stanza before giving up. +''' + +import simplexml, sys +from protocol import * +from client import PlugIn + +# default timeout to wait for response for our id +DEFAULT_TIMEOUT_SECONDS = 25 +ID = 0 + +class Dispatcher(PlugIn): + ''' Ancestor of PlugIn class. Handles XMPP stream, i.e. aware of stream headers. 
+ Can be plugged out/in to restart these headers (used for SASL f.e.). ''' + def __init__(self): + PlugIn.__init__(self) + DBG_LINE='dispatcher' + self.handlers={} + self._expected={} + self._defaultHandler=None + self._pendingExceptions=[] + self._eventHandler=None + self._cycleHandlers=[] + self._exported_methods=[self.RegisterHandler, self.RegisterDefaultHandler, \ + self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler, \ + self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol, \ + self.SendAndWaitForResponse, self.send,self.disconnect, \ + self.SendAndCallForResponse, self.getAnID, self.Event] + + def getAnID(self): + global ID + ID += 1 + return `ID` + + def dumpHandlers(self): + ''' Return set of user-registered callbacks in it's internal format. + Used within the library to carry user handlers set over Dispatcher replugins. ''' + return self.handlers + + def restoreHandlers(self, handlers): + ''' Restores user-registered callbacks structure from dump previously obtained via dumpHandlers. + Used within the library to carry user handlers set over Dispatcher replugins. ''' + self.handlers = handlers + + def _init(self): + ''' Registers default namespaces/protocols/handlers. Used internally. ''' + self.RegisterNamespace('unknown') + self.RegisterNamespace(NS_STREAMS) + self.RegisterNamespace(self._owner.defaultNamespace) + self.RegisterProtocol('iq', Iq) + self.RegisterProtocol('presence', Presence) + self.RegisterProtocol('message', Message) + self.RegisterDefaultHandler(self.returnStanzaHandler) + # Register Gajim's event handler as soon as dispatcher begins + self.RegisterEventHandler(self._owner._caller._event_dispatcher) + self.on_responses = {} + + def plugin(self, owner): + ''' Plug the Dispatcher instance into Client class instance and send initial stream header. Used internally.''' + self._init() + self._owner.lastErrNode = None + self._owner.lastErr = None + self._owner.lastErrCode = None + self.StreamInit() + + def plugout(self): + ''' Prepares instance to be destructed. ''' + self.Stream.dispatch = None + self.Stream.DEBUG = None + self.Stream.features = None + self.Stream.destroy() + + def StreamInit(self): + ''' Send an initial stream header. ''' + self.Stream = simplexml.NodeBuilder() + self.Stream._dispatch_depth = 2 + self.Stream.dispatch = self.dispatch + self.Stream.stream_header_received = self._check_stream_start + self._owner.debug_flags.append(simplexml.DBG_NODEBUILDER) + self.Stream.DEBUG = self._owner.DEBUG + self.Stream.features = None + self._metastream = Node('stream:stream') + self._metastream.setNamespace(self._owner.Namespace) + self._metastream.setAttr('version', '1.0') + self._metastream.setAttr('xmlns:stream', NS_STREAMS) + self._metastream.setAttr('to', self._owner.Server) + self._owner.send("%s>" % str(self._metastream)[:-2]) + + def _check_stream_start(self, ns, tag, attrs): + if ns<>NS_STREAMS or tag<>'stream': + raise ValueError('Incorrect stream start: (%s,%s). Terminating.' % (tag, ns)) + + def ProcessNonBlocking(self, data=None): + ''' Check incoming stream for data waiting. If "timeout" is positive - block for as max. this time. 
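For reference, a sketch of the stream header that StreamInit() assembles; on the wire it is normally preceded by an XML declaration, and the domain below is a placeholder.

from common.xmpp.protocol import Node, NS_STREAMS

metastream = Node('stream:stream')
metastream.setNamespace('jabber:client')
metastream.setAttr('version', '1.0')
metastream.setAttr('xmlns:stream', NS_STREAMS)
metastream.setAttr('to', 'example.org')
# the Node serialises as a self-closing tag; dropping the trailing '/>' and
# re-adding '>' leaves the stream element open, as XMPP framing requires
print str(metastream)[:-2] + '>'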
+ Returns: + 1) length of processed data if some data were processed; + 2) '0' string if no data were processed but link is alive; + 3) 0 (zero) if underlying connection is closed.''' + for handler in self._cycleHandlers: + handler(self) + if len(self._pendingExceptions) > 0: + _pendingException = self._pendingExceptions.pop() + raise _pendingException[0], _pendingException[1], _pendingException[2] + self.Stream.Parse(data) + if len(self._pendingExceptions) > 0: + _pendingException = self._pendingExceptions.pop() + raise _pendingException[0], _pendingException[1], _pendingException[2] + return len(data) + + def RegisterNamespace(self, xmlns, order='info'): + ''' Creates internal structures for newly registered namespace. + You can register handlers for this namespace afterwards. By default one namespace + already registered (jabber:client or jabber:component:accept depending on context. ''' + self.DEBUG('Registering namespace "%s"' % xmlns, order) + self.handlers[xmlns]={} + self.RegisterProtocol('unknown', Protocol, xmlns=xmlns) + self.RegisterProtocol('default', Protocol, xmlns=xmlns) + + def RegisterProtocol(self, tag_name, Proto, xmlns=None, order='info'): + ''' Used to declare some top-level stanza name to dispatcher. + Needed to start registering handlers for such stanzas. + Iq, message and presence protocols are registered by default. ''' + if not xmlns: xmlns=self._owner.defaultNamespace + self.DEBUG('Registering protocol "%s" as %s(%s)' % + (tag_name, Proto, xmlns), order) + self.handlers[xmlns][tag_name]={type:Proto, 'default':[]} + + def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', makefirst=0, system=0): + ''' Register handler for processing all stanzas for specified namespace. ''' + self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, system) + + def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, makefirst=0, system=0): + '''Register user callback as stanzas handler of declared type. Callback must take + (if chained, see later) arguments: dispatcher instance (for replying), incomed + return of previous handlers. + The callback must raise xmpp.NodeProcessed just before return if it want preven + callbacks to be called with the same stanza as argument _and_, more importantly + library from returning stanza to sender with error set (to be enabled in 0.2 ve + Arguments: + "name" - name of stanza. F.e. "iq". + "handler" - user callback. + "typ" - value of stanza's "type" attribute. If not specified any value match + "ns" - namespace of child that stanza must contain. + "chained" - chain together output of several handlers. + "makefirst" - insert handler in the beginning of handlers list instead of + adding it to the end. Note that more common handlers (i.e. w/o "typ" and " + will be called first nevertheless. + "system" - call handler even if NodeProcessed Exception were raised already. 
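A short sketch of registering a stanza handler against the Dispatcher described above; 'client' is a connected client as in the earlier sketches, onto which the Dispatcher exports RegisterHandler.

from common.xmpp.protocol import NodeProcessed

def message_handler(session, stanza):
    # handlers receive the dispatching session and the parsed stanza
    body = stanza.getBody()
    if body:
        print 'message from', stanza.getFrom(), ':', body
    # stop the remaining handlers and the default error-bounce handler
    raise NodeProcessed

client.RegisterHandler('message', message_handler, typ='chat')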
+ ''' + if not xmlns: + xmlns=self._owner.defaultNamespace + self.DEBUG('Registering handler %s for "%s" type->%s ns->%s(%s)' % + (handler, name, typ, ns, xmlns), 'info') + if not typ and not ns: + typ='default' + if not self.handlers.has_key(xmlns): + self.RegisterNamespace(xmlns,'warn') + if not self.handlers[xmlns].has_key(name): + self.RegisterProtocol(name,Protocol,xmlns,'warn') + if not self.handlers[xmlns][name].has_key(typ+ns): + self.handlers[xmlns][name][typ+ns]=[] + if makefirst: + self.handlers[xmlns][name][typ+ns].insert(0,{'func':handler,'system':system}) + else: + self.handlers[xmlns][name][typ+ns].append({'func':handler,'system':system}) + + def RegisterHandlerOnce(self,name,handler,typ='',ns='',xmlns=None,makefirst=0, system=0): + ''' Unregister handler after first call (not implemented yet). ''' + if not xmlns: + xmlns=self._owner.defaultNamespace + self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system) + + def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None): + ''' Unregister handler. "typ" and "ns" must be specified exactly the same as with registering.''' + if not xmlns: + xmlns=self._owner.defaultNamespace + if not typ and not ns: + typ='default' + if not self.handlers[xmlns].has_key(name): + return + if not self.handlers[xmlns][name].has_key(typ+ns): + return + for pack in self.handlers[xmlns][name][typ+ns]: + if handler==pack['func']: + break + else: + pack=None + try: + self.handlers[xmlns][name][typ+ns].remove(pack) + except ValueError: + pass + + def RegisterDefaultHandler(self,handler): + ''' Specify the handler that will be used if no NodeProcessed exception were raised. + This is returnStanzaHandler by default. ''' + self._defaultHandler=handler + + def RegisterEventHandler(self,handler): + ''' Register handler that will process events. F.e. "FILERECEIVED" event. ''' + self._eventHandler=handler + + def returnStanzaHandler(self,conn,stanza): + ''' Return stanza back to the sender with error set. ''' + if stanza.getType() in ['get','set']: + conn.send(Error(stanza,ERR_FEATURE_NOT_IMPLEMENTED)) + + def streamErrorHandler(self,conn,error): + name,text='error',error.getData() + for tag in error.getChildren(): + if tag.getNamespace()==NS_XMPP_STREAMS: + if tag.getName()=='text': + text=tag.getData() + else: + name=tag.getName() + if name in stream_exceptions.keys(): + exc=stream_exceptions[name] + else: + exc=StreamError + raise exc((name,text)) + + def RegisterCycleHandler(self, handler): + ''' Register handler that will be called on every Dispatcher.Process() call. ''' + if handler not in self._cycleHandlers: + self._cycleHandlers.append(handler) + + def UnregisterCycleHandler(self, handler): + ''' Unregister handler that will is called on every Dispatcher.Process() call.''' + if handler in self._cycleHandlers: + self._cycleHandlers.remove(handler) + + def Event(self, realm, event, data): + ''' Raise some event. Takes three arguments: + 1) "realm" - scope of event. Usually a namespace. + 2) "event" - the event itself. F.e. "SUCESSFULL SEND". + 3) data that comes along with event. Depends on event.''' + if self._eventHandler: self._eventHandler(realm,event,data) + + def dispatch(self, stanza, session=None, direct=0): + ''' Main procedure that performs XMPP stanza recognition and calling apppropriate handlers for it. + Called internally. 
''' + if not session: + session = self + session.Stream._mini_dom = None + name = stanza.getName() + + if not direct and self._owner._component: + if name == 'route': + if stanza.getAttr('error') == None: + if len(stanza.getChildren()) == 1: + stanza = stanza.getChildren()[0] + name=stanza.getName() + else: + for each in stanza.getChildren(): + self.dispatch(each,session,direct=1) + return + elif name == 'presence': + return + elif name in ('features','bind'): + pass + else: + raise UnsupportedStanzaType(name) + + if name=='features': + session.Stream.features=stanza + + xmlns=stanza.getNamespace() + if not self.handlers.has_key(xmlns): + self.DEBUG("Unknown namespace: " + xmlns, 'warn') + xmlns='unknown' + if not self.handlers[xmlns].has_key(name): + self.DEBUG("Unknown stanza: " + name, 'warn') + name='unknown' + else: + self.DEBUG("Got %s/%s stanza" % (xmlns, name), 'ok') + + if stanza.__class__.__name__=='Node': + stanza=self.handlers[xmlns][name][type](node=stanza) + + typ=stanza.getType() + if not typ: typ='' + stanza.props=stanza.getProperties() + ID=stanza.getID() + + session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s"%(name,typ,stanza.props,ID),'ok') + list=['default'] # we will use all handlers: + if self.handlers[xmlns][name].has_key(typ): list.append(typ) # from very common... + for prop in stanza.props: + if self.handlers[xmlns][name].has_key(prop): list.append(prop) + if typ and self.handlers[xmlns][name].has_key(typ+prop): list.append(typ+prop) # ...to very particular + + chain=self.handlers[xmlns]['default']['default'] + for key in list: + if key: chain = chain + self.handlers[xmlns][name][key] + + output='' + if session._expected.has_key(ID): + user=0 + if type(session._expected[ID]) == type(()): + cb,args = session._expected[ID] + session.DEBUG("Expected stanza arrived. Callback %s(%s) found!" % (cb, args), 'ok') + try: + cb(session,stanza,**args) + except Exception, typ: + if typ.__class__.__name__ <>'NodeProcessed': raise + else: + session.DEBUG("Expected stanza arrived!",'ok') + session._expected[ID]=stanza + else: + user=1 + for handler in chain: + if user or handler['system']: + try: + handler['func'](session,stanza) + except Exception, typ: + if typ.__class__.__name__ <> 'NodeProcessed': + self._pendingExceptions.insert(0, sys.exc_info()) + return + user=0 + if user and self._defaultHandler: + self._defaultHandler(session, stanza) + + def WaitForData(self, data): + if data is None: + return + res = self.ProcessNonBlocking(data) + self._owner.remove_timeout() + if self._expected[self._witid] is None: + return + if self.on_responses.has_key(self._witid): + self._owner.onreceive(None) + resp, args = self.on_responses[self._witid] + del(self.on_responses[self._witid]) + if args is None: + resp(self._expected[self._witid]) + else: + resp(self._owner, self._expected[self._witid], **args) + + def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None): + ''' Put stanza on the wire and wait for recipient's response to it. ''' + if timeout is None: + timeout = DEFAULT_TIMEOUT_SECONDS + self._witid = self.send(stanza) + if func: + self.on_responses[self._witid] = (func, args) + if timeout: + self._owner.set_timeout(timeout) + self._owner.onreceive(self.WaitForData) + self._expected[self._witid] = None + return self._witid + + def SendAndCallForResponse(self, stanza, func=None, args=None): + ''' Put stanza on the wire and call back when recipient replies. + Additional callback arguments can be specified in args. 
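A sketch of the non-blocking request/response helpers above: the stanza goes out with a fresh id and the callback receives whatever comes back under that id. NS_VERSION and isResultNode come from protocol.py; the target JID is a placeholder.

from common.xmpp.protocol import Iq, NS_VERSION, isResultNode

def on_version(resp):
    if isResultNode(resp):
        print resp.getQueryPayload()   # name/version/os children of the reply
    else:
        print 'no version reply'

client.SendAndCallForResponse(Iq('get', NS_VERSION, to='example.org'), on_version)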
''' + self.SendAndWaitForResponse(stanza, 0, func, args) + + def send(self, stanza): + ''' Serialise stanza and put it on the wire. Assign an unique ID to it before send. + Returns assigned ID.''' + if type(stanza) in [type(''), type(u'')]: + return self._owner.Connection.send(stanza) + if not isinstance(stanza, Protocol): + _ID=None + elif not stanza.getID(): + global ID + ID+=1 + _ID=`ID` + stanza.setID(_ID) + else: + _ID=stanza.getID() + if self._owner._registered_name and not stanza.getAttr('from'): + stanza.setAttr('from', self._owner._registered_name) + if self._owner._component and stanza.getName() != 'bind': + to=self._owner.Server + if stanza.getTo() and stanza.getTo().getDomain(): + to=stanza.getTo().getDomain() + frm=stanza.getFrom() + if frm.getDomain(): + frm=frm.getDomain() + route=Protocol('route', to=to, frm=frm, payload=[stanza]) + stanza=route + stanza.setNamespace(self._owner.Namespace) + stanza.setParent(self._metastream) + self._owner.Connection.send(stanza) + return _ID + + def disconnect(self): + ''' Send a stream terminator. ''' + self._owner.Connection.send('') diff --git a/src/common/xmpp/features_nb.py b/src/common/xmpp/features_nb.py new file mode 100644 index 000000000..63c6b4257 --- /dev/null +++ b/src/common/xmpp/features_nb.py @@ -0,0 +1,228 @@ +## features.py +## +## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + +# $Id: features.py,v 1.22 2005/09/30 20:13:04 mikealbon Exp $ + +from features import REGISTER_DATA_RECEIVED +from protocol import * + +def _on_default_response(disp, iq, cb): + def _on_response(resp): + if isResultNode(resp): + if cb: + cb(1) + elif cb: + cb(False) + disp.SendAndCallForResponse(iq, _on_response) + +def _discover(disp, ns, jid, node = None, fb2b=0, fb2a=1, cb=None): + """ Try to obtain info from the remote object. + If remote object doesn't support disco fall back to browse (if fb2b is true) + and if it doesnt support browse (or fb2b is not true) fall back to agents protocol + (if gb2a is true). Returns obtained info. Used internally. """ + iq=Iq(to=jid, typ='get', queryNS=ns) + if node: + iq.setAttr('node',node) + def _on_resp1(resp): + if fb2b and not isResultNode(resp): + # Fallback to browse + disp.SendAndCallForResponse(Iq(to=jid,typ='get',queryNS=NS_BROWSE), _on_resp2) + else: + _on_resp2('') + def _on_resp2(resp): + if fb2a and not isResultNode(resp): + # Fallback to agents + disp.SendAndCallForResponse(Iq(to=jid,typ='get',queryNS=NS_AGENTS), _on_result) + else: + _on_result('') + def _on_result(resp): + if isResultNode(resp): + if cb: + cb(resp.getQueryPayload()) + elif cb: + cb([]) + disp.SendAndCallForResponse(iq, _on_resp1) + +# this function is not used in gajim ??? +def discoverItems(disp,jid,node=None, cb=None): + """ Query remote object about any items that it contains. Return items list. """ + """ According to JEP-0030: + query MAY have node attribute + item: MUST HAVE jid attribute and MAY HAVE name, node, action attributes. 
+ action attribute of item can be either of remove or update value.""" + def _on_response(result_array): + ret=[] + for result in result_array: + if result.getName()=='agent' and result.getTag('name'): + result.setAttr('name', result.getTagData('name')) + ret.append(result.attrs) + if cb: + cb(ret) + _discover(disp, NS_DISCO_ITEMS, jid, node, _on_response) + +def discoverInfo(disp,jid,node=None, cb=None): + """ Query remote object about info that it publishes. Returns identities and features lists.""" + """ According to JEP-0030: + query MAY have node attribute + identity: MUST HAVE category and name attributes and MAY HAVE type attribute. + feature: MUST HAVE var attribute""" + def _on_response(result): + identities , features = [] , [] + for i in result: + if i.getName()=='identity': + identities.append(i.attrs) + elif i.getName()=='feature': + features.append(i.getAttr('var')) + elif i.getName()=='agent': + if i.getTag('name'): + i.setAttr('name',i.getTagData('name')) + if i.getTag('description'): + i.setAttr('name',i.getTagData('description')) + identities.append(i.attrs) + if i.getTag('groupchat'): + features.append(NS_GROUPCHAT) + if i.getTag('register'): + features.append(NS_REGISTER) + if i.getTag('search'): + features.append(NS_SEARCH) + if cb: + cb(identities , features) + _discover(disp, NS_DISCO_INFO, jid, node, _on_response) + +### Registration ### jabber:iq:register ### JEP-0077 ########################### +def getRegInfo(disp, host, info={}, sync=True): + """ Gets registration form from remote host. + You can pre-fill the info dictionary. + F.e. if you are requesting info on registering user joey than specify + info as {'username':'joey'}. See JEP-0077 for details. + 'disp' must be connected dispatcher instance.""" + iq=Iq('get',NS_REGISTER,to=host) + for i in info.keys(): + iq.setTagData(i,info[i]) + if sync: + disp.SendAndCallForResponse(iq, lambda resp: _ReceivedRegInfo(disp.Dispatcher,resp, host)) + else: + disp.SendAndCallForResponse(iq, _ReceivedRegInfo, {'agent': host }) + +def _ReceivedRegInfo(con, resp, agent): + iq=Iq('get',NS_REGISTER,to=agent) + if not isResultNode(resp): + return + df=resp.getTag('query',namespace=NS_REGISTER).getTag('x',namespace=NS_DATA) + if df: + con.Event(NS_REGISTER,REGISTER_DATA_RECEIVED,(agent,DataForm(node=df),True)) + return + df=DataForm(typ='form') + for i in resp.getQueryPayload(): + if type(i)<>type(iq): + pass + elif i.getName()=='instructions': + df.addInstructions(i.getData()) + else: + df.setField(i.getName()).setValue(i.getData()) + con.Event(NS_REGISTER, REGISTER_DATA_RECEIVED, (agent,df,False)) + +def register(disp, host, info, cb): + """ Perform registration on remote server with provided info. + disp must be connected dispatcher instance. + Returns true or false depending on registration result. + If registration fails you can get additional info from the dispatcher's owner + attributes lastErrNode, lastErr and lastErrCode. + """ + iq=Iq('set', NS_REGISTER, to=host) + if not isinstance(info, dict): + info=info.asDict() + for i in info.keys(): + iq.setTag('query').setTagData(i,info[i]) + _on_default_response(disp, iq, cb) + +def unregister(disp, host, cb): + """ Unregisters with host (permanently removes account). + disp must be connected and authorized dispatcher instance. 
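A sketch of in-band registration with the helpers above; which fields the server wants is normally learned from getRegInfo() first, so the username/password pair here is only illustrative.

from common.xmpp import features_nb

def on_registered(success):
    # _on_default_response() reports 1 for a result stanza, False otherwise
    print 'registration succeeded:', bool(success)

features_nb.register(client, 'example.org',
    {'username': 'newuser', 'password': 'secret'}, on_registered)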
+ Returns true on success.""" + iq = Iq('set', NS_REGISTER, to=host, payload=[Node('remove')]) + _on_default_response(disp, iq, cb) + +def changePasswordTo(disp, newpassword, host=None, cb = None): + """ Changes password on specified or current (if not specified) server. + disp must be connected and authorized dispatcher instance. + Returns true on success.""" + if not host: host=disp._owner.Server + iq = Iq('set',NS_REGISTER,to=host, payload=[Node('username', + payload=[disp._owner.Server]),Node('password',payload=[newpassword])]) + _on_default_response(disp, iq, cb) + +### Privacy ### jabber:iq:privacy ### draft-ietf-xmpp-im-19 #################### +#type=[jid|group|subscription] +#action=[allow|deny] + +def getPrivacyLists(disp, cb): + """ Requests privacy lists from connected server. + Returns dictionary of existing lists on success.""" + iq = Iq('get', NS_PRIVACY) + def _on_response(resp): + dict = {'lists': []} + try: + if not isResultNode(resp): + cb(False) + return + for list in resp.getQueryPayload(): + if list.getName()=='list': + dict['lists'].append(list.getAttr('name')) + else: + dict[list.getName()]=list.getAttr('name') + cb(dict) + except: + pass + cb(False) + disp.SendAndCallForResponse(iq, _on_respons) + +def getPrivacyList(disp, listname, cb): + """ Requests specific privacy list listname. Returns list of XML nodes (rules) + taken from the server responce.""" + def _on_response(resp): + try: + if isResultNode(resp): + return cb(resp.getQueryPayload()[0]) + except: + pass + cb(False) + iq = Iq('get', NS_PRIVACY, payload=[Node('list', {'name': listname})]) + disp.SendAndCallForResponse(iq, _on_response) + +def setActivePrivacyList(disp, listname=None, typ='active', cb=None): + """ Switches privacy list 'listname' to specified type. + By default the type is 'active'. Returns true on success.""" + if listname: + attrs={'name':listname} + else: + attrs={} + iq = Iq('set',NS_PRIVACY,payload=[Node(typ,attrs)]) + _on_default_response(disp, iq, cb) + +def setDefaultPrivacyList(disp, listname=None): + """ Sets the default privacy list as 'listname'. Returns true on success.""" + return setActivePrivacyList(disp, listname,'default') + +def setPrivacyList(disp, list, cb): + """ Set the ruleset. 'list' should be the simpleXML node formatted + according to RFC 3921 (XMPP-IM) (I.e. Node('list',{'name':listname},payload=[...]) ) + Returns true on success.""" + iq=Iq('set',NS_PRIVACY,payload=[list]) + _on_default_response(disp, iq, cb) + +def delPrivacyList(disp,listname, cb): + """ Deletes privacy list 'listname'. Returns true on success.""" + iq = Iq('set',NS_PRIVACY,payload=[Node('list',{'name':listname})]) + _on_default_response(disp, iq, cb) diff --git a/src/common/xmpp/idlequeue.py b/src/common/xmpp/idlequeue.py new file mode 100644 index 000000000..6733fa862 --- /dev/null +++ b/src/common/xmpp/idlequeue.py @@ -0,0 +1,151 @@ +## idlequeue.py +## +## Copyright (C) 2006 Dimitur Kirov +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. 
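A sketch of the privacy-list helpers defined in features_nb.py above; both calls go through _on_default_response(), so the callback only receives a success flag. The list names are placeholders.

from common.xmpp import features_nb

def on_privacy_result(success):
    print 'privacy operation ok:', bool(success)

features_nb.setActivePrivacyList(client, 'invisible', typ='active', cb=on_privacy_result)
features_nb.delPrivacyList(client, 'old_list', on_privacy_result)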
+ +import select + +class IdleObject: + ''' base class for all idle listeners, these are the methods, which are called from IdleQueue + ''' + def __init__(self): + self.fd = -1 + pass + + def pollend(self): + ''' called on stream failure ''' + pass + + def pollin(self): + ''' called on new read event ''' + pass + + def pollout(self): + ''' called on new write event (connect in sockets is a pollout) ''' + pass + + def read_timeout(self, fd): + ''' called when timeout has happend ''' + pass + +class IdleQueue: + def __init__(self): + self.queue = {} + + # when there is a timeout it executes obj.read_timeout() + # timeout is not removed automatically! + self.read_timeouts = {} + + # cb, which are executed after XX sec., alarms are removed automatically + self.alarms = {} + self.init_idle() + + def init_idle(self): + self.selector = select.poll() + + def remove_timeout(self, fd): + ''' self explanatory, remove the timeout from 'read_timeouts' dict ''' + if self.read_timeouts.has_key(fd): + del(self.read_timeouts[fd]) + + def set_alarm(self, alarm_cb, seconds): + ''' set up a new alarm, to be called after alarm_cb sec. ''' + alarm_time = self.current_time() + seconds + # almost impossible, but in case we have another alarm_cb at this time + if self.alarms.has_key(alarm_time): + self.alarms[alarm_time].append(alarm_cb) + else: + self.alarms[alarm_time] = [alarm_cb] + + def set_read_timeout(self, fd, seconds): + ''' set a new timeout, if it is not removed after 'seconds', + then obj.read_timeout() will be called ''' + timeout = self.current_time() + seconds + self.read_timeouts[fd] = timeout + + def check_time_events(self): + current_time = self.current_time() + for fd, timeout in self.read_timeouts.items(): + if timeout > current_time: + continue + if self.queue.has_key(fd): + self.queue[fd].read_timeout() + else: + self.remove_timeout(fd) + times = self.alarms.keys() + for alarm_time in times: + if alarm_time > current_time: + break + for cb in self.alarms[alarm_time]: + cb() + del(self.alarms[alarm_time]) + + def plug_idle(self, obj, writable = True, readable = True): + if self.queue.has_key(obj.fd): + self.unplug_idle(obj.fd) + self.queue[obj.fd] = obj + if writable: + if not readable: + flags = 4 # read only + else: + flags = 7 # both readable and writable + else: + flags = 3 # write only + flags |= 16 # hung up, closed channel + self.add_idle(obj.fd, flags) + + def add_idle(self, fd, flags): + self.selector.register(fd, flags) + + def unplug_idle(self, fd): + if self.queue.has_key(fd): + del(self.queue[fd]) + self.remove_idle(fd) + + def current_time(self): + from time import time + return time() + + def remove_idle(self, fd): + self.selector.unregister(fd) + + def process_events(self, fd, flags): + obj = self.queue.get(fd) + if obj is None: + self.unplug_idle(fd) + return False + + if flags & 3: # waiting read event + obj.pollin() + return True + + elif flags & 4: # waiting write event + obj.pollout() + return True + + elif flags & 16: # closed channel + obj.pollend() + return False + + def process(self): + if not self.queue: + return True + try: + waiting_descriptors = self.selector.poll(0) + except select.error, e: + waiting_descriptors = [] + if e[0] != 4: # interrupt + raise + for fd, flags in waiting_descriptors: + self.process_events(fd, flags) + self.check_time_events() + return True diff --git a/src/common/xmpp/roster_nb.py b/src/common/xmpp/roster_nb.py new file mode 100644 index 000000000..307ffd9b1 --- /dev/null +++ b/src/common/xmpp/roster_nb.py @@ -0,0 +1,59 @@ +## 
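A minimal sketch of the IdleQueue/IdleObject contract implemented above, using a plain pipe as the descriptor; it assumes a POSIX platform, since init_idle() relies on select.poll(). A real transport such as NonBlockingTcp fills in pollin/pollout/pollend the same way.

import os
from common.xmpp.idlequeue import IdleQueue, IdleObject

class PipeReader(IdleObject):
    def __init__(self, fd):
        IdleObject.__init__(self)
        self.fd = fd                 # the queue keys watches and timeouts on .fd
    def pollin(self):
        print 'readable:', os.read(self.fd, 1024)

queue = IdleQueue()
read_end, write_end = os.pipe()
queue.plug_idle(PipeReader(read_end), writable=False, readable=True)
os.write(write_end, 'ping')
queue.process()                      # polls, then dispatches pollin() for the pipe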
roster_nb.py +## based on roster.py +## +## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov +## modified by Dimitur Kirov +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + +# $Id: roster.py,v 1.17 2005/05/02 08:38:49 snakeru Exp $ + +''' +Simple roster implementation. Can be used though for different tasks like +mass-renaming of contacts. +''' + +from roster import Roster +from protocol import NS_ROSTER + +class NonBlockingRoster(Roster): + def plugin(self, owner, request=1): + ''' Register presence and subscription trackers in the owner's dispatcher. + Also request roster from server if the 'request' argument is set. + Used internally.''' + self._owner.RegisterHandler('iq', self.RosterIqHandler, 'result', NS_ROSTER, makefirst = 1) + self._owner.RegisterHandler('iq', self.RosterIqHandler, 'set', NS_ROSTER) + self._owner.RegisterHandler('presence', self.PresenceHandler) + if request: + self.Request() + + def _on_roster_set(self, data): + if data: + self._owner.Dispatcher.ProcessNonBlocking(data) + if not self.set: + return + self._owner.onreceive(None) + if self.on_ready: + self.on_ready(self) + self.on_ready = None + return True + + def getRoster(self, on_ready=None): + ''' Requests roster from server if neccessary and returns self. ''' + if not self.set: + self.on_ready = on_ready + self._owner.onreceive(self._on_roster_set) + return + if on_ready: + on_ready(self) + on_ready = None + else: + return self diff --git a/src/common/xmpp/transports_nb.py b/src/common/xmpp/transports_nb.py new file mode 100644 index 000000000..fa32a062c --- /dev/null +++ b/src/common/xmpp/transports_nb.py @@ -0,0 +1,481 @@ +## transports_nb.py +## based on transports.py +## +## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov +## modified by Dimitur Kirov +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2, or (at your option) +## any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + +import socket,select,base64,dispatcher_nb +from simplexml import ustr +from client import PlugIn +from idlequeue import IdleObject +from protocol import * +from transports import * + +import sys +import os +import errno + +# timeout to connect to the server socket, it doesn't include auth +CONNECT_TIMEOUT_SECONDS = 30 + +# how long to wait for a disconnect to complete +DISCONNECT_TIMEOUT_SECONDS = 10 + +# size of the buffer which reads data from server +# if lower, more stanzas will be fragmented and processed twice +RECV_BUFSIZE = 1048576 + +class NonBlockingTcp(PlugIn, IdleObject): + ''' This class can be used instead of transports.Tcp in threadless implementations ''' + def __init__(self, on_connect = None, on_connect_failure = None, server=None, use_srv = True): + ''' Cache connection point 'server'. 
'server' is the tuple of (host, port) + absolutely the same as standard tcp socket uses. + on_connect - called when we connect to the socket + on_connect_failure - called if there was error connecting to socket + ''' + IdleObject.__init__(self) + PlugIn.__init__(self) + self.DBG_LINE='socket' + self._exported_methods=[self.send, self.disconnect, self.onreceive, self.set_send_timeout, + self.start_disconnect, self.set_timeout, self.remove_timeout] + self._server = server + self.on_connect = on_connect + self.on_connect_failure = on_connect_failure + self.on_receive = None + self.on_disconnect = None + + # 0 - not connected + # 1 - connected + # -1 - about to disconnect (when we wait for final events to complete) + # -2 - disconnected + self.state = 0 + + # queue with messages to be send + self.sendqueue = [] + + # bytes remained from the last send message + self.sendbuff = '' + + # time to wait for SOME stanza to come and then send keepalive + self.sendtimeout = 0 + + # in case we want to something different than sending keepalives + self.on_timeout = None + + # writable, readable - keep state of the last pluged flags + # This prevents replug of same object with the same flags + self.writable = True + self.readable = False + + def plugin(self, owner): + ''' Fire up connection. Return non-empty string on success. + Also registers self.disconnected method in the owner's dispatcher. + Called internally. ''' + self.idlequeue = owner.idlequeue + if not self._server: + self._server=(self._owner.Server,5222) + if self.connect(self._server) is False: + return False + return True + + def read_timeout(self): + if self.state == 0: + self.idlequeue.unplug_idle(self.fd) + if self.on_connect_failure: + self.on_connect_failure() + else: + if self.on_timeout: + self.on_timeout() + self.renew_send_timeout() + + def connect(self,server=None, proxy = None, secure = None): + ''' Try to establish connection. Returns non-empty string on success. ''' + if not server: + server=self._server + else: + self._server = server + self.state = 0 + try: + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.setblocking(False) + except: + if self.on_connect_failure: + self.on_connect_failure() + return False + self.fd = self._sock.fileno() + self.idlequeue.plug_idle(self, True, False) + self.set_timeout(CONNECT_TIMEOUT_SECONDS) + self._do_connect() + return True + + def _plug_idle(self): + readable = self.state != 0 + if self.sendqueue or self.sendbuff: + writable = True + else: + writable = False + if self.writable != writable or self.readable != readable: + self.idlequeue.plug_idle(self, writable, readable) + + def pollout(self): + if self.state == 0: + return self._do_connect() + return self._do_send() + + def plugout(self): + ''' Disconnect from the remote server and unregister self.disconnected method from + the owner's dispatcher. 
''' + self.disconnect() + self._owner.Connection = None + + def pollin(self): + self._do_receive() + + def pollend(self): + self.disconnect() + if self.on_connect_failure: + self.on_connect_failure() + self.on_connect_failure = None + + def disconnect(self): + if self.state == -2: # already disconnected + return + self.state = -2 + self.sendqueue = None + self.remove_timeout() + self._owner.disconnected() + self.idlequeue.unplug_idle(self.fd) + if self.on_disconnect: + self.on_disconnect() + + def end_disconnect(self): + ''' force disconnect only if we are still trying to disconnect ''' + if self.state == -1: + self.disconnect() + + def start_disconnect(self, to_send, on_disconnect): + self.on_disconnect = on_disconnect + self.sendqueue = [] + self.send(to_send) + self.send('') + self.state = -1 # about to disconnect + self.idlequeue.set_alarm(self.end_disconnect, DISCONNECT_TIMEOUT_SECONDS) + + def set_timeout(self, timeout): + if self.state >= 0 and self.fd > 0: + self.idlequeue.set_read_timeout(self.fd, timeout) + + def remove_timeout(self): + if self.fd: + self.idlequeue.remove_timeout(self.fd) + + def onreceive(self, recv_handler): + if not recv_handler: + if hasattr(self._owner, 'Dispatcher'): + self.on_receive = self._owner.Dispatcher.ProcessNonBlocking + else: + self.on_receive = None + return + _tmp = self.on_receive + # make sure this cb is not overriden by recursive calls + if not recv_handler(None) and _tmp == self.on_receive: + self.on_receive = recv_handler + + def _do_receive(self): + ''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.''' + received = '' + errnum = 0 + try: + # get as many bites, as possible, but not more than RECV_BUFSIZE + received = self._recv(RECV_BUFSIZE) + except Exception, e: + if len(e.args) > 0 and isinstance(e.args[0], int): + errnum = e[0] + # "received" will be empty anyhow + + if not received and errnum != 2: + if errnum != 8: # EOF occurred in violation of protocol + self.DEBUG('Socket error while receiving data', 'error') + if self.state >= 0: + self.disconnect() + return False + if self.state < 0: + return + # we have received some bites, stop the timeout! 
+ self.renew_send_timeout() + if self.on_receive: + if received.strip(): + self.DEBUG(received,'got') + if hasattr(self._owner, 'Dispatcher'): + self._owner.Dispatcher.Event('', DATA_RECEIVED, received) + self.on_receive(received) + else: + # This should never happed, so we need the debug + self.DEBUG('Unhandled data received: %s' % received,'got') + self.disconnect() + return True + + def _do_send(self): + if not self.sendbuff: + if not self.sendqueue: + return None # nothing to send + self.sendbuff = self.sendqueue.pop(0) + self.sent_data = self.sendbuff + try: + send_count = self._send(self.sendbuff) + if send_count: + self.sendbuff = self.sendbuff[send_count:] + if not self.sendbuff and not self.sendqueue: + if self.state < 0: + self._on_send() + self.disconnect() + return + # we are not waiting for write + self._plug_idle() + self._on_send() + except Exception, e: + if self.state < 0: + self.disconnect() + return + if self._on_send_failure: + self._on_send_failure() + return + return True + + def _do_connect(self): + if self.state != 0: + return + self._sock.setblocking(False) + # connect_ex is better than try:connect + try: + errnum = self._sock.connect_ex(self._server) + except socket.error, e: + errnum = e[0] + # in progress, or would block + if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): + return + # 10056 - already connected, only on win32 + # code 'WS*' is not available on GNU, so we use its numeric value + elif errnum not in (0, 10056): + self.remove_timeout() + if self.on_connect_failure: + self.on_connect_failure() + return + self.remove_timeout() + self._owner.Connection=self + self.state = 1 + + self._sock.setblocking(False) + self._send = self._sock.send + self._recv = self._sock.recv + self._plug_idle() + if self.on_connect: + self.on_connect() + self.on_connect = None + return True + + def send(self, raw_data): + ''' Writes raw outgoing data. Blocks until done. + If supplied data is unicode string, encodes it to utf-8 before send. ''' + if self.state <= 0: + return + r = raw_data + if isinstance(r, unicode): + r = r.encode('utf-8') + elif not isinstance(r, str): + r = ustr(r).encode('utf-8') + self.sendqueue.append(r) + self._plug_idle() + + def _on_send(self): + if self.sent_data and self.sent_data.strip(): + self.DEBUG(self.sent_data,'sent') + if hasattr(self._owner, 'Dispatcher'): + self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data) + self.sent_data = None + + def _on_send_failure(self): + self.DEBUG("Socket error while sending data",'error') + self._owner.disconnected() + self.sent_data = None + + def set_send_timeout(self, timeout, on_timeout): + self.sendtimeout = timeout + if self.sendtimeout > 0: + self.on_timeout = on_timeout + else: + self.on_timeout = None + + def renew_send_timeout(self): + if self.on_timeout and self.sendtimeout > 0: + self.set_timeout(self.sendtimeout) + else: + self.remove_timeout() + + def getHost(self): + ''' Return the 'host' value that is connection is [will be] made to.''' + return self._server[0] + + def getPort(self): + ''' Return the 'port' value that is connection is [will be] made to.''' + return self._server[1] + +class NonBlockingTLS(PlugIn): + ''' TLS connection used to encrypts already estabilished tcp connection.''' + def PlugIn(self, owner, now=0, on_tls_start = None): + ''' If the 'now' argument is true then starts using encryption immidiatedly. + If 'now' in false then starts encryption as soon as TLS feature is + declared by the server (if it were already declared - it is ok). 
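A stdlib-only sketch of the non-blocking connect pattern used in _do_connect() above: connect_ex() returns immediately, and EINPROGRESS or EWOULDBLOCK just mean the attempt is pending until the socket becomes writable. The blocking select() here replaces the idle queue purely for illustration.

import errno
import select
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(False)
err = s.connect_ex(('example.org', 5222))
if err in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
    select.select([], [s], [], 30)                        # wait for writability
    err = s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err == 0:
    print 'connected'
else:
    print 'connect failed:', errno.errorcode.get(err, err)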
+ ''' + if owner.__dict__.has_key('NonBlockingTLS'): + return # Already enabled. + PlugIn.PlugIn(self, owner) + DBG_LINE='NonBlockingTLS' + self.on_tls_start = on_tls_start + if now: + res = self._startSSL() + self.tls_start() + return res + if self._owner.Dispatcher.Stream.features: + try: + self.FeaturesHandler(self._owner.Dispatcher, self._owner.Dispatcher.Stream.features) + except NodeProcessed: + pass + else: + self._owner.RegisterHandlerOnce('features',self.FeaturesHandler, xmlns=NS_STREAMS) + self.starttls = None + + def plugout(self,now=0): + ''' Unregisters TLS handler's from owner's dispatcher. Take note that encription + can not be stopped once started. You can only break the connection and start over.''' + self._owner.UnregisterHandler('features',self.FeaturesHandler,xmlns=NS_STREAMS) + + def tls_start(self): + if self.on_tls_start: + self.on_tls_start() + self.on_tls_start = None + + def FeaturesHandler(self, conn, feats): + ''' Used to analyse server tag for TLS support. + If TLS is supported starts the encryption negotiation. Used internally ''' + if not feats.getTag('starttls', namespace=NS_TLS): + self.DEBUG("TLS unsupported by remote server.", 'warn') + self.tls_start() + return + self.DEBUG("TLS supported by remote server. Requesting TLS start.", 'ok') + self._owner.RegisterHandlerOnce('proceed', self.StartTLSHandler, xmlns=NS_TLS) + self._owner.RegisterHandlerOnce('failure', self.StartTLSHandler, xmlns=NS_TLS) + self._owner.send('' % NS_TLS) + self.tls_start() + raise NodeProcessed + + def _startSSL(self): + ''' Immidiatedly switch socket to TLS mode. Used internally.''' + tcpsock=self._owner.Connection + tcpsock._sock.setblocking(True) + tcpsock._sslObj = socket.ssl(tcpsock._sock, None, None) + tcpsock._sock.setblocking(False) + tcpsock._sslIssuer = tcpsock._sslObj.issuer() + tcpsock._sslServer = tcpsock._sslObj.server() + tcpsock._recv = tcpsock._sslObj.read + tcpsock._send = tcpsock._sslObj.write + self.starttls='success' + + def StartTLSHandler(self, conn, starttls): + ''' Handle server reply if TLS is allowed to process. Behaves accordingly. + Used internally.''' + if starttls.getNamespace() <> NS_TLS: + return + self.starttls = starttls.getName() + if self.starttls == 'failure': + self.DEBUG('Got starttls response: ' + self.starttls,'error') + return + self.DEBUG('Got starttls proceed response. Switching to TLS/SSL...','ok') + self._startSSL() + self._owner.Dispatcher.PlugOut() + dispatcher_nb.Dispatcher().PlugIn(self._owner) + + +class NBHTTPPROXYsocket(NonBlockingTcp): + ''' This class can be used instead of transports.HTTPPROXYsocket + HTTP (CONNECT) proxy connection class. Uses TCPsocket as the base class + redefines only connect method. Allows to use HTTP proxies like squid with + (optionally) simple authentication (using login and password). + + ''' + def __init__(self, on_connect =None, on_connect_failure = None,proxy = None,server = None,use_srv=True): + ''' Caches proxy and target addresses. + 'proxy' argument is a dictionary with mandatory keys 'host' and 'port' (proxy address) + and optional keys 'user' and 'password' to use for authentication. + 'server' argument is a tuple of host and port - just like TCPsocket uses. ''' + self.on_connect_proxy = on_connect + self.on_connect_failure = on_connect_failure + NonBlockingTcp.__init__(self, self._on_tcp_connect, on_connect_failure, server, use_srv) + self.DBG_LINE=DBG_CONNECT_PROXY + self.server = server + self.proxy=proxy + + def plugin(self, owner): + ''' Starts connection. Used interally. 
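For reference, the TLS negotiation FeaturesHandler drives is the standard XMPP STARTTLS exchange: the client sends a starttls element in the urn:ietf:params:xml:ns:xmpp-tls namespace, and the 'proceed' or 'failure' answer lands in the StartTLSHandler registered above. A sketch of the request's shape, assuming protocol.py is importable as below.

from common.xmpp.protocol import Node, NS_TLS

request = Node('starttls', attrs={'xmlns': NS_TLS})
print str(request)
# a 'proceed' reply makes StartTLSHandler call _startSSL() and replug the
# Dispatcher; a 'failure' reply leaves self.starttls set to 'failure'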
+ Returns a non-empty string on success.'''
+ owner.debug_flags.append(DBG_CONNECT_PROXY)
+ NonBlockingTcp.plugin(self,owner)
+
+ def connect(self,dupe=None):
+ ''' Starts the connection. Connects to the proxy, supplies login and password to it
+ (if they were specified when creating the instance). Instructs the proxy to make a
+ connection to the target server. Returns a non-empty string on success. '''
+ NonBlockingTcp.connect(self, (self.proxy['host'], self.proxy['port']))
+
+ def _on_tcp_connect(self):
+ self.DEBUG('Proxy server contacted, performing authentication','start')
+ connector = ['CONNECT %s:%s HTTP/1.0'%self.server,
+ 'Proxy-Connection: Keep-Alive',
+ 'Pragma: no-cache',
+ 'Host: %s:%s'%self.server,
+ 'User-Agent: HTTPPROXYsocket/v0.1']
+ if self.proxy.has_key('user') and self.proxy.has_key('password'):
+ credentials = '%s:%s' % ( self.proxy['user'], self.proxy['password'])
+ credentials = base64.encodestring(credentials).strip()
+ connector.append('Proxy-Authorization: Basic '+credentials)
+ connector.append('\r\n')
+ self.onreceive(self._on_headers_sent)
+ self.send('\r\n'.join(connector))
+
+ def _on_headers_sent(self, reply):
+ if reply is None:
+ return
+ self.reply = reply.replace('\r', '')
+ try:
+ proto, code, desc = reply.split('\n')[0].split(' ', 2)
+ except:
+ raise error('Invalid proxy reply')
+ if code <> '200':
+ self.DEBUG('Invalid proxy reply: %s %s %s' % (proto, code, desc),'error')
+ self._owner.disconnected()
+ return
+ self.onreceive(self._on_proxy_auth)
+
+ def _on_proxy_auth(self, reply):
+ if self.reply.find('\n\n') == -1:
+ if reply is None:
+ return
+ if reply.find('\n\n') == -1:
+ self.reply += reply.replace('\r', '')
+ return
+ self.DEBUG('Authentication successful. Jabber server contacted.','ok')
+ if self.on_connect_proxy:
+ self.on_connect_proxy()
+
+ def DEBUG(self, text, severity):
+ ''' Overrides DEBUG so that debug output is presented as "CONNECTproxy".'''
+ return self._owner.DEBUG(DBG_CONNECT_PROXY, text, severity)
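To make the CONNECT handshake implemented by _on_tcp_connect() and _on_headers_sent() above easier to review, here is a usage sketch together with the traffic it is expected to produce. It is illustrative only: the proxy and server addresses and the credentials are placeholders, the import location of NBHTTPPROXYsocket is not shown in this excerpt, the object still has to be plugged into an owner and driven by the idle queue, and the proxy's reply is just a typical example (the code only checks for the '200' status).

# Illustrative only, not part of the patch.
proxy = {'host': 'proxy.example.org', 'port': 3128,
    'user': 'jdoe', 'password': 'secret'}    # 'user'/'password' are optional
server = ('xmpp.example.org', 5222)

def on_ok():
    print 'tunnel to the XMPP server established'

def on_fail():
    print 'could not connect through the proxy'

sock = NBHTTPPROXYsocket(on_connect = on_ok, on_connect_failure = on_fail,
    proxy = proxy, server = server)

# Expected traffic once the TCP connection to the proxy is up:
#   >>> CONNECT xmpp.example.org:5222 HTTP/1.0
#   >>> Proxy-Connection: Keep-Alive
#   >>> Pragma: no-cache
#   >>> Host: xmpp.example.org:5222
#   >>> User-Agent: HTTPPROXYsocket/v0.1
#   >>> Proxy-Authorization: Basic amRvZTpzZWNyZXQ=    (only when user/password are set)
#   >>>
#   <<< HTTP/1.0 200 Connection established
#   <<<
# After the blank line that ends the 200 reply, the socket carries the XMPP
# stream unchanged and on_connect_proxy() is invoked.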
diff --git a/src/config.py b/src/config.py
index da952f94a..c100f6aea 100644
--- a/src/config.py
+++ b/src/config.py
@@ -4,6 +4,7 @@
 ## - Yann Le Boulanger
 ## - Nikos Kouremenos
 ## - Vincent Hanquez
+## - Dimitur Kirov
 ##
 ## Copyright (C) 2003-2004 Yann Le Boulanger
 ## Vincent Hanquez
@@ -1229,7 +1230,6 @@ class AccountModificationWindow:
 gajim.last_message_time[self.account]
 gajim.status_before_autoaway[name] = \
 gajim.status_before_autoaway[self.account]
- gajim.events_for_ui[name] = gajim.events_for_ui[self.account]
 gajim.contacts.change_account_name(self.account, name)
@@ -1256,7 +1256,6 @@ class AccountModificationWindow:
 del gajim.encrypted_chats[self.account]
 del gajim.last_message_time[self.account]
 del gajim.status_before_autoaway[self.account]
- del gajim.events_for_ui[self.account]
 gajim.connections[self.account].name = name
 gajim.connections[name] = gajim.connections[self.account]
 del gajim.connections[self.account]
@@ -2266,7 +2265,9 @@ class RemoveAccountWindow:
 _('If you remove it, the connection will be lost.'))
 if dialog.get_response() != gtk.RESPONSE_OK:
 return
- gajim.connections[self.account].change_status('offline', 'offline')
+ # change status to offline only if we will not remove this JID from the server
+ if not self.remove_and_unregister_radiobutton.get_active():
+ gajim.connections[self.account].change_status('offline', 'offline')
 if self.remove_and_unregister_radiobutton.get_active():
 if not gajim.connections[self.account].password:
@@ -2280,10 +2281,15 @@ class RemoveAccountWindow:
 # We don't remove account cause we canceled pw window
 return
 gajim.connections[self.account].password = passphrase
- if not gajim.connections[self.account].unregister_account():
- # unregistration failed, we don't remove the account
- # Error message is send by connect_and_auth()
- return
+ gajim.connections[self.account].unregister_account(self._on_remove_success)
+ else:
+ self._on_remove_success(True)
+
+ def _on_remove_success(self, res):
+ # the unregistration failed, so we don't remove the account
+ # Error message is sent by connect_and_auth()
+ if not res:
+ return
 # Close all opened windows
 gajim.interface.roster.close_all(gajim.interface.instances[self.account])
 del gajim.connections[self.account]
@@ -2302,7 +2308,6 @@ class RemoveAccountWindow:
 del gajim.encrypted_chats[self.account]
 del gajim.last_message_time[self.account]
 del gajim.status_before_autoaway[self.account]
- del gajim.events_for_ui[self.account]
 if len(gajim.connections) >= 2: # Do not merge accounts if only one exists
 gajim.interface.roster.regroup = gajim.config.get('mergeaccounts')
 else:
@@ -2828,7 +2833,6 @@ _('You can set advanced account options by pressing Advanced button, or later by
 con = connection.Connection(self.account)
 if savepass:
 con.password = password
- gajim.events_for_ui[self.account] = []
 if not self.modify:
 con.new_account(self.account, config)
 return
@@ -2858,7 +2862,6 @@ _('You can set advanced account options by pressing Advanced button, or later by
 gajim.encrypted_chats[self.account] = []
 gajim.last_message_time[self.account] = {}
 gajim.status_before_autoaway[self.account] = ''
- gajim.events_for_ui[self.account] = []
 # refresh accounts window
 if gajim.interface.instances.has_key('accounts'):
 gajim.interface.instances['accounts'].init_accounts()
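The RemoveAccountWindow hunk above replaces the old blocking unregister_account() call, which returned a boolean, with an asynchronous call that reports its outcome through the new _on_remove_success() callback. The snippet below is a toy illustration of that contract only; FakeConnection is an invented stand-in, and the real sender is Connection.unregister_account(), which is outside this excerpt.

# Toy illustration (not the real code) of the callback contract assumed above:
# whoever performs the unregistration calls the supplied callback exactly once,
# with True on success or False on failure, instead of returning a boolean.
class FakeConnection:
    def unregister_account(self, on_remove_success):
        # the real Connection sends the unregistration request and calls the
        # callback later, from its result handler; here we succeed immediately
        on_remove_success(True)

def _on_remove_success(res):
    if not res:
        print 'unregistration failed, keeping the account'
        return
    print 'account removed, cleaning up'

FakeConnection().unregister_account(_on_remove_success)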
diff --git a/src/gajim.py b/src/gajim.py
index 45d1e464e..530fdd404 100755
--- a/src/gajim.py
+++ b/src/gajim.py
@@ -105,6 +105,8 @@
 import notify

 import common.sleepy
+from common.xmpp import idlequeue
+from common import nslookup
 from common import socks5
 from common import gajim
 from common import connection
@@ -152,6 +154,39 @@
 import disco

 GTKGUI_GLADE = 'gtkgui.glade'

+class GlibIdleQueue(idlequeue.IdleQueue):
+ '''
+ Extends IdleQueue to use glib's io_add_watch, instead of select/poll.
+ In another, non-GUI implementation of Gajim, IdleQueue can be used safely.
+ '''
+ def init_idle(self):
+ ''' This method is called at the end of the class constructor.
+ Creates a dict which maps a file/pipe/socket descriptor to a glib event id.'''
+ self.events = {}
+ if gtk.pygtk_version >= (2, 8, 0):
+ # time() is already called in glib, we just get the last value
+ # overrides IdleQueue.current_time()
+ self.current_time = lambda: gobject.get_current_time()
+
+ def add_idle(self, fd, flags):
+ ''' This method is called when we plug a new idle object.
+ Starts listening for events from fd.
+ '''
+ res = gobject.io_add_watch(fd, flags, self.process_events,
+ priority=gobject.PRIORITY_LOW)
+ # store the id of the watch, so that we can remove it on unplug
+ self.events[fd] = res
+
+ def remove_idle(self, fd):
+ ''' This method is called when we unplug an idle object.
+ Stops listening for events from fd.
+ '''
+ gobject.source_remove(self.events[fd])
+ del(self.events[fd])
+
+ def process(self):
+ self.check_time_events()
+
 class Interface:
 def handle_event_roster(self, account, data):
 #('ROSTER', account, array)
@@ -1333,27 +1368,17 @@
 'SIGNED_IN': self.handle_event_signed_in,
 'META_CONTACTS': self.handle_event_meta_contacts,
 }
-
- def exec_event(self, account):
- ev = gajim.events_for_ui[account].pop(0)
- self.handlers[ev[0]](account, ev[1])
+ gajim.handlers = self.handlers

 def process_connections(self):
- # We copy the list of connections because one can disappear while we
- # process()
- accounts = []
- for account in gajim.connections:
- accounts.append(account)
- for account in accounts:
- if gajim.connections[account].connected:
- gajim.connections[account].process(0.01)
- if gajim.socks5queue.connected:
- gajim.socks5queue.process(0)
- for account in gajim.events_for_ui: #when we create a new account we don't have gajim.connection
- while len(gajim.events_for_ui[account]):
- gajim.mutex_events_for_ui.lock(self.exec_event, account)
- gajim.mutex_events_for_ui.unlock()
- time.sleep(0.01) # so threads in connection.py have time to run
+ ''' Called periodically (every 200 milliseconds for now). Checks for
+ idlequeue timeouts and file transfer (FT) events.
+ '''
+ gajim.idlequeue.process()
+
+ # TODO: rewrite socks5 classes to work with idlequeue and remove these lines
+ if gajim.socks5queue.connected:
+ gajim.socks5queue.process(0)
 return True # renew timeout (loop for ever)

 def save_config(self):
@@ -1471,6 +1496,12 @@ class Interface:
 gajim.socks5queue = socks5.SocksQueue(
 self.handle_event_file_rcv_completed,
 self.handle_event_file_progress)
+ # in a non-GUI implementation, just call:
+ # gajim.idlequeue = IdleQueue(), and
+ # gajim.idlequeue.process() every foo milliseconds
+ gajim.idlequeue = GlibIdleQueue()
+ # resolve and keep a current record of resolved hosts
+ gajim.resolver = nslookup.Resolver(gajim.idlequeue)
 self.register_handlers()
 for account in gajim.config.get_per('accounts'):
 gajim.connections[account] = common.connection.Connection(account)
@@ -1495,7 +1526,6 @@ class Interface:
 gajim.encrypted_chats[a] = []
 gajim.last_message_time[a] = {}
 gajim.status_before_autoaway[a] = ''
- gajim.events_for_ui[a] = []
 self.roster = roster_window.RosterWindow()
diff --git a/src/roster_window.py b/src/roster_window.py
index 97bd3c455..20bca6da4 100644
--- a/src/roster_window.py
+++ b/src/roster_window.py
@@ -723,7 +723,9 @@ class RosterWindow:
 def draw_roster(self):
 '''Clear and draw roster'''
- self.tree.get_model().clear()
+ # clear the model, but only if it is not None
+ if self.tree.get_model():
+ self.tree.get_model().clear()
 for acct in gajim.connections:
 self.add_account_to_roster(acct)
 for jid in gajim.contacts.get_jid_list(acct):
@@ -899,7 +901,8 @@ class RosterWindow:
 # if we're online ...
 if connection.connection:
 roster = connection.connection.getRoster()
- if roster.getItem(jid):
+ # in the threadless connection, when no roster stanza has been sent yet, 'roster' is None
+ if roster and roster.getItem(jid):
 resources = roster.getResources(jid)
 # ...get the contact info for our other online resources
 for resource in resources:
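The comments added to gajim.py above note that GlibIdleQueue is only the GTK-specific driver for the new event machinery. Assuming, as those comments state, that common.xmpp.idlequeue.IdleQueue can be instantiated directly and that calling process() regularly is all a front end has to do, a non-GUI consumer could drive it roughly as sketched below; run_forever and the 0.2-second interval are illustrative choices, not part of the patch.

# Minimal sketch (not part of the patch) of a non-GUI main loop, following the
# comments added to gajim.py above.
import time

from common import gajim
from common.xmpp import idlequeue

def run_forever(poll_interval = 0.2):
    ''' Illustrative replacement for the gobject timeout that drives
    Interface.process_connections() in the GTK front end. '''
    gajim.idlequeue = idlequeue.IdleQueue()   # instead of GlibIdleQueue()
    # ... create Connection objects here; they plug themselves into the queue ...
    while True:
        gajim.idlequeue.process()     # fires read/write events and timeouts
        time.sleep(poll_interval)     # i.e. "process() each foo milliseconds"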