diff --git a/src/common/zeroconf/client_zeroconf.py b/src/common/zeroconf/client_zeroconf.py index 59d78dc46..a4cd612cf 100644 --- a/src/common/zeroconf/client_zeroconf.py +++ b/src/common/zeroconf/client_zeroconf.py @@ -13,6 +13,7 @@ ## GNU General Public License for more details. ## from common import gajim +import common.xmpp from common.xmpp.idlequeue import IdleObject from common.xmpp import dispatcher_nb, simplexml from common.xmpp.client import * @@ -135,7 +136,7 @@ class P2PClient(IdleObject): self.Connection = conn self.Connection.PlugIn(self) dispatcher_nb.Dispatcher().PlugIn(self) - self.RegisterHandler('message', self._messageCB) + self._register_handlers() if self.sock_type == TYPE_CLIENT: while self.messagequeue: message = self.messagequeue.pop(0) @@ -206,10 +207,22 @@ class P2PClient(IdleObject): return True - def _messageCB(self, conn, data): - self._caller._messageCB(self.Server, conn, data) - - + + def _register_handlers(self): + self.RegisterHandler('message', lambda conn, data:self._caller._messageCB(self.Server, conn, data)) + self.RegisterHandler('iq', self._caller._siSetCB, 'set', + common.xmpp.NS_SI) + self.RegisterHandler('iq', self._caller._siErrorCB, 'error', + common.xmpp.NS_SI) + self.RegisterHandler('iq', self._caller._siResultCB, 'result', + common.xmpp.NS_SI) + self.RegisterHandler('iq', self._caller._bytestreamSetCB, 'set', + common.xmpp.NS_BYTESTREAM) + self.RegisterHandler('iq', self._caller._bytestreamResultCB, 'result', + common.xmpp.NS_BYTESTREAM) + self.RegisterHandler('iq', self._caller._bytestreamErrorCB, 'error', + common.xmpp.NS_BYTESTREAM) + class P2PConnection(IdleObject, PlugIn): ''' class for sending file to socket over socks5 ''' def __init__(self, sock_hash, _sock, host = None, port = None, caller = None, on_connect = None): diff --git a/src/common/zeroconf/connection_handlers_zeroconf.py b/src/common/zeroconf/connection_handlers_zeroconf.py index eab7fe3a4..54600f8ce 100644 --- a/src/common/zeroconf/connection_handlers_zeroconf.py +++ b/src/common/zeroconf/connection_handlers_zeroconf.py @@ -50,6 +50,479 @@ except: gajim.log.debug(_('Unable to load idle module')) HAS_IDLE = False + +class ConnectionBytestream: + def __init__(self): + self.files_props = {} + + def is_transfer_stoped(self, file_props): + if file_props.has_key('error') and file_props['error'] != 0: + return True + if file_props.has_key('completed') and file_props['completed']: + return True + if file_props.has_key('connected') and file_props['connected'] == False: + return True + if not file_props.has_key('stopped') or not file_props['stopped']: + return False + return True + + def send_success_connect_reply(self, streamhost): + ''' send reply to the initiator of FT that we + made a connection + ''' + if streamhost is None: + return None + iq = common.xmpp.Iq(to = streamhost['initiator'], typ = 'result', + frm = streamhost['target']) + iq.setAttr('id', streamhost['id']) + query = iq.setTag('query') + query.setNamespace(common.xmpp.NS_BYTESTREAM) + stream_tag = query.setTag('streamhost-used') + stream_tag.setAttr('jid', streamhost['jid']) + self.connection.send(iq) + + def remove_transfers_for_contact(self, contact): + ''' stop all active transfer for contact ''' + for file_props in self.files_props.values(): + if self.is_transfer_stoped(file_props): + continue + receiver_jid = unicode(file_props['receiver']).split('/')[0] + if contact.jid == receiver_jid: + file_props['error'] = -5 + self.remove_transfer(file_props) + self.dispatch('FILE_REQUEST_ERROR', (contact.jid, file_props, '')) + sender_jid = unicode(file_props['sender']) + if contact.jid == sender_jid: + file_props['error'] = -3 + self.remove_transfer(file_props) + + def remove_all_transfers(self): + ''' stops and removes all active connections from the socks5 pool ''' + for file_props in self.files_props.values(): + self.remove_transfer(file_props, remove_from_list = False) + del(self.files_props) + self.files_props = {} + + def remove_transfer(self, file_props, remove_from_list = True): + if file_props is None: + return + self.disconnect_transfer(file_props) + sid = file_props['sid'] + gajim.socks5queue.remove_file_props(self.name, sid) + + if remove_from_list: + if self.files_props.has_key('sid'): + del(self.files_props['sid']) + + def disconnect_transfer(self, file_props): + if file_props is None: + return + if file_props.has_key('hash'): + gajim.socks5queue.remove_sender(file_props['hash']) + + if file_props.has_key('streamhosts'): + for host in file_props['streamhosts']: + if host.has_key('idx') and host['idx'] > 0: + gajim.socks5queue.remove_receiver(host['idx']) + gajim.socks5queue.remove_sender(host['idx']) + + def send_socks5_info(self, file_props, fast = True, receiver = None, + sender = None): + ''' send iq for the present streamhosts and proxies ''' + if type(self.peerhost) != tuple: + return + port = gajim.config.get('file_transfers_port') + ft_override_host_to_send = gajim.config.get('ft_override_host_to_send') + if receiver is None: + receiver = file_props['receiver'] + if sender is None: + sender = file_props['sender'] + proxyhosts = [] + sha_str = helpers.get_auth_sha(file_props['sid'], sender, + receiver) + file_props['sha_str'] = sha_str + if not ft_override_host_to_send: + ft_override_host_to_send = self.peerhost[0] + try: + ft_override_host_to_send = socket.gethostbyname( + ft_override_host_to_send) + except socket.gaierror: + self.dispatch('ERROR', (_('Wrong host'), _('The host you configured as the ft_override_host_to_send advanced option is not valid, so ignored.'))) + ft_override_host_to_send = self.peerhost[0] + listener = gajim.socks5queue.start_listener(self.peerhost[0], port, + sha_str, self._result_socks5_sid, file_props['sid']) + if listener == None: + file_props['error'] = -5 + self.dispatch('FILE_REQUEST_ERROR', (unicode(receiver), file_props, + '')) + self._connect_error(unicode(receiver), file_props['sid'], + file_props['sid'], code = 406) + return + + iq = common.xmpp.Protocol(name = 'iq', to = unicode(receiver), + typ = 'set') + file_props['request-id'] = 'id_' + file_props['sid'] + iq.setID(file_props['request-id']) + query = iq.setTag('query') + query.setNamespace(common.xmpp.NS_BYTESTREAM) + query.setAttr('mode', 'tcp') + query.setAttr('sid', file_props['sid']) + streamhost = query.setTag('streamhost') + streamhost.setAttr('port', unicode(port)) + streamhost.setAttr('host', ft_override_host_to_send) + streamhost.setAttr('jid', sender) + self.connection.send(iq) + + def send_file_rejection(self, file_props): + ''' informs sender that we refuse to download the file ''' + # user response to ConfirmationDialog may come after we've disconneted + if not self.connection or self.connected < 2: + return + iq = common.xmpp.Protocol(name = 'iq', + to = unicode(file_props['sender']), typ = 'error') + iq.setAttr('id', file_props['request-id']) + err = common.xmpp.ErrorNode(code = '403', typ = 'cancel', name = + 'forbidden', text = 'Offer Declined') + iq.addChild(node=err) + self.connection.send(iq) + + def send_file_approval(self, file_props): + ''' send iq, confirming that we want to download the file ''' + # user response to ConfirmationDialog may come after we've disconneted + if not self.connection or self.connected < 2: + return + iq = common.xmpp.Protocol(name = 'iq', + to = unicode(file_props['sender']), typ = 'result') + iq.setAttr('id', file_props['request-id']) + si = iq.setTag('si') + si.setNamespace(common.xmpp.NS_SI) + if file_props.has_key('offset') and file_props['offset']: + file_tag = si.setTag('file') + file_tag.setNamespace(common.xmpp.NS_FILE) + range_tag = file_tag.setTag('range') + range_tag.setAttr('offset', file_props['offset']) + feature = si.setTag('feature') + feature.setNamespace(common.xmpp.NS_FEATURE) + _feature = common.xmpp.DataForm(typ='submit') + feature.addChild(node=_feature) + field = _feature.setField('stream-method') + field.delAttr('type') + field.setValue(common.xmpp.NS_BYTESTREAM) + self.connection.send(iq) + + def send_file_request(self, file_props): + ''' send iq for new FT request ''' + if not self.connection or self.connected < 2: + return + our_jid = gajim.get_jid_from_account(self.name) + frm = our_jid + file_props['sender'] = frm + fjid = file_props['receiver'].jid + iq = common.xmpp.Protocol(name = 'iq', to = fjid, + typ = 'set') + iq.setID(file_props['sid']) + self.files_props[file_props['sid']] = file_props + si = iq.setTag('si') + si.setNamespace(common.xmpp.NS_SI) + si.setAttr('profile', common.xmpp.NS_FILE) + si.setAttr('id', file_props['sid']) + file_tag = si.setTag('file') + file_tag.setNamespace(common.xmpp.NS_FILE) + file_tag.setAttr('name', file_props['name']) + file_tag.setAttr('size', file_props['size']) + desc = file_tag.setTag('desc') + if file_props.has_key('desc'): + desc.setData(file_props['desc']) + file_tag.setTag('range') + feature = si.setTag('feature') + feature.setNamespace(common.xmpp.NS_FEATURE) + _feature = common.xmpp.DataForm(typ='form') + feature.addChild(node=_feature) + field = _feature.setField('stream-method') + field.setAttr('type', 'list-single') + field.addOption(common.xmpp.NS_BYTESTREAM) + self.connection.send(iq) + + def _result_socks5_sid(self, sid, hash_id): + ''' store the result of sha message from auth. ''' + if not self.files_props.has_key(sid): + return + file_props = self.files_props[sid] + file_props['hash'] = hash_id + return + + def _connect_error(self, to, _id, sid, code = 404): + ''' cb, when there is an error establishing BS connection, or + when connection is rejected''' + msg_dict = { + 404: 'Could not connect to given hosts', + 405: 'Cancel', + 406: 'Not acceptable', + } + msg = msg_dict[code] + iq = None + iq = common.xmpp.Protocol(name = 'iq', to = to, + typ = 'error') + iq.setAttr('id', _id) + err = iq.setTag('error') + err.setAttr('code', unicode(code)) + err.setData(msg) + self.connection.send(iq) + if code == 404: + file_props = gajim.socks5queue.get_file_props(self.name, sid) + if file_props is not None: + self.disconnect_transfer(file_props) + file_props['error'] = -3 + self.dispatch('FILE_REQUEST_ERROR', (to, file_props, msg)) + + def _proxy_auth_ok(self, proxy): + '''cb, called after authentication to proxy server ''' + file_props = self.files_props[proxy['sid']] + iq = common.xmpp.Protocol(name = 'iq', to = proxy['initiator'], + typ = 'set') + auth_id = "au_" + proxy['sid'] + iq.setID(auth_id) + query = iq.setTag('query') + query.setNamespace(common.xmpp.NS_BYTESTREAM) + query.setAttr('sid', proxy['sid']) + activate = query.setTag('activate') + activate.setData(file_props['proxy_receiver']) + iq.setID(auth_id) + self.connection.send(iq) + + # register xmpppy handlers for bytestream and FT stanzas + def _bytestreamErrorCB(self, con, iq_obj): + gajim.log.debug('_bytestreamErrorCB') + id = unicode(iq_obj.getAttr('id')) + frm = unicode(iq_obj.getFrom()) + query = iq_obj.getTag('query') + gajim.proxy65_manager.error_cb(frm, query) + jid = unicode(iq_obj.getFrom()) + id = id[3:] + if not self.files_props.has_key(id): + return + file_props = self.files_props[id] + file_props['error'] = -4 + self.dispatch('FILE_REQUEST_ERROR', (jid, file_props, '')) + raise common.xmpp.NodeProcessed + + def _bytestreamSetCB(self, con, iq_obj): + gajim.log.debug('_bytestreamSetCB') + target = unicode(iq_obj.getAttr('to')) + id = unicode(iq_obj.getAttr('id')) + query = iq_obj.getTag('query') + sid = unicode(query.getAttr('sid')) + file_props = gajim.socks5queue.get_file_props( + self.name, sid) + streamhosts=[] + for item in query.getChildren(): + if item.getName() == 'streamhost': + host_dict={ + 'state': 0, + 'target': target, + 'id': id, + 'sid': sid, + 'initiator': unicode(iq_obj.getFrom()) + } + for attr in item.getAttrs(): + host_dict[attr] = item.getAttr(attr) + streamhosts.append(host_dict) + if file_props is None: + if self.files_props.has_key(sid): + file_props = self.files_props[sid] + file_props['fast'] = streamhosts + if file_props['type'] == 's': + if file_props.has_key('streamhosts'): + file_props['streamhosts'].extend(streamhosts) + else: + file_props['streamhosts'] = streamhosts + if not gajim.socks5queue.get_file_props(self.name, sid): + gajim.socks5queue.add_file_props(self.name, file_props) + gajim.socks5queue.connect_to_hosts(self.name, sid, + self.send_success_connect_reply, None) + raise common.xmpp.NodeProcessed + + file_props['streamhosts'] = streamhosts + if file_props['type'] == 'r': + gajim.socks5queue.connect_to_hosts(self.name, sid, + self.send_success_connect_reply, self._connect_error) + raise common.xmpp.NodeProcessed + + def _ResultCB(self, con, iq_obj): + gajim.log.debug('_ResultCB') + # if we want to respect jep-0065 we have to check for proxy + # activation result in any result iq + real_id = unicode(iq_obj.getAttr('id')) + if real_id[:3] != 'au_': + return + frm = unicode(iq_obj.getFrom()) + id = real_id[3:] + if self.files_props.has_key(id): + file_props = self.files_props[id] + if file_props['streamhost-used']: + for host in file_props['proxyhosts']: + if host['initiator'] == frm and host.has_key('idx'): + gajim.socks5queue.activate_proxy(host['idx']) + raise common.xmpp.NodeProcessed + + def _bytestreamResultCB(self, con, iq_obj): + gajim.log.debug('_bytestreamResultCB') + frm = unicode(iq_obj.getFrom()) + real_id = unicode(iq_obj.getAttr('id')) + query = iq_obj.getTag('query') + gajim.proxy65_manager.resolve_result(frm, query) + + try: + streamhost = query.getTag('streamhost-used') + except: # this bytestream result is not what we need + pass + id = real_id[3:] + if self.files_props.has_key(id): + file_props = self.files_props[id] + else: + raise common.xmpp.NodeProcessed + if streamhost is None: + # proxy approves the activate query + if real_id[:3] == 'au_': + id = real_id[3:] + if not file_props.has_key('streamhost-used') or \ + file_props['streamhost-used'] is False: + raise common.xmpp.NodeProcessed + if not file_props.has_key('proxyhosts'): + raise common.xmpp.NodeProcessed + for host in file_props['proxyhosts']: + if host['initiator'] == frm and \ + unicode(query.getAttr('sid')) == file_props['sid']: + gajim.socks5queue.activate_proxy(host['idx']) + break + raise common.xmpp.NodeProcessed + jid = streamhost.getAttr('jid') + if file_props.has_key('streamhost-used') and \ + file_props['streamhost-used'] is True: + raise common.xmpp.NodeProcessed + + if real_id[:3] == 'au_': + gajim.socks5queue.send_file(file_props, self.name) + raise common.xmpp.NodeProcessed + + proxy = None + if file_props.has_key('proxyhosts'): + for proxyhost in file_props['proxyhosts']: + if proxyhost['jid'] == jid: + proxy = proxyhost + + if proxy != None: + file_props['streamhost-used'] = True + if not file_props.has_key('streamhosts'): + file_props['streamhosts'] = [] + file_props['streamhosts'].append(proxy) + file_props['is_a_proxy'] = True + receiver = socks5.Socks5Receiver(gajim.idlequeue, proxy, file_props['sid'], file_props) + gajim.socks5queue.add_receiver(self.name, receiver) + proxy['idx'] = receiver.queue_idx + gajim.socks5queue.on_success = self._proxy_auth_ok + raise common.xmpp.NodeProcessed + + else: + gajim.socks5queue.send_file(file_props, self.name) + if file_props.has_key('fast'): + fasts = file_props['fast'] + if len(fasts) > 0: + self._connect_error(frm, fasts[0]['id'], file_props['sid'], + code = 406) + + raise common.xmpp.NodeProcessed + + def _siResultCB(self, con, iq_obj): + gajim.log.debug('_siResultCB') + self.peerhost = con._owner.Connection._sock.getsockname() + id = iq_obj.getAttr('id') + if not self.files_props.has_key(id): + # no such jid + return + file_props = self.files_props[id] + if file_props is None: + # file properties for jid is none + return + if file_props.has_key('request-id'): + # we have already sent streamhosts info + return + file_props['receiver'] = unicode(iq_obj.getFrom()) + si = iq_obj.getTag('si') + file_tag = si.getTag('file') + range_tag = None + if file_tag: + range_tag = file_tag.getTag('range') + if range_tag: + offset = range_tag.getAttr('offset') + if offset: + file_props['offset'] = int(offset) + length = range_tag.getAttr('length') + if length: + file_props['length'] = int(length) + feature = si.setTag('feature') + if feature.getNamespace() != common.xmpp.NS_FEATURE: + return + form_tag = feature.getTag('x') + form = common.xmpp.DataForm(node=form_tag) + field = form.getField('stream-method') + if field.getValue() != common.xmpp.NS_BYTESTREAM: + return + self.send_socks5_info(file_props, fast = True) + raise common.xmpp.NodeProcessed + + def _siSetCB(self, con, iq_obj): + gajim.log.debug('_siSetCB') + jid = unicode(iq_obj.getFrom()) + si = iq_obj.getTag('si') + profile = si.getAttr('profile') + mime_type = si.getAttr('mime-type') + if profile != common.xmpp.NS_FILE: + return + file_tag = si.getTag('file') + file_props = {'type': 'r'} + for attribute in file_tag.getAttrs(): + if attribute in ('name', 'size', 'hash', 'date'): + val = file_tag.getAttr(attribute) + if val is None: + continue + file_props[attribute] = val + file_desc_tag = file_tag.getTag('desc') + if file_desc_tag is not None: + file_props['desc'] = file_desc_tag.getData() + + if mime_type is not None: + file_props['mime-type'] = mime_type + our_jid = gajim.get_jid_from_account(self.name) + file_props['receiver'] = our_jid + file_props['sender'] = unicode(iq_obj.getFrom()) + file_props['request-id'] = unicode(iq_obj.getAttr('id')) + file_props['sid'] = unicode(si.getAttr('id')) + gajim.socks5queue.add_file_props(self.name, file_props) + self.dispatch('FILE_REQUEST', (jid, file_props)) + raise common.xmpp.NodeProcessed + + def _siErrorCB(self, con, iq_obj): + gajim.log.debug('_siErrorCB') + si = iq_obj.getTag('si') + profile = si.getAttr('profile') + if profile != common.xmpp.NS_FILE: + return + id = iq_obj.getAttr('id') + if not self.files_props.has_key(id): + # no such jid + return + file_props = self.files_props[id] + if file_props is None: + # file properties for jid is none + return + jid = unicode(iq_obj.getFrom()) + file_props['error'] = -3 + self.dispatch('FILE_REQUEST_ERROR', (jid, file_props, '')) + raise common.xmpp.NodeProcessed + + + class ConnectionVcard: def __init__(self): self.vcard_sha = None @@ -212,9 +685,10 @@ class ConnectionVcard: ''' pass -class ConnectionHandlersZeroconf(ConnectionVcard): +class ConnectionHandlersZeroconf(ConnectionVcard, ConnectionBytestream): def __init__(self): ConnectionVcard.__init__(self) + ConnectionBytestream.__init__(self) # List of IDs we are waiting answers for {id: (type_of_request, data), } self.awaiting_answers = {} # List of IDs that will produce a timeout is answer doesn't arrive