Move IBB code to new IBB module

This commit is contained in:
Philipp Hörist 2019-05-19 15:16:37 +02:00
parent 7ec93f89a2
commit fdfc9c90a1
6 changed files with 233 additions and 265 deletions

View File

@ -34,7 +34,6 @@ from gajim.common import helpers
from gajim.common import jingle_xtls
from gajim.common.jingle import ConnectionJingle
from gajim.common.protocol.bytestream import ConnectionSocks5Bytestream
from gajim.common.protocol.bytestream import ConnectionIBBytestream
from gajim.common.connection_handlers_events import StreamReceivedEvent
from gajim.common.connection_handlers_events import PresenceReceivedEvent
from gajim.common.connection_handlers_events import StreamConflictReceivedEvent
@ -185,10 +184,9 @@ class ConnectionHandlersBase:
class ConnectionHandlers(ConnectionSocks5Bytestream,
ConnectionHandlersBase,
ConnectionJingle, ConnectionIBBytestream):
ConnectionJingle):
def __init__(self):
ConnectionSocks5Bytestream.__init__(self)
ConnectionIBBytestream.__init__(self)
ConnectionJingle.__init__(self)
ConnectionHandlersBase.__init__(self)
@ -223,10 +221,6 @@ class ConnectionHandlers(ConnectionSocks5Bytestream,
nbxmpp.NS_BYTESTREAM)
con.RegisterHandler('iq', self._bytestreamErrorCB, 'error',
nbxmpp.NS_BYTESTREAM)
con.RegisterHandlerOnce('iq', self.IBBAllIqHandler)
con.RegisterHandler('iq', self.IBBIqHandler, ns=nbxmpp.NS_IBB)
con.RegisterHandler('message', self.IBBMessageHandler, ns=nbxmpp.NS_IBB)
con.RegisterHandler('iq', self._JingleCB, 'result')
con.RegisterHandler('iq', self._JingleCB, 'error')
con.RegisterHandler('iq', self._JingleCB, 'set', nbxmpp.NS_JINGLE)

View File

@ -146,8 +146,9 @@ class StateTransfering(JingleFileTransferStates):
def _start_ibb_transfer(self, con):
self.jft.file_props.transport_sid = self.jft.transport.sid
fp = open(self.jft.file_props.file_name, 'rb')
con.OpenStream(self.jft.file_props.sid, self.jft.session.peerjid, fp,
blocksize=4096)
con.get_module('IBB').send_open(self.jft.session.peerjid,
self.jft.file_props.sid,
fp)
def _start_sock5_transfer(self):
# It tells wether we start the transfer as client or server

View File

@ -74,6 +74,7 @@ MODULES = [
'vcard_avatars',
'vcard_temp',
'announce',
'ibb',
]
_imported_modules = [] # type: List[tuple]

227
gajim/common/modules/ibb.py Normal file
View File

