Deduplicate GC and MAM Messages based on stanza-id
- Add methods to determine the unique/stable id - Write the id to the stanza_id DB field - Add method to deduplicate based on the unique/stable id
This commit is contained in:
parent
46926e71d1
commit
7f1a839e77
|
@ -944,7 +944,8 @@ class ConnectionHandlersBase:
|
|||
def _on_message_received(self, obj):
|
||||
if isinstance(obj, MessageReceivedEvent):
|
||||
app.nec.push_incoming_event(
|
||||
DecryptedMessageReceivedEvent(None, conn=self, msg_obj=obj))
|
||||
DecryptedMessageReceivedEvent(
|
||||
None, conn=self, msg_obj=obj, stanza_id=obj.unique_id))
|
||||
else:
|
||||
app.nec.push_incoming_event(
|
||||
MamDecryptedMessageReceivedEvent(None, **vars(obj)))
|
||||
|
@ -1010,7 +1011,7 @@ class ConnectionHandlersBase:
|
|||
return True
|
||||
elif obj.mtype == 'groupchat':
|
||||
app.nec.push_incoming_event(GcMessageReceivedEvent(None,
|
||||
conn=self, msg_obj=obj))
|
||||
conn=self, msg_obj=obj, stanza_id=obj.unique_id))
|
||||
return True
|
||||
|
||||
def _nec_gc_message_received(self, obj):
|
||||
|
@ -1026,7 +1027,8 @@ class ConnectionHandlersBase:
|
|||
KindConstant.GC_MSG,
|
||||
message=obj.msgtxt,
|
||||
contact_name=obj.nick,
|
||||
additional_data=obj.additional_data)
|
||||
additional_data=obj.additional_data,
|
||||
stanza_id=obj.unique_id)
|
||||
app.logger.set_room_last_message_time(obj.room_jid, obj.timestamp)
|
||||
|
||||
# process and dispatch an error message
|
||||
|
|
|
@ -139,6 +139,28 @@ class HelperEvent:
|
|||
if oob_desc is not None:
|
||||
self.additional_data['gajim']['oob_desc'] = oob_desc
|
||||
|
||||
def get_stanza_id(self, stanza, query=False):
|
||||
if query:
|
||||
# On a MAM query the stanza-id is maybe not set, so
|
||||
# get the id of the stanza
|
||||
return stanza.getAttr('id')
|
||||
stanza_id, by = stanza.getStanzaIDAttrs()
|
||||
if by is None:
|
||||
# We can not verify who set this stanza-id, ignore it.
|
||||
return
|
||||
elif not self.conn.get_own_jid().bareMatch(by):
|
||||
# by attribute does not match the server, ignore it.
|
||||
return
|
||||
return stanza_id
|
||||
|
||||
@staticmethod
|
||||
def get_forwarded_message(stanza):
|
||||
forwarded = stanza.getTag('forwarded',
|
||||
namespace=nbxmpp.NS_FORWARD,
|
||||
protocol=True)
|
||||
if forwarded is not None:
|
||||
return forwarded.getTag('message', protocol=True)
|
||||
|
||||
class HttpAuthReceivedEvent(nec.NetworkIncomingEvent):
|
||||
name = 'http-auth-received'
|
||||
base_network_events = []
|
||||
|
@ -1026,6 +1048,7 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
:stanza: Complete stanza Node
|
||||
:forwarded: Forwarded Node
|
||||
:result: Result Node
|
||||
:unique_id: The unique stable id
|
||||
'''
|
||||
self._set_base_event_vars_as_attributes(base_event)
|
||||
self.additional_data = {}
|
||||
|
@ -1046,6 +1069,13 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
log.info('Received groupchat message from user archive')
|
||||
return False
|
||||
|
||||
# use stanza-id as unique-id
|
||||
self.unique_id, origin_id = self.get_unique_id()
|
||||
|
||||
# Check for duplicates
|
||||
if app.logger.find_stanza_id(self.unique_id, origin_id):
|
||||
return
|
||||
|
||||
self.msgtxt = self.msg_.getTagData('body')
|
||||
self.query_id = self.result.getAttr('queryid')
|
||||
|
||||
|
@ -1057,24 +1087,12 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
to = own_jid
|
||||
|
||||
if frm.bareMatch(own_jid):
|
||||
self.stanza_id = self.msg_.getOriginID()
|
||||
if self.stanza_id is None:
|
||||
self.stanza_id = self.msg_.getID()
|
||||
|
||||
self.with_ = to
|
||||
self.kind = KindConstant.CHAT_MSG_SENT
|
||||
else:
|
||||
if self.result.getNamespace() == nbxmpp.NS_MAM_2:
|
||||
self.stanza_id = self.result.getID()
|
||||
else:
|
||||
self.stanza_id = self.msg_.getID()
|
||||
|
||||
self.with_ = frm
|
||||
self.kind = KindConstant.CHAT_MSG_RECV
|
||||
|
||||
if self.stanza_id is None:
|
||||
log.debug('Could not retrieve stanza-id')
|
||||
|
||||
delay = self.forwarded.getTagAttr(
|
||||
'delay', 'stamp', namespace=nbxmpp.NS_DELAY2)
|
||||
if delay is None:
|
||||
|
@ -1097,9 +1115,31 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
log.warning('Received MAM message with '
|
||||
'invalid user timestamp: %s', user_delay)
|
||||
|
||||
log.debug('Received mam-message: stanza id: %s', self.stanza_id)
|
||||
log.debug('Received mam-message: unique id: %s', self.unique_id)
|
||||
return True
|
||||
|
||||
def get_unique_id(self):
|
||||
if self.conn.get_own_jid().bareMatch(self.msg_.getFrom()):
|
||||
# On our own Messages we have to check for both
|
||||
# stanza-id and origin-id, because other resources
|
||||
# maybe not support origin-id
|
||||
stanza_id = None
|
||||
if self.result.getNamespace() == nbxmpp.NS_MAM_2:
|
||||
# Only mam:2 ensures valid stanza-id
|
||||
stanza_id = self.get_stanza_id(self.result, query=True)
|
||||
|
||||
# try always to get origin-id because its a message
|
||||
# we sent.
|
||||
origin_id = self.msg_.getOriginID()
|
||||
|
||||
return stanza_id, origin_id
|
||||
|
||||
# A message we received
|
||||
elif self.result.getNamespace() == nbxmpp.NS_MAM_2:
|
||||
# Only mam:2 ensures valid stanza-id
|
||||
return self.get_stanza_id(self.result, query=True), None
|
||||
return None, None
|
||||
|
||||
class MamDecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
||||
name = 'mam-decrypted-message-received'
|
||||
base_network_events = []
|
||||
|
@ -1177,6 +1217,14 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
self.stanza.getFrom())
|
||||
return
|
||||
|
||||
# Check for duplicates
|
||||
self.unique_id = self.get_unique_id()
|
||||
# Check groupchat messages for duplicates,
|
||||
# We do this because of MUC History messages
|
||||
if self.stanza.getType() == 'groupchat':
|
||||
if app.logger.find_stanza_id(self.unique_id):
|
||||
return
|
||||
|
||||
address_tag = self.stanza.getTag('addresses',
|
||||
namespace=nbxmpp.NS_ADDRESS)
|
||||
# Be sure it comes from one of our resource, else ignore address element
|
||||
|
@ -1248,7 +1296,8 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
conn=self.conn,
|
||||
stanza=self.stanza,
|
||||
forwarded=forwarded,
|
||||
result=result))
|
||||
result=result,
|
||||
stanza_id=self.unique_id))
|
||||
return
|
||||
|
||||
# Mediated invitation?
|
||||
|
@ -1320,6 +1369,38 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
|
||||
return True
|
||||
|
||||
def get_unique_id(self):
|
||||
if self.stanza.getType() == 'groupchat':
|
||||
# TODO: Disco the MUC check if 'urn:xmpp:mam:2' is announced
|
||||
return self.get_stanza_id(self.stanza)
|
||||
|
||||
elif self.stanza.getType() != 'chat':
|
||||
return
|
||||
|
||||
# Messages we receive live
|
||||
if self.conn.archiving_namespace != nbxmpp.NS_MAM_2:
|
||||
# Only mam:2 ensures valid stanza-id
|
||||
return
|
||||
|
||||
# Sent Carbon
|
||||
sent_carbon = self.stanza.getTag('sent',
|
||||
namespace=nbxmpp.NS_CARBONS,
|
||||
protocol=True)
|
||||
if sent_carbon is not None:
|
||||
message = self.get_forwarded_message(sent_carbon)
|
||||
return self.get_stanza_id(message)
|
||||
|
||||
# Received Carbon
|
||||
received_carbon = self.stanza.getTag('received',
|
||||
namespace=nbxmpp.NS_CARBONS,
|
||||
protocol=True)
|
||||
if received_carbon is not None:
|
||||
message = self.get_forwarded_message(received_carbon)
|
||||
return self.get_stanza_id(message)
|
||||
|
||||
# Normal Message
|
||||
return self.get_stanza_id(self.stanza)
|
||||
|
||||
class ZeroconfMessageReceivedEvent(MessageReceivedEvent):
|
||||
name = 'message-received'
|
||||
base_network_events = []
|
||||
|
@ -1416,6 +1497,7 @@ class DecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
self.stanza = self.msg_obj.stanza
|
||||
self.additional_data = self.msg_obj.additional_data
|
||||
self.id_ = self.msg_obj.id_
|
||||
self.unique_id = self.msg_obj.unique_id
|
||||
self.jid = self.msg_obj.jid
|
||||
self.fjid = self.msg_obj.fjid
|
||||
self.resource = self.msg_obj.resource
|
||||
|
@ -1497,6 +1579,7 @@ class GcMessageReceivedEvent(nec.NetworkIncomingEvent):
|
|||
else:
|
||||
self.additional_data = self.msg_obj.additional_data
|
||||
self.id_ = self.msg_obj.stanza.getID()
|
||||
self.unique_id = self.msg_obj.unique_id
|
||||
self.fjid = self.msg_obj.fjid
|
||||
self.msgtxt = self.msg_obj.msgtxt
|
||||
self.jid = self.msg_obj.jid
|
||||
|
|
|
@ -1084,6 +1084,33 @@ class Logger:
|
|||
return True
|
||||
return False
|
||||
|
||||
def find_stanza_id(self, stanza_id, origin_id=None):
|
||||
"""
|
||||
Checks if a stanza-id is already in the `logs` table
|
||||
|
||||
:param stanza_id: The stanza-id
|
||||
|
||||
return True if the stanza-id was found
|
||||
"""
|
||||
ids = []
|
||||
if stanza_id is not None:
|
||||
ids.append(stanza_id)
|
||||
if origin_id is not None:
|
||||
ids.append(origin_id)
|
||||
|
||||
sql = '''
|
||||
SELECT stanza_id FROM logs
|
||||
WHERE stanza_id IN ({values}) LIMIT 1
|
||||
'''.format(values=', '.join('?' * len(ids)))
|
||||
|
||||
result = self.con.execute(sql, tuple(ids)).fetchone()
|
||||
|
||||
if result is not None:
|
||||
log.info('Found duplicated message, stanza-id: %s, origin-id: %s',
|
||||
stanza_id, origin_id)
|
||||
return True
|
||||
return False
|
||||
|
||||
def insert_jid(self, jid, kind=None, type_=JIDConstant.NORMAL_TYPE):
|
||||
"""
|
||||
Insert a new jid into the `jids` table.
|
||||
|
@ -1129,6 +1156,9 @@ class Logger:
|
|||
|
||||
lastrowid = self.con.execute(sql, (jid_id, time_, kind) + tuple(kwargs.values())).lastrowid
|
||||
|
||||
log.info('Insert into DB: jid: %s, time: %s, kind: %s, stanza_id: %s',
|
||||
jid, time_, kind, kwargs.get('stanza_id', None))
|
||||
|
||||
if unread and kind == KindConstant.CHAT_MSG_RECV:
|
||||
sql = '''INSERT INTO unread_messages (message_id, jid_id)
|
||||
VALUES (?, (SELECT jid_id FROM jids WHERE jid = ?))'''
|
||||
|
|
|
@ -127,14 +127,18 @@ class ConnectionArchive313:
|
|||
def _nec_mam_decrypted_message_received(self, obj):
|
||||
if obj.conn.name != self.name:
|
||||
return
|
||||
# if self.archiving_namespace != nbxmpp.NS_MAM_2:
|
||||
# Fallback duplicate search without stanza-id
|
||||
duplicate = app.logger.search_for_duplicate(
|
||||
obj.with_, obj.timestamp, obj.msgtxt)
|
||||
if not duplicate:
|
||||
if duplicate:
|
||||
return
|
||||
app.logger.insert_into_logs(
|
||||
obj.with_, obj.timestamp, obj.kind,
|
||||
unread=False,
|
||||
message=obj.msgtxt,
|
||||
additional_data=obj.additional_data)
|
||||
additional_data=obj.additional_data,
|
||||
stanza_id=obj.unique_id)
|
||||
|
||||
def get_query_id(self):
|
||||
self.mam_query_id = self.connection.getAnID()
|
||||
|
|
|
@ -120,7 +120,8 @@ class ChatControlSession(stanza_session.EncryptedStanzaSession):
|
|||
jid, obj.timestamp, log_type,
|
||||
message=msg_to_log,
|
||||
subject=obj.subject,
|
||||
additional_data=obj.additional_data)
|
||||
additional_data=obj.additional_data,
|
||||
stanza_id=obj.unique_id)
|
||||
|
||||
# Handle chat states
|
||||
if contact and (not obj.forwarded or not obj.sent):
|
||||
|
|
Loading…
Reference in New Issue