Refactor IBB Handlers

- Move handling of Data IQs into IBBIqHandler
- Call SendHandler with file_props
- Save last sent id in file_props.syn_id
- Remove some useless checks, now that we call SendHandler with file_props
- Send item-not-found error on invalid session id
This commit is contained in:
Philipp Hörist 2017-03-30 03:33:51 +02:00
parent 385d6c3081
commit 7cb7bcaae6
1 changed files with 49 additions and 64 deletions

View File

@ -759,7 +759,6 @@ class ConnectionIBBytestream(ConnectionBytestream):
def __init__(self):
ConnectionBytestream.__init__(self)
self._streams = {}
self.last_sent_ibb_id = None
def IBBIqHandler(self, conn, stanza):
"""
@ -767,12 +766,22 @@ class ConnectionIBBytestream(ConnectionBytestream):
"""
typ = stanza.getType()
log.debug('IBBIqHandler called typ->%s' % typ)
if typ == 'set' and stanza.getTag('open', namespace=nbxmpp.NS_IBB):
if typ == 'set' and stanza.getTag('open'):
self.StreamOpenHandler(conn, stanza)
elif typ == 'set' and stanza.getTag('close', namespace=nbxmpp.NS_IBB):
elif typ == 'set' and stanza.getTag('close'):
self.StreamCloseHandler(conn, stanza)
elif typ == 'result':
self.SendHandler()
elif typ == 'set' and stanza.getTag('data'):
sid = stanza.getTagAttr('data', 'sid')
file_props = FilesProp.getFilePropByTransportSid(self.name, sid)
if not file_props:
conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_ITEM_NOT_FOUND))
elif file_props.connected and self.IBBMessageHandler(conn,
stanza):
reply = stanza.buildReply('result')
reply.delChild('data')
conn.send(reply)
elif not file_props.connected:
log.debug('Received IQ for closed filetransfer, IQ dropped')
elif typ == 'error':
gajim.socks5queue.error_cb()
else:
@ -849,7 +858,7 @@ class ConnectionIBBytestream(ConnectionBytestream):
base64 encoding that increases size of data by 1/3.
"""
file_props = FilesProp.getFilePropBySid(sid)
file_props.direction = '|>'
file_props.direction = '>'
file_props.block_size = blocksize
file_props.fp = fp
file_props.seq = 0
@ -868,50 +877,41 @@ class ConnectionIBBytestream(ConnectionBytestream):
file_props.syn_id = syn.getID()
return file_props
def SendHandler(self):
def SendHandler(self, file_props):
"""
Send next portion of data if it is time to do it. Used internally.
"""
log.debug('SendHandler called')
for file_props in FilesProp.getAllFileProp():
if not file_props.direction:
# it's socks5 bytestream
continue
if file_props.completed:
self.CloseIBBStream(file_props)
sid = file_props.sid
if file_props.direction[:2] == '|>':
# We waitthat other part accept stream
continue
if file_props.direction[0] == '>':
if file_props.paused:
continue
if not file_props.connected:
#TODO: Reply with out of order error
continue
chunk = file_props.fp.read(file_props.block_size)
if chunk:
datanode = nbxmpp.Node(nbxmpp.NS_IBB + ' data', {
'sid': file_props.transport_sid,
'seq': file_props.seq}, base64.b64encode(chunk.encode(
'utf-8')).decode('utf-8'))
file_props.seq += 1
file_props.started = True
if file_props.seq == 65536:
file_props.seq = 0
self.last_sent_ibb_id = self.connection.send(
nbxmpp.Protocol(name='iq', to=file_props.receiver,
typ='set', payload=[datanode]))
current_time = time.time()
file_props.elapsed_time += current_time - file_props.last_time
file_props.last_time = current_time
file_props.received_len += len(chunk)
if file_props.size == file_props.received_len:
file_props.completed = True
gajim.socks5queue.progress_transfer_cb(self.name,
file_props)
else:
log.debug('Nothing to read, but file not completed')
if file_props.completed:
self.CloseIBBStream(file_props)
if file_props.paused:
return
if not file_props.connected:
#TODO: Reply with out of order error
return
chunk = file_props.fp.read(file_props.block_size)
if chunk:
datanode = nbxmpp.Node(nbxmpp.NS_IBB + ' data', {
'sid': file_props.transport_sid,
'seq': file_props.seq},
base64.b64encode(chunk).decode('ascii'))
file_props.seq += 1
file_props.started = True
if file_props.seq == 65536:
file_props.seq = 0
file_props.syn_id = self.connection.send(
nbxmpp.Protocol(name='iq', to=file_props.receiver,
typ='set', payload=[datanode]))
current_time = time.time()
file_props.elapsed_time += current_time - file_props.last_time
file_props.last_time = current_time
file_props.received_len += len(chunk)
if file_props.size == file_props.received_len:
file_props.completed = True
gajim.socks5queue.progress_transfer_cb(self.name,
file_props)
else:
log.debug('Nothing to read, but file not completed')
def IBBMessageHandler(self, conn, stanza):
"""
@ -980,6 +980,7 @@ class ConnectionIBBytestream(ConnectionBytestream):
else:
conn.send(nbxmpp.Error(stanza, nbxmpp.ERR_ITEM_NOT_FOUND))
def IBBAllIqHandler(self, conn, stanza):
"""
Handle remote side reply about if it agree or not to receive our
@ -1001,25 +1002,9 @@ class ConnectionIBBytestream(ConnectionBytestream):
else:
conn.Event('IBB', 'ERROR ON SEND', file_props)
elif stanza.getType() == 'result':
if file_props.direction[0] == '|':
file_props.direction = file_props.direction[1:]
self.SendHandler()
else:
conn.send(nbxmpp.Error(stanza,
nbxmpp.ERR_UNEXPECTED_REQUEST))
self.SendHandler(file_props)
break
else:
if stanza.getTag('data'):
sid = stanza.getTagAttr('data', 'sid')
file_props = FilesProp.getFilePropByTransportSid(self.name, sid)
if file_props.connected and self.IBBMessageHandler(conn,
stanza):
reply = stanza.buildReply('result')
reply.delChild('data')
conn.send(reply)
raise nbxmpp.NodeProcessed
elif syn_id == self.last_sent_ibb_id:
self.SendHandler()
class ConnectionSocks5BytestreamZeroconf(ConnectionSocks5Bytestream):