Merge branch 'deduplication' into 'master'

Deduplicate based on stanza-id

See merge request gajim/gajim!148
This commit is contained in:
Philipp Hörist 2017-11-07 21:03:01 +01:00
commit d381179d1f
5 changed files with 148 additions and 48 deletions

View File

@ -725,8 +725,6 @@ class ConnectionHandlersBase:
# IDs of sent messages (https://trac.gajim.org/ticket/8222)
self.sent_message_ids = []
self.received_message_hashes = []
# We decrypt GPG messages one after the other. Keep queue in mem
self.gpg_messages_to_decrypt = []
@ -946,7 +944,8 @@ class ConnectionHandlersBase:
def _on_message_received(self, obj):
if isinstance(obj, MessageReceivedEvent):
app.nec.push_incoming_event(
DecryptedMessageReceivedEvent(None, conn=self, msg_obj=obj))
DecryptedMessageReceivedEvent(
None, conn=self, msg_obj=obj, stanza_id=obj.unique_id))
else:
app.nec.push_incoming_event(
MamDecryptedMessageReceivedEvent(None, **vars(obj)))
@ -1012,7 +1011,7 @@ class ConnectionHandlersBase:
return True
elif obj.mtype == 'groupchat':
app.nec.push_incoming_event(GcMessageReceivedEvent(None,
conn=self, msg_obj=obj))
conn=self, msg_obj=obj, stanza_id=obj.unique_id))
return True
def _nec_gc_message_received(self, obj):
@ -1028,7 +1027,8 @@ class ConnectionHandlersBase:
KindConstant.GC_MSG,
message=obj.msgtxt,
contact_name=obj.nick,
additional_data=obj.additional_data)
additional_data=obj.additional_data,
stanza_id=obj.unique_id)
app.logger.set_room_last_message_time(obj.room_jid, obj.timestamp)
# process and dispatch an error message
@ -2048,8 +2048,6 @@ ConnectionHandlersBase, ConnectionJingle, ConnectionIBBytestream):
app.nec.push_incoming_event(SignedInEvent(None, conn=self))
self.send_awaiting_pep()
self.continue_connect_info = None
# hashes of already received messages
self.received_message_hashes = []
def _SearchCB(self, con, iq_obj):
log.debug('SearchCB')

View File

@ -139,6 +139,28 @@ class HelperEvent:
if oob_desc is not None:
self.additional_data['gajim']['oob_desc'] = oob_desc
def get_stanza_id(self, stanza, query=False):
    """
    Return the stable id of `stanza`, or None if it can not be trusted

    :param stanza: The node the id is read from
    :param query:  If True, return the plain `id` attribute of `stanza`
                   without looking at any stanza-id element

    Outside of a query the stanza-id is only returned if its `by`
    attribute names the entity that is expected to have assigned it:
    the room for groupchat stanzas, our own account otherwise.
    """
    if query:
        # On a MAM query the stanza-id may not be set, so
        # use the id attribute of the node instead
        return stanza.getAttr('id')

    stanza_id, by = stanza.getStanzaIDAttrs()
    if by is None:
        # We can not verify who set this stanza-id, ignore it.
        return None

    if stanza.getType() == 'groupchat':
        # A groupchat stanza-id is assigned by the room, not by our own
        # server. Comparing `by` with our own JID would reject every
        # groupchat stanza-id and disable duplicate detection for MUCs.
        assigner = stanza.getFrom()
    else:
        assigner = self.conn.get_own_jid()

    if assigner is None or not assigner.bareMatch(by):
        # by attribute does not match the expected entity, ignore it.
        return None
    return stanza_id
@staticmethod
def get_forwarded_message(stanza):
    """
    Return the message wrapped in the forwarded element of `stanza`

    :param stanza: Node that is searched for a `forwarded` child in the
                   nbxmpp.NS_FORWARD namespace

    Returns None if there is no such `forwarded` child, otherwise
    whatever getTag() yields for its `message` child.
    """
    wrapper = stanza.getTag(
        'forwarded', namespace=nbxmpp.NS_FORWARD, protocol=True)
    if wrapper is None:
        return None
    return wrapper.getTag('message', protocol=True)
