ability to send / receive file using IBB based on xmpppy implementation. Fixes #2331

This commit is contained in:
Yann Leboulanger 2010-08-16 14:33:50 +02:00
parent 4ef341bf2e
commit eb589020ed
3 changed files with 286 additions and 10 deletions

View File

@ -51,6 +51,7 @@ from common.pubsub import ConnectionPubSub
from common.pep import ConnectionPEP
from common.protocol.caps import ConnectionCaps
from common.protocol.bytestream import ConnectionSocks5Bytestream
from common.protocol.bytestream import ConnectionIBBytestream
from common.message_archiving import ConnectionArchive
from common.message_archiving import ARCHIVING_COLLECTIONS_ARRIVED
from common.message_archiving import ARCHIVING_COLLECTION_ARRIVED
@ -1038,12 +1039,13 @@ class ConnectionHandlersBase:
class ConnectionHandlers(ConnectionArchive, ConnectionVcard,
ConnectionSocks5Bytestream, ConnectionDisco, ConnectionCommands,
ConnectionPubSub, ConnectionPEP, ConnectionCaps, ConnectionHandlersBase,
ConnectionJingle):
ConnectionJingle, ConnectionIBBytestream):
def __init__(self):
global HAS_IDLE
ConnectionArchive.__init__(self)
ConnectionVcard.__init__(self)
ConnectionSocks5Bytestream.__init__(self)
ConnectionIBBytestream.__init__(self)
ConnectionCommands.__init__(self)
ConnectionPubSub.__init__(self)
ConnectionPEP.__init__(self, account=self.name, dispatcher=self,
@ -2337,6 +2339,10 @@ ConnectionJingle):
common.xmpp.NS_BYTESTREAM)
con.RegisterHandler('iq', self._bytestreamErrorCB, 'error',
common.xmpp.NS_BYTESTREAM)
con.RegisterHandlerOnce('iq', self.StreamOpenReplyHandler)
con.RegisterHandler('iq', self.IBBIqHandler, ns=common.xmpp.NS_IBB)
con.RegisterHandler('message', self.IBBMessageHandler,
ns=common.xmpp.NS_IBB)
con.RegisterHandler('iq', self._DiscoverItemsCB, 'result',
common.xmpp.NS_DISCO_ITEMS)
con.RegisterHandler('iq', self._DiscoverItemsErrorCB, 'error',

View File

@ -29,6 +29,9 @@
##
import socket
import base64
import gobject
import time
from common import xmpp
from common import gajim
@ -37,6 +40,8 @@ from common import dataforms
from common.socks5 import Socks5Receiver
import logging
log = logging.getLogger('gajim.c.p.bytestream')
def is_transfer_paused(file_props):
if 'stopped' in file_props and file_props['stopped']:
@ -115,7 +120,8 @@ class ConnectionBytestream:
feature.addChild(node=_feature)
field = _feature.setField('stream-method')
field.setAttr('type', 'list-single')
field.addOption(xmpp.NS_BYTESTREAM)
#field.addOption(xmpp.NS_BYTESTREAM)
field.addOption(xmpp.NS_IBB)
self.connection.send(iq)
def send_file_approval(self, file_props):
@ -137,7 +143,10 @@ class ConnectionBytestream:
feature.addChild(node=_feature)
field = _feature.setField('stream-method')
field.delAttr('type')
field.setValue(xmpp.NS_BYTESTREAM)
if xmpp.NS_BYTESTREAM in file_props['stream-methods']:
field.setValue(xmpp.NS_BYTESTREAM)
else:
field.setValue(xmpp.NS_IBB)
self.connection.send(iq)
def send_file_rejection(self, file_props, code='403', typ=None):
@ -193,10 +202,14 @@ class ConnectionBytestream:
form_tag = feature.getTag('x')
form = xmpp.DataForm(node=form_tag)
field = form.getField('stream-method')
if field.getValue() != xmpp.NS_BYTESTREAM:
return
self._send_socks5_info(file_props)
raise xmpp.NodeProcessed
if field.getValue() == xmpp.NS_BYTESTREAM:
self._send_socks5_info(file_props)
raise xmpp.NodeProcessed
if field.getValue() == xmpp.NS_IBB:
sid = file_props['sid']
fp = open(file_props['file-name'], 'r')
self.OpenStream(sid, file_props['receiver'], fp)
raise xmpp.NodeProcessed
def _siSetCB(self, con, iq_obj):
jid = self._ft_get_from(iq_obj)
@ -219,7 +232,8 @@ class ConnectionBytestream:
for f in form.iter_fields():
if f.var == 'stream-method' and f.type == 'list-single':
values = [o[1] for o in f.options]
if xmpp.NS_BYTESTREAM in values:
file_props['stream-methods'] = ' '.join(values)
if xmpp.NS_BYTESTREAM in values or xmpp.NS_IBB in values:
break
else:
self.send_file_rejection(file_props, code='400', typ='stream')
@ -322,6 +336,12 @@ class ConnectionSocks5Bytestream(ConnectionBytestream):
if 'idx' in host and host['idx'] > 0:
gajim.socks5queue.remove_receiver(host['idx'])
gajim.socks5queue.remove_sender(host['idx'])
if 'direction' in file_props:
# it's a IBB
sid = file_props['sid']
if sid in self.files_props:
del self.files_props[sid]
def _send_socks5_info(self, file_props):
"""
@ -634,6 +654,255 @@ class ConnectionSocks5Bytestream(ConnectionBytestream):
raise xmpp.NodeProcessed
class ConnectionIBBytestream(ConnectionBytestream):
def __init__(self):
ConnectionBytestream.__init__(self)
self._streams = {}
self._ampnode = xmpp.Node(xmpp.NS_AMP + ' amp', payload=[xmpp.Node(
'rule', {'condition': 'deliver-at', 'value': 'stored',
'action': 'error'}), xmpp.Node('rule',
{'condition': 'match-resource', 'value': 'exact',
'action':'error'})])
self.timout_id = None
def IBBIqHandler(self, conn, stanza):
"""
Handles streams state change. Used internally.
"""
typ = stanza.getType()
log.debug('IBBIqHandler called typ->%s' % typ)
if typ == 'set' and stanza.getTag('open', namespace=xmpp.NS_IBB):
self.StreamOpenHandler(conn, stanza)
elif typ == 'set' and stanza.getTag('close', namespace=xmpp.NS_IBB):
self.StreamCloseHandler(conn, stanza)
elif typ == 'result':
self.StreamCommitHandler(conn, stanza)
elif typ == 'error':
self.StreamOpenReplyHandler(conn, stanza)
else:
conn.send(xmpp.Error(stanza, xmpp.ERR_BAD_REQUEST))
raise xmpp.NodeProcessed
def StreamOpenHandler(self, conn, stanza):
"""
Handles opening of new incoming stream. Used internally.
"""
err = None
sid = stanza.getTagAttr('open', 'sid')
blocksize = stanza.getTagAttr('open', 'block-size')
log.debug('StreamOpenHandler called sid->%s blocksize->%s' % (sid,
blocksize))
try:
blocksize = int(blocksize)
except:
err = xmpp.ERR_BAD_REQUEST
if not sid or not blocksize:
err = xmpp.ERR_BAD_REQUEST
elif not gajim.socks5queue.get_file_props(self.name, sid):
err = xmpp.ERR_UNEXPECTED_REQUEST
if err:
rep = xmpp.Error(stanza, err)
else:
file_props = gajim.socks5queue.get_file_props(self.name, sid)
log.debug("Opening stream: id %s, block-size %s" % (sid, blocksize))
rep = xmpp.Protocol('iq', stanza.getFrom(), 'result',
stanza.getTo(), {'id': stanza.getID()})
file_props['block-size'] = blocksize
file_props['seq'] = 0
file_props['received-len'] = 0
file_props['last-time'] = time.time()
file_props['error'] = 0
file_props['paused'] = False
file_props['connected'] = True
file_props['completed'] = False
file_props['disconnect_cb'] = None
file_props['continue_cb'] = None
file_props['syn_id'] = stanza.getID()
file_props['fp'] = open(file_props['file-name'], 'w')
conn.send(rep)
def OpenStream(self, sid, to, fp, blocksize=3000):
"""
Start new stream. You should provide stream id 'sid', the endpoind jid
'to', the file object containing info for send 'fp'. Also the desired
blocksize can be specified.
Take into account that recommended stanza size is 4k and IBB uses
base64 encoding that increases size of data by 1/3.
"""
if sid not in self.files_props.keys():
return
if not xmpp.JID(to).getResource():
return
self.files_props[sid]['direction'] = '|>' + to
self.files_props[sid]['block-size'] = blocksize
self.files_props[sid]['fp'] = fp
self.files_props[sid]['seq'] = 0
self.files_props[sid]['error'] = 0
self.files_props[sid]['paused'] = False
self.files_props[sid]['received-len'] = 0
self.files_props[sid]['last-time'] = time.time()
self.files_props[sid]['connected'] = True
self.files_props[sid]['completed'] = False
self.files_props[sid]['disconnect_cb'] = None
self.files_props[sid]['continue_cb'] = None
if not self.timout_id:
self.timout_id = gobject.timeout_add_seconds(3, self.SendHandler)
self.SendHandler() # start sending now
syn = xmpp.Protocol('iq', to, 'set', payload=[xmpp.Node(xmpp.NS_IBB + \
' open', {'sid': sid, 'block-size': blocksize})])
self.connection.send(syn)
self.files_props[sid]['syn_id'] = syn.getID()
return self.files_props[sid]
def SendHandler(self):
"""
Send next portion of data if it is time to do it. Used internally.
"""
log.debug('SendHandler called')
if not self.files_props:
self.timout_id = None
return False
for file_props in self.files_props.values():
if 'direction' not in file_props:
# it's socks5 bytestream
continue
sid = file_props['sid']
if file_props['direction'][:2] == '|>':
# We waitthat other part accept stream
continue
if file_props['direction'][0] == '>':
if 'paused' in file_props and file_props['paused']:
continue
chunk = file_props['fp'].read(file_props['block-size'])
if chunk:
datanode = xmpp.Node(xmpp.NS_IBB + ' data', {'sid': sid,
'seq': file_props['seq']}, base64.encodestring(chunk))
file_props['seq'] += 1
file_props['started'] = True
if file_props['seq'] == 65536:
file_props['seq'] = 0
self.connection.send(xmpp.Protocol('message',
file_props['direction'][1:], payload=[datanode,
self._ampnode]))
current_time = time.time()
file_props['elapsed-time'] += current_time - file_props[
'last-time']
file_props['last-time'] = current_time
file_props['received-len'] += len(chunk)
gajim.socks5queue.progress_transfer_cb(self.name,
file_props)
else:
# notify the other side about stream closing
# notify the local user about sucessfull send
# delete the local stream
self.connection.send(xmpp.Protocol('iq',
file_props['direction'][1:], 'set',
payload=[xmpp.Node(xmpp.NS_IBB + ' close',
{'sid':sid})]))
file_props['completed'] = True
del self.files_props[sid]
if not self.files_props:
self.timout_id = None
return False
return True
def IBBMessageHandler(self, conn, stanza):
"""
Receive next portion of incoming datastream and store it write
it to temporary file. Used internally.
"""
sid = stanza.getTagAttr('data', 'sid')
seq = stanza.getTagAttr('data', 'seq')
data = stanza.getTagData('data')
log.debug('ReceiveHandler called sid->%s seq->%s' % (sid, seq))
try:
seq = int(seq)
data = base64.decodestring(data)
except Exception:
seq = ''
data = ''
err = None
if not gajim.socks5queue.get_file_props(self.name, sid):
err = xmpp.ERR_ITEM_NOT_FOUND
else:
file_props = gajim.socks5queue.get_file_props(self.name, sid)
if not data:
err = xmpp.ERR_BAD_REQUEST
elif seq <> file_props['seq']:
err = xmpp.ERR_UNEXPECTED_REQUEST
else:
log.debug('Successfull receive sid->%s %s+%s bytes' % (sid,
file_props['fp'].tell(), len(data)))
file_props['seq'] += 1
file_props['started'] = True
file_props['fp'].write(data)
current_time = time.time()
file_props['elapsed-time'] += current_time - file_props[
'last-time']
file_props['last-time'] = current_time
file_props['received-len'] += len(data)
gajim.socks5queue.progress_transfer_cb(self.name, file_props)
if file_props['received-len'] >= file_props['size']:
file_props['completed'] = True
if err:
log.debug('Error on receive: %s' % err)
conn.send(xmpp.Error(xmpp.Iq(to=stanza.getFrom(),
frm=stanza.getTo(),
payload=[xmpp.Node(xmpp.NS_IBB + ' close')]), err, reply=0))
def StreamCloseHandler(self, conn, stanza):
"""
Handle stream closure due to all data transmitted.
Raise xmpppy event specifying successfull data receive.
"""
sid = stanza.getTagAttr('close', 'sid')
log.debug('StreamCloseHandler called sid->%s' % sid)
# look in sending files
if sid in self.files_props.keys():
conn.send(stanza.buildReply('result'))
gajim.socks5queue.complete_transfer_cb(self.name, file_props)
del self.files_props[sid]
# look in receiving files
elif gajim.socks5queue.get_file_props(self.name, sid):
file_props = gajim.socks5queue.get_file_props(self.name, sid)
conn.send(stanza.buildReply('result'))
file_props['fp'].close()
gajim.socks5queue.complete_transfer_cb(self.name, file_props)
gajim.socks5queue.remove_file_props(self.name, sid)
else:
conn.send(xmpp.Error(stanza, xmpp.ERR_ITEM_NOT_FOUND))
def StreamOpenReplyHandler(self, conn, stanza):
"""
Handle remote side reply about if it agree or not to receive our
datastream.
Used internally. Raises xmpppy event specfiying if the data transfer
is agreed upon.
"""
syn_id = stanza.getID()
log.debug('StreamOpenReplyHandler called syn_id->%s' % syn_id)
for sid in self.files_props.keys():
file_props = self.files_props[sid]
if not 'direction' in file_props:
# It's socks5 bytestream
continue
if file_props['syn_id'] == syn_id:
if stanza.getType() == 'error':
if file_props['direction'][0] == '<':
conn.Event('IBB', 'ERROR ON RECEIVE', file_props)
else:
conn.Event('IBB', 'ERROR ON SEND', file_props)
del self.files_props[sid]
elif stanza.getType() == 'result':
if file_props['direction'][0] == '|':
file_props['direction'] = file_props['direction'][1:]
conn.Event('IBB', 'STREAM COMMITTED', file_props)
else:
conn.send(xmpp.Error(stanza,
xmpp.ERR_UNEXPECTED_REQUEST))
class ConnectionSocks5BytestreamZeroconf(ConnectionSocks5Bytestream):
def _ft_get_from(self, iq_obj):

View File

@ -801,13 +801,14 @@ class FileTransfersWindow:
s_iter = selected[1]
sid = self.model[s_iter][C_SID].decode('utf-8')
file_props = self.files_props[sid[0]][sid[1:]]
if self.is_transfer_paused(file_props):
if is_transfer_paused(file_props):
file_props['last-time'] = time.time()
file_props['paused'] = False
types = {'r' : 'download', 's' : 'upload'}
self.set_status(file_props['type'], file_props['sid'], types[sid[0]])
self.toggle_pause_continue(True)
file_props['continue_cb']()
if file_props['continue_cb']:
file_props['continue_cb']()
elif is_transfer_active(file_props):
file_props['paused'] = True
self.set_status(file_props['type'], file_props['sid'], 'pause')