@ -0,0 +1,227 @@
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
# XEP-0047: In-Band Bytestreams
import time
import nbxmpp
from nbxmpp.protocol import NodeProcessed
from nbxmpp.structs import StanzaHandler
from nbxmpp.util import is_error_result
from gajim.common import app
from gajim.common.modules.base import BaseModule
from gajim.common.file_props import FilesProp
class IBB(BaseModule):
_nbxmpp_extends = 'IBB'
_nbxmpp_methods = [
'send_open',
'send_close',
'send_data',
'send_reply',
]
def __init__(self, con):
BaseModule.__init__(self, con)
self.handlers = [
StanzaHandler(name='iq',
callback=self._ibb_received,
ns=nbxmpp.NS_IBB),
]
def _ibb_received(self, _con, stanza, properties):
if not properties.is_ibb:
return
if properties.ibb.type == 'data':
self._log.info('Data received, sid: %s, seq: %s',
properties.ibb.sid, properties.ibb.seq)
file_props = FilesProp.getFilePropByTransportSid(self._account,
properties.ibb.sid)
if not file_props:
self.send_reply(stanza, nbxmpp.ERR_ITEM_NOT_FOUND)
raise NodeProcessed
if file_props.connected:
self._on_data_received(stanza, file_props, properties)
self.send_reply(stanza)
elif properties.ibb.type == 'open':
self._log.info('Open received, sid: %s, blocksize: %s',
properties.ibb.sid, properties.ibb.block_size)
file_props = FilesProp.getFilePropByTransportSid(self._account,
properties.ibb.sid)
if not file_props:
self.send_reply(stanza, nbxmpp.ERR_ITEM_NOT_FOUND)
raise NodeProcessed
file_props.block_size = properties.ibb.block_size
file_props.direction = '<'
file_props.seq = 0
file_props.received_len = 0
file_props.last_time = time.time()
file_props.error = 0
file_props.paused = False
file_props.connected = True
file_props.completed = False
file_props.disconnect_cb = None
file_props.continue_cb = None
file_props.syn_id = stanza.getID()
file_props.fp = open(file_props.file_name, 'wb')
self.send_reply(stanza)
elif properties.ibb.type == 'close':
self._log.info('Close received, sid: %s', properties.ibb.sid)
file_props = FilesProp.getFilePropByTransportSid(self._account,
properties.ibb.sid)
if not file_props:
self.send_reply(stanza, nbxmpp.ERR_ITEM_NOT_FOUND)
raise NodeProcessed
self.send_reply(stanza)
file_props.fp.close()
file_props.completed = file_props.received_len >= file_props.size
if not file_props.completed:
file_props.error = -1
app.socks5queue.complete_transfer_cb(self._account, file_props)
raise NodeProcessed
def _on_data_received(self, stanza, file_props, properties):
ibb = properties.ibb
if ibb.seq != file_props.seq:
self.send_reply(stanza, nbxmpp.ERR_UNEXPECTED_REQUEST)
self.send_close(file_props)
raise NodeProcessed
self._log.debug('Data received: sid: %s, %s+%s bytes',
ibb.sid, file_props.fp.tell(), len(ibb.data))
file_props.seq += 1
file_props.started = True
file_props.fp.write(ibb.data)
current_time = time.time()
file_props.elapsed_time += current_time - file_props.last_time
file_props.last_time = current_time
file_props.received_len += len(ibb.data)
app.socks5queue.progress_transfer_cb(self._account, file_props)
if file_props.received_len >= file_props.size:
file_props.completed = True
def send_open(self, to, sid, fp):
self._log.info('Send open to %s, sid: %s', to, sid)
file_props = FilesProp.getFilePropBySid(sid)
file_props.direction = '>'
file_props.block_size = 4096
file_props.fp = fp
file_props.seq = -1
file_props.error = 0
file_props.paused = False
file_props.received_len = 0
file_props.last_time = time.time()
file_props.connected = True
file_props.completed = False
file_props.disconnect_cb = None
file_props.continue_cb = None
self._nbxmpp('IBB').send_open(to,
file_props.transport_sid,
4096,
callback=self._on_open_result,
user_data=file_props)
return file_props
def _on_open_result(self, result, file_props):
if is_error_result(result):
app.socks5queue.error_cb('Error', str(result))
self._log.warning('Error: %s', result)
return
self.send_data(file_props)
def send_close(self, file_props):
file_props.connected = False
file_props.fp.close()
file_props.stopped = True
to = file_props.receiver
if file_props.direction == '<':
to = file_props.sender
self._log.info('Send close to %s, sid: %s',
to, file_props.transport_sid)
self._nbxmpp('IBB').send_close(to, file_props.transport_sid,
callback=self._on_close_result)
if file_props.completed:
app.socks5queue.complete_transfer_cb(self._account, file_props)
else:
if file_props.type_ == 's':
peerjid = file_props.receiver
else:
peerjid = file_props.sender
session = self._con.get_jingle_session(
peerjid, file_props.sid, 'file')
# According to the xep, the initiator also cancels
# the jingle session if there are no more files to send using IBB
if session.weinitiate:
session.cancel_session()
def _on_close_result(self, result):
if is_error_result(result):
app.socks5queue.error_cb('Error', str(result))
self._log.warning('Error: %s', result)
return
def send_data(self, file_props):
if file_props.completed:
self.send_close(file_props)
return
chunk = file_props.fp.read(file_props.block_size)
if chunk:
file_props.seq += 1
file_props.started = True
if file_props.seq == 65536:
file_props.seq = 0
self._log.info('Send data to %s, sid: %s',
file_props.receiver, file_props.transport_sid)
self._nbxmpp('IBB').send_data(file_props.receiver,
file_props.transport_sid,
file_props.seq,
chunk,
callback=self._on_data_result,
user_data=file_props)
current_time = time.time()
file_props.elapsed_time += current_time - file_props.last_time
file_props.last_time = current_time
file_props.received_len += len(chunk)
if file_props.size == file_props.received_len:
file_props.completed = True
app.socks5queue.progress_transfer_cb(self._account, file_props)
def _on_data_result(self, result, file_props):
if is_error_result(result):
app.socks5queue.error_cb('Error', str(result))
self._log.warning('Error: %s', result)
return
self.send_data(file_props)
def get_instance(*args, **kwargs):
return IBB(*args, **kwargs), 'IBB'

View File