class HttpAuthReceivedEvent(nec.NetworkIncomingEvent):
name = 'http-auth-received'
base_network_events = []
@ -1026,6 +1048,7 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
:stanza: Complete stanza Node
:forwarded: Forwarded Node
:result: Result Node
:unique_id: The unique stable id
'''
self._set_base_event_vars_as_attributes(base_event)
self.additional_data = {}
@ -1046,6 +1069,13 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
log.info('Received groupchat message from user archive')
return False
# use stanza-id as unique-id
self.unique_id, origin_id = self.get_unique_id()
# Check for duplicates
if app.logger.find_stanza_id(self.unique_id, origin_id):
return
self.msgtxt = self.msg_.getTagData('body')
self.query_id = self.result.getAttr('queryid')
@ -1057,24 +1087,12 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
to = own_jid
if frm.bareMatch(own_jid):
self.stanza_id = self.msg_.getOriginID()
if self.stanza_id is None:
self.stanza_id = self.msg_.getID()
self.with_ = to
self.kind = KindConstant.CHAT_MSG_SENT
else:
if self.result.getNamespace() == nbxmpp.NS_MAM_2:
self.stanza_id = self.result.getID()
else:
self.stanza_id = self.msg_.getID()
self.with_ = frm
self.kind = KindConstant.CHAT_MSG_RECV
if self.stanza_id is None:
log.debug('Could not retrieve stanza-id')
delay = self.forwarded.getTagAttr(
'delay', 'stamp', namespace=nbxmpp.NS_DELAY2)
if delay is None:
@ -1097,9 +1115,31 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
log.warning('Received MAM message with '
'invalid user timestamp: %s', user_delay)
log.debug('Received mam-message: stanza id: %s', self.stanza_id)
log.debug('Received mam-message: unique id: %s', self.unique_id)
return True
def get_unique_id(self):
    """
    Determine the ids used to detect duplicates of this archive message

    Returns a tuple (stanza_id, origin_id):

    :stanza_id: id of the result node, None unless the result is in the
                nbxmpp.NS_MAM_2 namespace
    :origin_id: origin-id of the message, only looked up for messages
                that were sent from our own account, otherwise None
    """
    sent_by_us = self.conn.get_own_jid().bareMatch(self.msg_.getFrom())

    # Only mam:2 ensures a valid stanza-id
    if self.result.getNamespace() == nbxmpp.NS_MAM_2:
        archive_id = self.get_stanza_id(self.result, query=True)
    else:
        archive_id = None

    if not sent_by_us:
        # A message we received, there is no origin-id to consider
        return archive_id, None

    # For our own messages both ids have to be checked, because
    # other resources of ours may not support origin-id
    return archive_id, self.msg_.getOriginID()
class MamDecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
name = 'mam-decrypted-message-received'
base_network_events = []
@ -1177,6 +1217,14 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
self.stanza.getFrom())
return
# Check for duplicates
self.unique_id = self.get_unique_id()
# Check groupchat messages for duplicates,
# We do this because of MUC History messages
if self.stanza.getType() == 'groupchat':
if app.logger.find_stanza_id(self.unique_id):
return
address_tag = self.stanza.getTag('addresses',
namespace=nbxmpp.NS_ADDRESS)
# Be sure it comes from one of our resource, else ignore address element
@ -1248,7 +1296,8 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
conn=self.conn,
stanza=self.stanza,
forwarded=forwarded,
result=result))
result=result,
stanza_id=self.unique_id))
return
# Mediated invitation?
@ -1320,6 +1369,38 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
return True
def get_unique_id(self):
    """
    Return the stanza-id used to detect duplicates of a live message

    Returns None when no trustworthy id is available:
    - the stanza is neither of type `groupchat` nor `chat`
    - it is a chat message and the archiving namespace of the
      connection is not nbxmpp.NS_MAM_2
    - it is a carbon that does not wrap a forwarded message
    - get_stanza_id() rejects the stanza-id
    """
    msg_type = self.stanza.getType()
    if msg_type == 'groupchat':
        # TODO: Disco the MUC check if 'urn:xmpp:mam:2' is announced
        return self.get_stanza_id(self.stanza)
    if msg_type != 'chat':
        return None

    # Messages we receive live
    if self.conn.archiving_namespace != nbxmpp.NS_MAM_2:
        # Only mam:2 ensures valid stanza-id
        return None

    # Sent and received carbons carry the real message, and with it
    # the stanza-id, inside a forwarded element
    for direction in ('sent', 'received'):
        carbon = self.stanza.getTag(direction,
                                    namespace=nbxmpp.NS_CARBONS,
                                    protocol=True)
        if carbon is None:
            continue
        message = self.get_forwarded_message(carbon)
        if message is None:
            # Malformed carbon without a forwarded message: there is
            # nothing to read an id from, and passing None on would
            # raise an AttributeError in get_stanza_id()
            return None
        return self.get_stanza_id(message)

    # Normal Message
    return self.get_stanza_id(self.stanza)
class ZeroconfMessageReceivedEvent(MessageReceivedEvent):
name = 'message-received'
base_network_events = []
@ -1416,6 +1497,7 @@ class DecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
self.stanza = self.msg_obj.stanza
self.additional_data = self.msg_obj.additional_data
self.id_ = self.msg_obj.id_
self.unique_id = self.msg_obj.unique_id
self.jid = self.msg_obj.jid
self.fjid = self.msg_obj.fjid
self.resource = self.msg_obj.resource
@ -1472,22 +1554,6 @@ class DecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
if replace:
self.correct_id = replace.getAttr('id')
# ignore message duplicates
if self.msgtxt and self.id_ and self.jid:
self.msghash = hashlib.sha256(("%s|%s|%s" % (
hashlib.sha256(self.msgtxt.encode('utf-8')).hexdigest(),
hashlib.sha256(self.id_.encode('utf-8')).hexdigest(),
hashlib.sha256(self.jid.encode('utf-8')).hexdigest())).encode(
'utf-8')).digest()
if self.msghash in self.conn.received_message_hashes:
log.info("Ignoring duplicated message from '%s' with id '%s'" % (self.jid, self.id_))
return False
else:
log.debug("subhashes: msgtxt, id_, jid = ('%s', '%s', '%s')" % (hashlib.sha256(self.msgtxt.encode('utf-8')).hexdigest(), hashlib.sha256(self.id_.encode('utf-8')).hexdigest(), hashlib.sha256(self.jid.encode('utf-8')).hexdigest()))
self.conn.received_message_hashes.append(self.msghash)
# only record the last 20000 hashes (should be about 1MB [32 bytes per hash]
# and about 24 hours if you receive a message every 5 seconds)
self.conn.received_message_hashes = self.conn.received_message_hashes[-20000:]
return True
class ChatstateReceivedEvent(nec.NetworkIncomingEvent):
@ -1513,6 +1579,7 @@ class GcMessageReceivedEvent(nec.NetworkIncomingEvent):
else:
self.additional_data = self.msg_obj.additional_data
self.id_ = self.msg_obj.stanza.getID()
self.unique_id = self.msg_obj.unique_id
self.fjid = self.msg_obj.fjid
self.msgtxt = self.msg_obj.msgtxt
self.jid = self.msg_obj.jid

View File

@ -1064,9 +1064,9 @@ class Logger:
:param msg: The message text
"""
# Add 5 minutes around the timestamp
start_time = timestamp - 300
end_time = timestamp + 300
# Add 10 seconds around the timestamp
start_time = timestamp - 10
end_time = timestamp + 10
log.debug('start: %s, end: %s, jid: %s, message: %s',
start_time, end_time, jid, msg)
@ -1084,6 +1084,33 @@ class Logger:
return True
return False
def find_stanza_id(self, stanza_id, origin_id=None):
    """
    Checks if a stanza-id is already in the `logs` table

    :param stanza_id: The stanza-id, may be None

    :param origin_id: The origin-id, may be None; it is matched against
                      the same `stanza_id` column

    return True if one of the given ids was found
    """
    ids = [id_ for id_ in (stanza_id, origin_id) if id_ is not None]
    if not ids:
        # Nothing to look for. Skipping the query also avoids building
        # an empty `IN ()` list, which only works as a SQLite extension.
        return False

    # Only the placeholders are formatted into the statement, the ids
    # themselves are always passed as bound parameters
    sql = '''
          SELECT stanza_id FROM logs
          WHERE stanza_id IN ({values}) LIMIT 1
          '''.format(values=', '.join('?' * len(ids)))

    result = self.con.execute(sql, tuple(ids)).fetchone()
    if result is None:
        return False

    log.info('Found duplicated message, stanza-id: %s, origin-id: %s',
             stanza_id, origin_id)
    return True
def insert_jid(self, jid, kind=None, type_=JIDConstant.NORMAL_TYPE):
"""
Insert a new jid into the `jids` table.
@ -1129,6 +1156,9 @@ class Logger:
lastrowid = self.con.execute(sql, (jid_id, time_, kind) + tuple(kwargs.values())).lastrowid
log.info('Insert into DB: jid: %s, time: %s, kind: %s, stanza_id: %s',
jid, time_, kind, kwargs.get('stanza_id', None))
if unread and kind == KindConstant.CHAT_MSG_RECV:
sql = '''INSERT INTO unread_messages (message_id, jid_id)
VALUES (?, (SELECT jid_id FROM jids WHERE jid = ?))'''

View File

@ -127,14 +127,18 @@ class ConnectionArchive313:
def _nec_mam_decrypted_message_received(self, obj):
if obj.conn.name != self.name:
return
# if self.archiving_namespace != nbxmpp.NS_MAM_2:
# Fallback duplicate search without stanza-id
duplicate = app.logger.search_for_duplicate(
obj.with_, obj.timestamp, obj.msgtxt)
if not duplicate:
if duplicate:
return
app.logger.insert_into_logs(
obj.with_, obj.timestamp, obj.kind,
unread=False,
message=obj.msgtxt,
additional_data=obj.additional_data)
additional_data=obj.additional_data,
stanza_id=obj.unique_id)
def get_query_id(self):
self.mam_query_id = self.connection.getAnID()

View File

@ -120,7 +120,8 @@ class ChatControlSession(stanza_session.EncryptedStanzaSession):
jid, obj.timestamp, log_type,
message=msg_to_log,
subject=obj.subject,
additional_data=obj.additional_data)
additional_data=obj.additional_data,
stanza_id=obj.unique_id)
# Handle chat states
if contact and (not obj.forwarded or not obj.sent):