From eb589020ed5b5ad1549b3f004c282db41ea6721b Mon Sep 17 00:00:00 2001 From: Yann Leboulanger Date: Mon, 16 Aug 2010 14:33:50 +0200 Subject: [PATCH] ability to send / receive file using IBB based on xmpppy implementation. Fixes #2331 --- src/common/connection_handlers.py | 8 +- src/common/protocol/bytestream.py | 283 +++++++++++++++++++++++++++++- src/filetransfers_window.py | 5 +- 3 files changed, 286 insertions(+), 10 deletions(-) diff --git a/src/common/connection_handlers.py b/src/common/connection_handlers.py index 61798beff..af57b00a8 100644 --- a/src/common/connection_handlers.py +++ b/src/common/connection_handlers.py @@ -51,6 +51,7 @@ from common.pubsub import ConnectionPubSub from common.pep import ConnectionPEP from common.protocol.caps import ConnectionCaps from common.protocol.bytestream import ConnectionSocks5Bytestream +from common.protocol.bytestream import ConnectionIBBytestream from common.message_archiving import ConnectionArchive from common.message_archiving import ARCHIVING_COLLECTIONS_ARRIVED from common.message_archiving import ARCHIVING_COLLECTION_ARRIVED @@ -1038,12 +1039,13 @@ class ConnectionHandlersBase: class ConnectionHandlers(ConnectionArchive, ConnectionVcard, ConnectionSocks5Bytestream, ConnectionDisco, ConnectionCommands, ConnectionPubSub, ConnectionPEP, ConnectionCaps, ConnectionHandlersBase, -ConnectionJingle): +ConnectionJingle, ConnectionIBBytestream): def __init__(self): global HAS_IDLE ConnectionArchive.__init__(self) ConnectionVcard.__init__(self) ConnectionSocks5Bytestream.__init__(self) + ConnectionIBBytestream.__init__(self) ConnectionCommands.__init__(self) ConnectionPubSub.__init__(self) ConnectionPEP.__init__(self, account=self.name, dispatcher=self, @@ -2337,6 +2339,10 @@ ConnectionJingle): common.xmpp.NS_BYTESTREAM) con.RegisterHandler('iq', self._bytestreamErrorCB, 'error', common.xmpp.NS_BYTESTREAM) + con.RegisterHandlerOnce('iq', self.StreamOpenReplyHandler) + con.RegisterHandler('iq', self.IBBIqHandler, ns=common.xmpp.NS_IBB) + con.RegisterHandler('message', self.IBBMessageHandler, + ns=common.xmpp.NS_IBB) con.RegisterHandler('iq', self._DiscoverItemsCB, 'result', common.xmpp.NS_DISCO_ITEMS) con.RegisterHandler('iq', self._DiscoverItemsErrorCB, 'error', diff --git a/src/common/protocol/bytestream.py b/src/common/protocol/bytestream.py index 8277ed710..3942d5a84 100644 --- a/src/common/protocol/bytestream.py +++ b/src/common/protocol/bytestream.py @@ -29,6 +29,9 @@ ## import socket +import base64 +import gobject +import time from common import xmpp from common import gajim @@ -37,6 +40,8 @@ from common import dataforms from common.socks5 import Socks5Receiver +import logging +log = logging.getLogger('gajim.c.p.bytestream') def is_transfer_paused(file_props): if 'stopped' in file_props and file_props['stopped']: @@ -115,7 +120,8 @@ class ConnectionBytestream: feature.addChild(node=_feature) field = _feature.setField('stream-method') field.setAttr('type', 'list-single') - field.addOption(xmpp.NS_BYTESTREAM) + #field.addOption(xmpp.NS_BYTESTREAM) + field.addOption(xmpp.NS_IBB) self.connection.send(iq) def send_file_approval(self, file_props): @@ -137,7 +143,10 @@ class ConnectionBytestream: feature.addChild(node=_feature) field = _feature.setField('stream-method') field.delAttr('type') - field.setValue(xmpp.NS_BYTESTREAM) + if xmpp.NS_BYTESTREAM in file_props['stream-methods']: + field.setValue(xmpp.NS_BYTESTREAM) + else: + field.setValue(xmpp.NS_IBB) self.connection.send(iq) def send_file_rejection(self, file_props, code='403', typ=None): @@ -193,10 +202,14 @@ class ConnectionBytestream: form_tag = feature.getTag('x') form = xmpp.DataForm(node=form_tag) field = form.getField('stream-method') - if field.getValue() != xmpp.NS_BYTESTREAM: - return - self._send_socks5_info(file_props) - raise xmpp.NodeProcessed + if field.getValue() == xmpp.NS_BYTESTREAM: + self._send_socks5_info(file_props) + raise xmpp.NodeProcessed + if field.getValue() == xmpp.NS_IBB: + sid = file_props['sid'] + fp = open(file_props['file-name'], 'r') + self.OpenStream(sid, file_props['receiver'], fp) + raise xmpp.NodeProcessed def _siSetCB(self, con, iq_obj): jid = self._ft_get_from(iq_obj) @@ -219,7 +232,8 @@ class ConnectionBytestream: for f in form.iter_fields(): if f.var == 'stream-method' and f.type == 'list-single': values = [o[1] for o in f.options] - if xmpp.NS_BYTESTREAM in values: + file_props['stream-methods'] = ' '.join(values) + if xmpp.NS_BYTESTREAM in values or xmpp.NS_IBB in values: break else: self.send_file_rejection(file_props, code='400', typ='stream') @@ -322,6 +336,12 @@ class ConnectionSocks5Bytestream(ConnectionBytestream): if 'idx' in host and host['idx'] > 0: gajim.socks5queue.remove_receiver(host['idx']) gajim.socks5queue.remove_sender(host['idx']) + + if 'direction' in file_props: + # it's a IBB + sid = file_props['sid'] + if sid in self.files_props: + del self.files_props[sid] def _send_socks5_info(self, file_props): """ @@ -634,6 +654,255 @@ class ConnectionSocks5Bytestream(ConnectionBytestream): raise xmpp.NodeProcessed + +class ConnectionIBBytestream(ConnectionBytestream): + + def __init__(self): + ConnectionBytestream.__init__(self) + self._streams = {} + self._ampnode = xmpp.Node(xmpp.NS_AMP + ' amp', payload=[xmpp.Node( + 'rule', {'condition': 'deliver-at', 'value': 'stored', + 'action': 'error'}), xmpp.Node('rule', + {'condition': 'match-resource', 'value': 'exact', + 'action':'error'})]) + self.timout_id = None + + def IBBIqHandler(self, conn, stanza): + """ + Handles streams state change. Used internally. + """ + typ = stanza.getType() + log.debug('IBBIqHandler called typ->%s' % typ) + if typ == 'set' and stanza.getTag('open', namespace=xmpp.NS_IBB): + self.StreamOpenHandler(conn, stanza) + elif typ == 'set' and stanza.getTag('close', namespace=xmpp.NS_IBB): + self.StreamCloseHandler(conn, stanza) + elif typ == 'result': + self.StreamCommitHandler(conn, stanza) + elif typ == 'error': + self.StreamOpenReplyHandler(conn, stanza) + else: + conn.send(xmpp.Error(stanza, xmpp.ERR_BAD_REQUEST)) + raise xmpp.NodeProcessed + + def StreamOpenHandler(self, conn, stanza): + """ + Handles opening of new incoming stream. Used internally. + """ + err = None + sid = stanza.getTagAttr('open', 'sid') + blocksize = stanza.getTagAttr('open', 'block-size') + log.debug('StreamOpenHandler called sid->%s blocksize->%s' % (sid, + blocksize)) + try: + blocksize = int(blocksize) + except: + err = xmpp.ERR_BAD_REQUEST + if not sid or not blocksize: + err = xmpp.ERR_BAD_REQUEST + elif not gajim.socks5queue.get_file_props(self.name, sid): + err = xmpp.ERR_UNEXPECTED_REQUEST + if err: + rep = xmpp.Error(stanza, err) + else: + file_props = gajim.socks5queue.get_file_props(self.name, sid) + log.debug("Opening stream: id %s, block-size %s" % (sid, blocksize)) + rep = xmpp.Protocol('iq', stanza.getFrom(), 'result', + stanza.getTo(), {'id': stanza.getID()}) + file_props['block-size'] = blocksize + file_props['seq'] = 0 + file_props['received-len'] = 0 + file_props['last-time'] = time.time() + file_props['error'] = 0 + file_props['paused'] = False + file_props['connected'] = True + file_props['completed'] = False + file_props['disconnect_cb'] = None + file_props['continue_cb'] = None + file_props['syn_id'] = stanza.getID() + file_props['fp'] = open(file_props['file-name'], 'w') + conn.send(rep) + + def OpenStream(self, sid, to, fp, blocksize=3000): + """ + Start new stream. You should provide stream id 'sid', the endpoind jid + 'to', the file object containing info for send 'fp'. Also the desired + blocksize can be specified. + Take into account that recommended stanza size is 4k and IBB uses + base64 encoding that increases size of data by 1/3. + """ + if sid not in self.files_props.keys(): + return + if not xmpp.JID(to).getResource(): + return + self.files_props[sid]['direction'] = '|>' + to + self.files_props[sid]['block-size'] = blocksize + self.files_props[sid]['fp'] = fp + self.files_props[sid]['seq'] = 0 + self.files_props[sid]['error'] = 0 + self.files_props[sid]['paused'] = False + self.files_props[sid]['received-len'] = 0 + self.files_props[sid]['last-time'] = time.time() + self.files_props[sid]['connected'] = True + self.files_props[sid]['completed'] = False + self.files_props[sid]['disconnect_cb'] = None + self.files_props[sid]['continue_cb'] = None + if not self.timout_id: + self.timout_id = gobject.timeout_add_seconds(3, self.SendHandler) + self.SendHandler() # start sending now + syn = xmpp.Protocol('iq', to, 'set', payload=[xmpp.Node(xmpp.NS_IBB + \ + ' open', {'sid': sid, 'block-size': blocksize})]) + self.connection.send(syn) + self.files_props[sid]['syn_id'] = syn.getID() + return self.files_props[sid] + + def SendHandler(self): + """ + Send next portion of data if it is time to do it. Used internally. + """ + log.debug('SendHandler called') + if not self.files_props: + self.timout_id = None + return False + for file_props in self.files_props.values(): + if 'direction' not in file_props: + # it's socks5 bytestream + continue + sid = file_props['sid'] + if file_props['direction'][:2] == '|>': + # We waitthat other part accept stream + continue + if file_props['direction'][0] == '>': + if 'paused' in file_props and file_props['paused']: + continue + chunk = file_props['fp'].read(file_props['block-size']) + if chunk: + datanode = xmpp.Node(xmpp.NS_IBB + ' data', {'sid': sid, + 'seq': file_props['seq']}, base64.encodestring(chunk)) + file_props['seq'] += 1 + file_props['started'] = True + if file_props['seq'] == 65536: + file_props['seq'] = 0 + self.connection.send(xmpp.Protocol('message', + file_props['direction'][1:], payload=[datanode, + self._ampnode])) + current_time = time.time() + file_props['elapsed-time'] += current_time - file_props[ + 'last-time'] + file_props['last-time'] = current_time + file_props['received-len'] += len(chunk) + gajim.socks5queue.progress_transfer_cb(self.name, + file_props) + else: + # notify the other side about stream closing + # notify the local user about sucessfull send + # delete the local stream + self.connection.send(xmpp.Protocol('iq', + file_props['direction'][1:], 'set', + payload=[xmpp.Node(xmpp.NS_IBB + ' close', + {'sid':sid})])) + file_props['completed'] = True + del self.files_props[sid] + if not self.files_props: + self.timout_id = None + return False + return True + + def IBBMessageHandler(self, conn, stanza): + """ + Receive next portion of incoming datastream and store it write + it to temporary file. Used internally. + """ + sid = stanza.getTagAttr('data', 'sid') + seq = stanza.getTagAttr('data', 'seq') + data = stanza.getTagData('data') + log.debug('ReceiveHandler called sid->%s seq->%s' % (sid, seq)) + try: + seq = int(seq) + data = base64.decodestring(data) + except Exception: + seq = '' + data = '' + err = None + if not gajim.socks5queue.get_file_props(self.name, sid): + err = xmpp.ERR_ITEM_NOT_FOUND + else: + file_props = gajim.socks5queue.get_file_props(self.name, sid) + if not data: + err = xmpp.ERR_BAD_REQUEST + elif seq <> file_props['seq']: + err = xmpp.ERR_UNEXPECTED_REQUEST + else: + log.debug('Successfull receive sid->%s %s+%s bytes' % (sid, + file_props['fp'].tell(), len(data))) + file_props['seq'] += 1 + file_props['started'] = True + file_props['fp'].write(data) + current_time = time.time() + file_props['elapsed-time'] += current_time - file_props[ + 'last-time'] + file_props['last-time'] = current_time + file_props['received-len'] += len(data) + gajim.socks5queue.progress_transfer_cb(self.name, file_props) + if file_props['received-len'] >= file_props['size']: + file_props['completed'] = True + if err: + log.debug('Error on receive: %s' % err) + conn.send(xmpp.Error(xmpp.Iq(to=stanza.getFrom(), + frm=stanza.getTo(), + payload=[xmpp.Node(xmpp.NS_IBB + ' close')]), err, reply=0)) + + def StreamCloseHandler(self, conn, stanza): + """ + Handle stream closure due to all data transmitted. + Raise xmpppy event specifying successfull data receive. + """ + sid = stanza.getTagAttr('close', 'sid') + log.debug('StreamCloseHandler called sid->%s' % sid) + # look in sending files + if sid in self.files_props.keys(): + conn.send(stanza.buildReply('result')) + gajim.socks5queue.complete_transfer_cb(self.name, file_props) + del self.files_props[sid] + # look in receiving files + elif gajim.socks5queue.get_file_props(self.name, sid): + file_props = gajim.socks5queue.get_file_props(self.name, sid) + conn.send(stanza.buildReply('result')) + file_props['fp'].close() + gajim.socks5queue.complete_transfer_cb(self.name, file_props) + gajim.socks5queue.remove_file_props(self.name, sid) + else: + conn.send(xmpp.Error(stanza, xmpp.ERR_ITEM_NOT_FOUND)) + + def StreamOpenReplyHandler(self, conn, stanza): + """ + Handle remote side reply about if it agree or not to receive our + datastream. + Used internally. Raises xmpppy event specfiying if the data transfer + is agreed upon. + """ + syn_id = stanza.getID() + log.debug('StreamOpenReplyHandler called syn_id->%s' % syn_id) + for sid in self.files_props.keys(): + file_props = self.files_props[sid] + if not 'direction' in file_props: + # It's socks5 bytestream + continue + if file_props['syn_id'] == syn_id: + if stanza.getType() == 'error': + if file_props['direction'][0] == '<': + conn.Event('IBB', 'ERROR ON RECEIVE', file_props) + else: + conn.Event('IBB', 'ERROR ON SEND', file_props) + del self.files_props[sid] + elif stanza.getType() == 'result': + if file_props['direction'][0] == '|': + file_props['direction'] = file_props['direction'][1:] + conn.Event('IBB', 'STREAM COMMITTED', file_props) + else: + conn.send(xmpp.Error(stanza, + xmpp.ERR_UNEXPECTED_REQUEST)) + class ConnectionSocks5BytestreamZeroconf(ConnectionSocks5Bytestream): def _ft_get_from(self, iq_obj): diff --git a/src/filetransfers_window.py b/src/filetransfers_window.py index 62c9948af..4a6f9bc49 100644 --- a/src/filetransfers_window.py +++ b/src/filetransfers_window.py @@ -801,13 +801,14 @@ class FileTransfersWindow: s_iter = selected[1] sid = self.model[s_iter][C_SID].decode('utf-8') file_props = self.files_props[sid[0]][sid[1:]] - if self.is_transfer_paused(file_props): + if is_transfer_paused(file_props): file_props['last-time'] = time.time() file_props['paused'] = False types = {'r' : 'download', 's' : 'upload'} self.set_status(file_props['type'], file_props['sid'], types[sid[0]]) self.toggle_pause_continue(True) - file_props['continue_cb']() + if file_props['continue_cb']: + file_props['continue_cb']() elif is_transfer_active(file_props): file_props['paused'] = True self.set_status(file_props['type'], file_props['sid'], 'pause')