@ -25,8 +25,6 @@
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
import socket
import base64
import time
import logging
import nbxmpp
@ -633,259 +631,6 @@ class ConnectionSocks5Bytestream(ConnectionBytestream):
raise nbxmpp.NodeProcessed
class ConnectionIBBytestream(ConnectionBytestream):
def __init__(self):
ConnectionBytestream.__init__(self)
self._streams = {}
def IBBIqHandler(self, conn, stanza):
"""
Handles streams state change. Used internally.
"""
typ = stanza.getType()
log.debug('IBBIqHandler called typ->%s', typ)
if typ == 'set' and stanza.getTag('open'):
self.StreamOpenHandler(conn, stanza)
elif typ == 'set' and stanza.getTag('close'):
self.StreamCloseHandler(conn, stanza)
elif typ == 'set' and stanza.getTag('data'):
sid = stanza.getTagAttr('data', 'sid')
file_props = FilesProp.getFilePropByTransportSid(self.name, sid)
if not file_props:
conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_ITEM_NOT_FOUND))
elif file_props.connected and self.IBBMessageHandler(conn,
stanza):
reply = stanza.buildReply('result')
reply.delChild('data')
conn.send(reply)
elif not file_props.connected:
log.debug('Received IQ for closed filetransfer, IQ dropped')
elif typ == 'error':
app.socks5queue.error_cb()
else:
conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_BAD_REQUEST))
raise nbxmpp.NodeProcessed
def StreamOpenHandler(self, conn, stanza):
"""
Handles opening of new incoming stream. Used internally.
"""
err = None
sid = stanza.getTagAttr('open', 'sid')
blocksize = stanza.getTagAttr('open', 'block-size')
log.debug('StreamOpenHandler called sid->%s blocksize->%s',
sid, blocksize)
file_props = FilesProp.getFilePropByTransportSid(self.name, sid)
try:
blocksize = int(blocksize)
except Exception:
err = nbxmpp.ERR_BAD_REQUEST
if not sid or not blocksize:
err = nbxmpp.ERR_BAD_REQUEST
elif not file_props:
err = nbxmpp.ERR_UNEXPECTED_REQUEST
if err:
rep = nbxmpp.Error(stanza, err)
else:
log.debug("Opening stream: id %s, block-size %s",
sid, blocksize)
rep = nbxmpp.Protocol('iq', stanza.getFrom(), 'result',
stanza.getTo(), {'id': stanza.getID()})
file_props.block_size = blocksize
file_props.direction = '<'
file_props.seq = 0
file_props.received_len = 0
file_props.last_time = time.time()
file_props.error = 0
file_props.paused = False
file_props.connected = True
file_props.completed = False
file_props.disconnect_cb = None
file_props.continue_cb = None
file_props.syn_id = stanza.getID()
file_props.fp = open(file_props.file_name, 'wb')
conn.send(rep)
def CloseIBBStream(self, file_props):
file_props.connected = False
file_props.fp.close()
file_props.stopped = True
to = file_props.receiver
if file_props.direction == '<':
to = file_props.sender
self.connection.send(
nbxmpp.Protocol('iq', to, 'set',
payload=[nbxmpp.Node(nbxmpp.NS_IBB + ' close',
{'sid':file_props.transport_sid})]))
if file_props.completed:
app.socks5queue.complete_transfer_cb(self.name, file_props)
elif file_props.session_type == 'jingle':
peerjid = \
file_props.receiver if file_props.type_ == 's' else file_props.sender
session = self.get_jingle_session(peerjid, file_props.sid, 'file')
# According to the xep, the initiator also cancels the jingle session
# if there are no more files to send using IBB
if session.weinitiate:
session.cancel_session()
def OpenStream(self, sid, to, fp, blocksize=4096):
"""
Start new stream. You should provide stream id 'sid', the endpoint jid
'to', the file object containing info for send 'fp'. Also the desired
blocksize can be specified.
Take into account that recommended stanza size is 4k and IBB uses
base64 encoding that increases size of data by 1/3.
"""
file_props = FilesProp.getFilePropBySid(sid)
file_props.direction = '>'
file_props.block_size = blocksize
file_props.fp = fp
file_props.seq = 0
file_props.error = 0
file_props.paused = False
file_props.received_len = 0
file_props.last_time = time.time()
file_props.connected = True
file_props.completed = False
file_props.disconnect_cb = None
file_props.continue_cb = None
syn = nbxmpp.Protocol('iq', to, 'set', payload=[nbxmpp.Node(
nbxmpp.NS_IBB + ' open', {'sid': file_props.transport_sid,
'block-size': blocksize, 'stanza': 'iq'})])
self.connection.send(syn)
file_props.syn_id = syn.getID()
return file_props
def SendHandler(self, file_props):
"""
Send next portion of data if it is time to do it. Used internally.
"""
log.debug('SendHandler called')
if file_props.completed:
self.CloseIBBStream(file_props)
if file_props.paused:
return
if not file_props.connected:
#TODO: Reply with out of order error
return
chunk = file_props.fp.read(file_props.block_size)
if chunk:
datanode = nbxmpp.Node(nbxmpp.NS_IBB + ' data', {
'sid': file_props.transport_sid,
'seq': file_props.seq},
base64.b64encode(chunk).decode('ascii'))
file_props.seq += 1
file_props.started = True
if file_props.seq == 65536:
file_props.seq = 0
file_props.syn_id = self.connection.send(
nbxmpp.Protocol(name='iq', to=file_props.receiver,
typ='set', payload=[datanode]))
current_time = time.time()
file_props.elapsed_time += current_time - file_props.last_time
file_props.last_time = current_time
file_props.received_len += len(chunk)
if file_props.size == file_props.received_len:
file_props.completed = True
app.socks5queue.progress_transfer_cb(self.name,
file_props)
else:
log.debug('Nothing to read, but file not completed')
def IBBMessageHandler(self, conn, stanza):
"""
Receive next portion of incoming datastream and store it write
it to temporary file. Used internally.
"""
sid = stanza.getTagAttr('data', 'sid')
seq = stanza.getTagAttr('data', 'seq')
data = stanza.getTagData('data')
log.debug('ReceiveHandler called sid->%s seq->%s', sid, seq)
try:
seq = int(seq)
data = base64.b64decode(data.encode('utf-8'))
except Exception:
seq = ''
data = b''
err = None
file_props = FilesProp.getFilePropByTransportSid(self.name, sid)
if file_props is None:
err = nbxmpp.ERR_ITEM_NOT_FOUND
else:
if not data:
err = nbxmpp.ERR_BAD_REQUEST
elif seq != file_props.seq:
err = nbxmpp.ERR_UNEXPECTED_REQUEST
else:
log.debug('Successfully received sid->%s %s+%s bytes',
sid, file_props.fp.tell(), len(data))
file_props.seq += 1
file_props.started = True
file_props.fp.write(data)
current_time = time.time()
file_props.elapsed_time += current_time - file_props.last_time
file_props.last_time = current_time
file_props.received_len += len(data)
app.socks5queue.progress_transfer_cb(self.name, file_props)
if file_props.received_len >= file_props.size:
file_props.completed = True
if err:
log.debug('Error on receive: %s', err)
conn.send(nbxmpp.Error(nbxmpp.Iq(to=stanza.getFrom(),
frm=stanza.getTo(),
payload=[nbxmpp.Node(nbxmpp.NS_IBB + ' close')]), err, reply=0))
else:
return True
def StreamCloseHandler(self, conn, stanza):
"""
Handle stream closure due to all data transmitted.
Raise xmpppy event specifying successful data receive.
"""
sid = stanza.getTagAttr('close', 'sid')
log.debug('StreamCloseHandler called sid->%s', sid)
# look in sending files
file_props = FilesProp.getFilePropByTransportSid(self.name, sid)
if file_props:
reply = stanza.buildReply('result')
reply.delChild('close')
conn.send(reply)
# look in receiving files
file_props.fp.close()
file_props.completed = file_props.received_len >= file_props.size
if not file_props.completed:
file_props.error = -1
app.socks5queue.complete_transfer_cb(self.name, file_props)
else:
conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_ITEM_NOT_FOUND))
def IBBAllIqHandler(self, conn, stanza):
"""
Handle remote side reply about if it agree or not to receive our
datastream.
Used internally. Raises xmpppy event specifying if the data transfer
is agreed upon.
"""
syn_id = stanza.getID()
log.debug('IBBAllIqHandler called syn_id->%s', syn_id)
for file_props in FilesProp.getAllFileProp():
if not file_props.direction or not file_props.connected:
# It's socks5 bytestream
# Or we closed the IBB stream
continue
if file_props.syn_id == syn_id:
if stanza.getType() == 'error':
if file_props.direction[0] == '<':
conn.Event('IBB', 'ERROR ON RECEIVE', file_props)
else:
conn.Event('IBB', 'ERROR ON SEND', file_props)
elif stanza.getType() == 'result':
self.SendHandler(file_props)
break
class ConnectionSocks5BytestreamZeroconf(ConnectionSocks5Bytestream):
def _ft_get_from(self, iq_obj):

View File

@ -882,7 +882,7 @@ class FileTransfersWindow:
con = app.connections[account]
# Check if we are in a IBB transfer
if file_props.direction:
con.CloseIBBStream(file_props)
con.get_module('IBB').send_close(file_props)
con.disconnect_transfer(file_props)
self.set_status(file_props, 'stop')