From 46926e71d1720b4f37d77ba6fad324a900e15092 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=B6rist?= Date: Sun, 22 Oct 2017 22:33:54 +0200 Subject: [PATCH 1/3] Remove old message duplicate code - Calculating so many hashes for each Message is quite expensive - It hides our own implementation bugs, like when we retrieve history from a MUC with wrong timestamps, or on rejoin. We never know about it because the Messages are dropped. - It should not be necessary anymore. The original problem was a bug in nbxmpp which triggered mass resending of old messages. --- gajim/common/connection_handlers.py | 4 ---- gajim/common/connection_handlers_events.py | 16 ---------------- 2 files changed, 20 deletions(-) diff --git a/gajim/common/connection_handlers.py b/gajim/common/connection_handlers.py index 175905511..7322aa7ec 100644 --- a/gajim/common/connection_handlers.py +++ b/gajim/common/connection_handlers.py @@ -725,8 +725,6 @@ class ConnectionHandlersBase: # IDs of sent messages (https://trac.gajim.org/ticket/8222) self.sent_message_ids = [] - self.received_message_hashes = [] - # We decrypt GPG messages one after the other. Keep queue in mem self.gpg_messages_to_decrypt = [] @@ -2048,8 +2046,6 @@ ConnectionHandlersBase, ConnectionJingle, ConnectionIBBytestream): app.nec.push_incoming_event(SignedInEvent(None, conn=self)) self.send_awaiting_pep() self.continue_connect_info = None - # hashes of already received messages - self.received_message_hashes = [] def _SearchCB(self, con, iq_obj): log.debug('SearchCB') diff --git a/gajim/common/connection_handlers_events.py b/gajim/common/connection_handlers_events.py index bed46c6cf..761a236dd 100644 --- a/gajim/common/connection_handlers_events.py +++ b/gajim/common/connection_handlers_events.py @@ -1472,22 +1472,6 @@ class DecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): if replace: self.correct_id = replace.getAttr('id') - # ignore message duplicates - if self.msgtxt and self.id_ and self.jid: - self.msghash = hashlib.sha256(("%s|%s|%s" % ( - hashlib.sha256(self.msgtxt.encode('utf-8')).hexdigest(), - hashlib.sha256(self.id_.encode('utf-8')).hexdigest(), - hashlib.sha256(self.jid.encode('utf-8')).hexdigest())).encode( - 'utf-8')).digest() - if self.msghash in self.conn.received_message_hashes: - log.info("Ignoring duplicated message from '%s' with id '%s'" % (self.jid, self.id_)) - return False - else: - log.debug("subhashes: msgtxt, id_, jid = ('%s', '%s', '%s')" % (hashlib.sha256(self.msgtxt.encode('utf-8')).hexdigest(), hashlib.sha256(self.id_.encode('utf-8')).hexdigest(), hashlib.sha256(self.jid.encode('utf-8')).hexdigest())) - self.conn.received_message_hashes.append(self.msghash) - # only record the last 20000 hashes (should be about 1MB [32 bytes per hash] - # and about 24 hours if you receive a message every 5 seconds) - self.conn.received_message_hashes = self.conn.received_message_hashes[-20000:] return True class ChatstateReceivedEvent(nec.NetworkIncomingEvent): From 7f1a839e77a684faa66e5f404abf06040d16382e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=B6rist?= Date: Sun, 22 Oct 2017 23:27:47 +0200 Subject: [PATCH 2/3] Deduplicate GC and MAM Messages based on stanza-id - Add methods to determine the unique/stable id - Write the id to the stanza_id DB field - Add method to deduplicate based on the unique/stable id --- gajim/common/connection_handlers.py | 10 +- gajim/common/connection_handlers_events.py | 111 ++++++++++++++++++--- gajim/common/logger.py | 30 ++++++ gajim/common/message_archiving.py | 16 +-- gajim/session.py | 3 +- 5 files changed, 145 insertions(+), 25 deletions(-) diff --git a/gajim/common/connection_handlers.py b/gajim/common/connection_handlers.py index 7322aa7ec..02e85b4e6 100644 --- a/gajim/common/connection_handlers.py +++ b/gajim/common/connection_handlers.py @@ -944,7 +944,8 @@ class ConnectionHandlersBase: def _on_message_received(self, obj): if isinstance(obj, MessageReceivedEvent): app.nec.push_incoming_event( - DecryptedMessageReceivedEvent(None, conn=self, msg_obj=obj)) + DecryptedMessageReceivedEvent( + None, conn=self, msg_obj=obj, stanza_id=obj.unique_id)) else: app.nec.push_incoming_event( MamDecryptedMessageReceivedEvent(None, **vars(obj))) @@ -1010,14 +1011,14 @@ class ConnectionHandlersBase: return True elif obj.mtype == 'groupchat': app.nec.push_incoming_event(GcMessageReceivedEvent(None, - conn=self, msg_obj=obj)) + conn=self, msg_obj=obj, stanza_id=obj.unique_id)) return True def _nec_gc_message_received(self, obj): if obj.conn.name != self.name: return if (app.config.should_log(obj.conn.name, obj.jid) and - obj.msgtxt and obj.nick): + obj.msgtxt and obj.nick): # if not obj.nick, it means message comes from room itself # usually it hold description and can be send at each connection # so don't store it in logs @@ -1026,7 +1027,8 @@ class ConnectionHandlersBase: KindConstant.GC_MSG, message=obj.msgtxt, contact_name=obj.nick, - additional_data=obj.additional_data) + additional_data=obj.additional_data, + stanza_id=obj.unique_id) app.logger.set_room_last_message_time(obj.room_jid, obj.timestamp) # process and dispatch an error message diff --git a/gajim/common/connection_handlers_events.py b/gajim/common/connection_handlers_events.py index 761a236dd..4f89c3965 100644 --- a/gajim/common/connection_handlers_events.py +++ b/gajim/common/connection_handlers_events.py @@ -139,6 +139,28 @@ class HelperEvent: if oob_desc is not None: self.additional_data['gajim']['oob_desc'] = oob_desc + def get_stanza_id(self, stanza, query=False): + if query: + # On a MAM query the stanza-id is maybe not set, so + # get the id of the stanza + return stanza.getAttr('id') + stanza_id, by = stanza.getStanzaIDAttrs() + if by is None: + # We can not verify who set this stanza-id, ignore it. + return + elif not self.conn.get_own_jid().bareMatch(by): + # by attribute does not match the server, ignore it. + return + return stanza_id + + @staticmethod + def get_forwarded_message(stanza): + forwarded = stanza.getTag('forwarded', + namespace=nbxmpp.NS_FORWARD, + protocol=True) + if forwarded is not None: + return forwarded.getTag('message', protocol=True) + class HttpAuthReceivedEvent(nec.NetworkIncomingEvent): name = 'http-auth-received' base_network_events = [] @@ -1026,6 +1048,7 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): :stanza: Complete stanza Node :forwarded: Forwarded Node :result: Result Node + :unique_id: The unique stable id ''' self._set_base_event_vars_as_attributes(base_event) self.additional_data = {} @@ -1046,6 +1069,13 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): log.info('Received groupchat message from user archive') return False + # use stanza-id as unique-id + self.unique_id, origin_id = self.get_unique_id() + + # Check for duplicates + if app.logger.find_stanza_id(self.unique_id, origin_id): + return + self.msgtxt = self.msg_.getTagData('body') self.query_id = self.result.getAttr('queryid') @@ -1057,24 +1087,12 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): to = own_jid if frm.bareMatch(own_jid): - self.stanza_id = self.msg_.getOriginID() - if self.stanza_id is None: - self.stanza_id = self.msg_.getID() - self.with_ = to self.kind = KindConstant.CHAT_MSG_SENT else: - if self.result.getNamespace() == nbxmpp.NS_MAM_2: - self.stanza_id = self.result.getID() - else: - self.stanza_id = self.msg_.getID() - self.with_ = frm self.kind = KindConstant.CHAT_MSG_RECV - if self.stanza_id is None: - log.debug('Could not retrieve stanza-id') - delay = self.forwarded.getTagAttr( 'delay', 'stamp', namespace=nbxmpp.NS_DELAY2) if delay is None: @@ -1097,9 +1115,31 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): log.warning('Received MAM message with ' 'invalid user timestamp: %s', user_delay) - log.debug('Received mam-message: stanza id: %s', self.stanza_id) + log.debug('Received mam-message: unique id: %s', self.unique_id) return True + def get_unique_id(self): + if self.conn.get_own_jid().bareMatch(self.msg_.getFrom()): + # On our own Messages we have to check for both + # stanza-id and origin-id, because other resources + # maybe not support origin-id + stanza_id = None + if self.result.getNamespace() == nbxmpp.NS_MAM_2: + # Only mam:2 ensures valid stanza-id + stanza_id = self.get_stanza_id(self.result, query=True) + + # try always to get origin-id because its a message + # we sent. + origin_id = self.msg_.getOriginID() + + return stanza_id, origin_id + + # A message we received + elif self.result.getNamespace() == nbxmpp.NS_MAM_2: + # Only mam:2 ensures valid stanza-id + return self.get_stanza_id(self.result, query=True), None + return None, None + class MamDecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): name = 'mam-decrypted-message-received' base_network_events = [] @@ -1177,6 +1217,14 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): self.stanza.getFrom()) return + # Check for duplicates + self.unique_id = self.get_unique_id() + # Check groupchat messages for duplicates, + # We do this because of MUC History messages + if self.stanza.getType() == 'groupchat': + if app.logger.find_stanza_id(self.unique_id): + return + address_tag = self.stanza.getTag('addresses', namespace=nbxmpp.NS_ADDRESS) # Be sure it comes from one of our resource, else ignore address element @@ -1248,7 +1296,8 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): conn=self.conn, stanza=self.stanza, forwarded=forwarded, - result=result)) + result=result, + stanza_id=self.unique_id)) return # Mediated invitation? @@ -1320,6 +1369,38 @@ class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): return True + def get_unique_id(self): + if self.stanza.getType() == 'groupchat': + # TODO: Disco the MUC check if 'urn:xmpp:mam:2' is announced + return self.get_stanza_id(self.stanza) + + elif self.stanza.getType() != 'chat': + return + + # Messages we receive live + if self.conn.archiving_namespace != nbxmpp.NS_MAM_2: + # Only mam:2 ensures valid stanza-id + return + + # Sent Carbon + sent_carbon = self.stanza.getTag('sent', + namespace=nbxmpp.NS_CARBONS, + protocol=True) + if sent_carbon is not None: + message = self.get_forwarded_message(sent_carbon) + return self.get_stanza_id(message) + + # Received Carbon + received_carbon = self.stanza.getTag('received', + namespace=nbxmpp.NS_CARBONS, + protocol=True) + if received_carbon is not None: + message = self.get_forwarded_message(received_carbon) + return self.get_stanza_id(message) + + # Normal Message + return self.get_stanza_id(self.stanza) + class ZeroconfMessageReceivedEvent(MessageReceivedEvent): name = 'message-received' base_network_events = [] @@ -1416,6 +1497,7 @@ class DecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent): self.stanza = self.msg_obj.stanza self.additional_data = self.msg_obj.additional_data self.id_ = self.msg_obj.id_ + self.unique_id = self.msg_obj.unique_id self.jid = self.msg_obj.jid self.fjid = self.msg_obj.fjid self.resource = self.msg_obj.resource @@ -1497,6 +1579,7 @@ class GcMessageReceivedEvent(nec.NetworkIncomingEvent): else: self.additional_data = self.msg_obj.additional_data self.id_ = self.msg_obj.stanza.getID() + self.unique_id = self.msg_obj.unique_id self.fjid = self.msg_obj.fjid self.msgtxt = self.msg_obj.msgtxt self.jid = self.msg_obj.jid diff --git a/gajim/common/logger.py b/gajim/common/logger.py index 798405093..dbd04acbe 100644 --- a/gajim/common/logger.py +++ b/gajim/common/logger.py @@ -1084,6 +1084,33 @@ class Logger: return True return False + def find_stanza_id(self, stanza_id, origin_id=None): + """ + Checks if a stanza-id is already in the `logs` table + + :param stanza_id: The stanza-id + + return True if the stanza-id was found + """ + ids = [] + if stanza_id is not None: + ids.append(stanza_id) + if origin_id is not None: + ids.append(origin_id) + + sql = ''' + SELECT stanza_id FROM logs + WHERE stanza_id IN ({values}) LIMIT 1 + '''.format(values=', '.join('?' * len(ids))) + + result = self.con.execute(sql, tuple(ids)).fetchone() + + if result is not None: + log.info('Found duplicated message, stanza-id: %s, origin-id: %s', + stanza_id, origin_id) + return True + return False + def insert_jid(self, jid, kind=None, type_=JIDConstant.NORMAL_TYPE): """ Insert a new jid into the `jids` table. @@ -1129,6 +1156,9 @@ class Logger: lastrowid = self.con.execute(sql, (jid_id, time_, kind) + tuple(kwargs.values())).lastrowid + log.info('Insert into DB: jid: %s, time: %s, kind: %s, stanza_id: %s', + jid, time_, kind, kwargs.get('stanza_id', None)) + if unread and kind == KindConstant.CHAT_MSG_RECV: sql = '''INSERT INTO unread_messages (message_id, jid_id) VALUES (?, (SELECT jid_id FROM jids WHERE jid = ?))''' diff --git a/gajim/common/message_archiving.py b/gajim/common/message_archiving.py index 451c5d8af..2e516232b 100644 --- a/gajim/common/message_archiving.py +++ b/gajim/common/message_archiving.py @@ -127,14 +127,18 @@ class ConnectionArchive313: def _nec_mam_decrypted_message_received(self, obj): if obj.conn.name != self.name: return + # if self.archiving_namespace != nbxmpp.NS_MAM_2: + # Fallback duplicate search without stanza-id duplicate = app.logger.search_for_duplicate( obj.with_, obj.timestamp, obj.msgtxt) - if not duplicate: - app.logger.insert_into_logs( - obj.with_, obj.timestamp, obj.kind, - unread=False, - message=obj.msgtxt, - additional_data=obj.additional_data) + if duplicate: + return + app.logger.insert_into_logs( + obj.with_, obj.timestamp, obj.kind, + unread=False, + message=obj.msgtxt, + additional_data=obj.additional_data, + stanza_id=obj.unique_id) def get_query_id(self): self.mam_query_id = self.connection.getAnID() diff --git a/gajim/session.py b/gajim/session.py index de18f912d..63116412e 100644 --- a/gajim/session.py +++ b/gajim/session.py @@ -120,7 +120,8 @@ class ChatControlSession(stanza_session.EncryptedStanzaSession): jid, obj.timestamp, log_type, message=msg_to_log, subject=obj.subject, - additional_data=obj.additional_data) + additional_data=obj.additional_data, + stanza_id=obj.unique_id) # Handle chat states if contact and (not obj.forwarded or not obj.sent): From d4fdf0d7c1da7a06518acb7473e387aa6d285e1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=B6rist?= Date: Wed, 1 Nov 2017 23:07:11 +0100 Subject: [PATCH 3/3] Shorten search intervall for MAM messages --- gajim/common/logger.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gajim/common/logger.py b/gajim/common/logger.py index dbd04acbe..d742e2322 100644 --- a/gajim/common/logger.py +++ b/gajim/common/logger.py @@ -1064,9 +1064,9 @@ class Logger: :param msg: The message text """ - # Add 5 minutes around the timestamp - start_time = timestamp - 300 - end_time = timestamp + 300 + # Add 10 seconds around the timestamp + start_time = timestamp - 10 + end_time = timestamp + 10 log.debug('start: %s, end: %s, jid: %s, message: %s', start_time, end_time, jid, msg)