MAM: Request from last received stanza-id

Record stanza-ids from live messages. When Gajim is started, query the
archive starting from the last received stanza-id.
This commit is contained in:
Philipp Hörist 2018-07-29 19:36:02 +02:00
parent dbf891e97c
commit 137bf1f831
5 changed files with 51 additions and 31 deletions

View File

@ -268,7 +268,7 @@ class ConnectionHandlersBase:
if obj.conn.name != self.name: if obj.conn.name != self.name:
return return
self._check_for_mam_compliance(obj.jid, obj.unique_id) self._check_for_mam_compliance(obj.jid, obj.stanza_id)
if (app.config.should_log(obj.conn.name, obj.jid) and if (app.config.should_log(obj.conn.name, obj.jid) and
obj.msgtxt and obj.nick): obj.msgtxt and obj.nick):
@ -282,8 +282,10 @@ class ConnectionHandlersBase:
message=obj.msgtxt, message=obj.msgtxt,
contact_name=obj.nick, contact_name=obj.nick,
additional_data=obj.additional_data, additional_data=obj.additional_data,
stanza_id=obj.unique_id) stanza_id=obj.stanza_id)
app.logger.set_room_last_message_time(obj.room_jid, obj.timestamp) app.logger.set_room_last_message_time(obj.room_jid, obj.timestamp)
self.get_module('MAM').save_archive_id(
obj.room_jid, obj.stanza_id, obj.timestamp)
# process and dispatch an error message # process and dispatch an error message
def dispatch_error_message(self, msg, msgtxt, session, frm, tim): def dispatch_error_message(self, msg, msgtxt, session, frm, tim):

View File

@ -1429,10 +1429,11 @@ class Logger:
exists = self.get_archive_timestamp(jid) exists = self.get_archive_timestamp(jid)
if not exists: if not exists:
sql = '''INSERT INTO last_archive_message VALUES (?, ?, ?, ?)''' sql = '''INSERT INTO last_archive_message VALUES (?, ?, ?, ?)'''
self._con.execute(sql, (jid_id, self._con.execute(sql, (
kwargs.get('last_mam_id', None), jid_id,
kwargs.get('oldest_mam_timestamp', None), kwargs.get('last_mam_id', None),
kwargs.get('last_muc_timestamp', None))) kwargs.get('oldest_mam_timestamp', None),
kwargs.get('last_muc_timestamp', None)))
else: else:
args = ' = ?, '.join(kwargs.keys()) + ' = ?' args = ' = ?, '.join(kwargs.keys()) + ' = ?'
sql = '''UPDATE last_archive_message SET {} sql = '''UPDATE last_archive_message SET {}

View File

@ -47,6 +47,9 @@ class MAM:
self.archiving_namespace = None self.archiving_namespace = None
self._mam_query_ids = {} self._mam_query_ids = {}
# Holds archive jids where catch up was successful
self._catch_up_finished = []
def pass_disco(self, from_, identities, features, data, node): def pass_disco(self, from_, identities, features, data, node):
if nbxmpp.NS_MAM_2 in features: if nbxmpp.NS_MAM_2 in features:
self.archiving_namespace = nbxmpp.NS_MAM_2 self.archiving_namespace = nbxmpp.NS_MAM_2
@ -369,6 +372,9 @@ class MAM:
start_date = datetime.utcnow() - timedelta(days=7) start_date = datetime.utcnow() - timedelta(days=7)
log.info('First start: query archive start: %s', start_date) log.info('First start: query archive start: %s', start_date)
query = self._get_archive_query(query_id, start=start_date) query = self._get_archive_query(query_id, start=start_date)
if own_jid in self._catch_up_finished:
self._catch_up_finished.remove(own_jid)
self._send_archive_query(query, query_id, start_date) self._send_archive_query(query, query_id, start_date)
def request_archive_on_muc_join(self, jid): def request_archive_on_muc_join(self, jid):
@ -388,6 +394,9 @@ class MAM:
start_date = datetime.utcnow() - timedelta(days=1) start_date = datetime.utcnow() - timedelta(days=1)
log.info('First join: query archive %s from: %s', jid, start_date) log.info('First join: query archive %s from: %s', jid, start_date)
query = self._get_archive_query(query_id, jid=jid, start=start_date) query = self._get_archive_query(query_id, jid=jid, start=start_date)
if jid in self._catch_up_finished:
self._catch_up_finished.remove(jid)
self._send_archive_query(query, query_id, start_date, groupchat=True) self._send_archive_query(query, query_id, start_date, groupchat=True)
def _send_archive_query(self, query, query_id, start_date=None, def _send_archive_query(self, query, query_id, start_date=None,
@ -408,11 +417,13 @@ class MAM:
last = set_.getTagData('last') last = set_.getTagData('last')
if last is None: if last is None:
log.info('End of MAM query, no items retrieved') log.info('End of MAM query, no items retrieved')
self._catch_up_finished.append(jid)
self._mam_query_ids.pop(jid) self._mam_query_ids.pop(jid)
return return
complete = fin.getAttr('complete') complete = fin.getAttr('complete')
app.logger.set_archive_timestamp(jid, last_mam_id=last) app.logger.set_archive_timestamp(
jid, last_mam_id=last, last_muc_timestamp=None)
if complete != 'true': if complete != 'true':
self._mam_query_ids.pop(jid) self._mam_query_ids.pop(jid)
query_id = self._get_query_id(jid) query_id = self._get_query_id(jid)
@ -425,6 +436,8 @@ class MAM:
jid, jid,
last_mam_id=last, last_mam_id=last,
oldest_mam_timestamp=start_date.timestamp()) oldest_mam_timestamp=start_date.timestamp())
self._catch_up_finished.append(jid)
log.info('End of MAM query, last mam id: %s', last) log.info('End of MAM query, last mam id: %s', last)
def request_archive_interval(self, start_date, end_date, after=None, def request_archive_interval(self, start_date, end_date, after=None,
@ -517,6 +530,17 @@ class MAM:
query.setAttr('queryid', query_id) query.setAttr('queryid', query_id)
return iq return iq
def save_archive_id(self, jid, stanza_id, timestamp):
if stanza_id is None:
return
if jid is None:
jid = self._con.get_own_jid().getStripped()
if jid not in self._catch_up_finished:
return
log.info('Save: %s: %s, %s', jid, stanza_id, timestamp)
app.logger.set_archive_timestamp(
jid, last_mam_id=stanza_id, last_muc_timestamp=timestamp)
def request_mam_preferences(self): def request_mam_preferences(self):
log.info('Request MAM preferences') log.info('Request MAM preferences')
iq = nbxmpp.Iq('get', self.archiving_namespace) iq = nbxmpp.Iq('get', self.archiving_namespace)

View File

@ -109,8 +109,8 @@ class Message:
jid, resource = app.get_room_and_nick_from_fjid(fjid) jid, resource = app.get_room_and_nick_from_fjid(fjid)
# Check for duplicates # Check for duplicates
unique_id = self._get_unique_id(stanza, forwarded, sent, stanza_id, origin_id = self._get_unique_id(
self_message, muc_pm) stanza, forwarded, sent, self_message, muc_pm)
# Check groupchat messages for duplicates, # Check groupchat messages for duplicates,
# We do this because of MUC History messages # We do this because of MUC History messages
@ -121,8 +121,9 @@ class Message:
archive_jid = self._con.get_own_jid().getStripped() archive_jid = self._con.get_own_jid().getStripped()
if app.logger.find_stanza_id(self._account, if app.logger.find_stanza_id(self._account,
archive_jid, archive_jid,
unique_id, stanza_id,
groupchat=type_ == 'groupchat'): origin_id,
type_ == 'groupchat'):
return return
thread_id = stanza.getThread() thread_id = stanza.getThread()
@ -177,7 +178,8 @@ class Message:
'fjid': fjid, 'fjid': fjid,
'jid': jid, 'jid': jid,
'resource': resource, 'resource': resource,
'unique_id': unique_id, 'stanza_id': stanza_id,
'unique_id': stanza_id or origin_id,
'mtype': type_, 'mtype': type_,
'msgtxt': msgtxt, 'msgtxt': msgtxt,
'thread_id': thread_id, 'thread_id': thread_id,
@ -215,7 +217,6 @@ class Message:
'form_node': parse_form(event.stanza), 'form_node': parse_form(event.stanza),
'xhtml': parse_xhtml(event.stanza), 'xhtml': parse_xhtml(event.stanza),
'chatstate': parse_chatstate(event.stanza), 'chatstate': parse_chatstate(event.stanza),
'stanza_id': event.unique_id
} }
parse_oob(event.stanza, event.additional_data) parse_oob(event.stanza, event.additional_data)
@ -235,7 +236,7 @@ class Message:
None, None,
conn=self._con, conn=self._con,
msg_obj=event, msg_obj=event,
stanza_id=event.unique_id)) stanza_id=event.stanza_id))
return return
app.nec.push_incoming_event( app.nec.push_incoming_event(
@ -245,30 +246,19 @@ class Message:
def _get_unique_id(self, stanza, forwarded, sent, self_message, muc_pm): def _get_unique_id(self, stanza, forwarded, sent, self_message, muc_pm):
if stanza.getType() == 'groupchat': if stanza.getType() == 'groupchat':
# TODO: Disco the MUC check if 'urn:xmpp:mam:2' is announced # TODO: Disco the MUC check if 'urn:xmpp:mam:2' is announced
return self._get_stanza_id(stanza) return self._get_stanza_id(stanza), None
if stanza.getType() != 'chat': if stanza.getType() != 'chat':
return return None, None
# Messages we receive live # Messages we receive live
if self._con.get_module('MAM').archiving_namespace != nbxmpp.NS_MAM_2: if self._con.get_module('MAM').archiving_namespace != nbxmpp.NS_MAM_2:
# Only mam:2 ensures valid stanza-id # Only mam:2 ensures valid stanza-id
return return None, None
if forwarded: if self_message:
if sent: return self._get_stanza_id(stanza), stanza.getOriginID()
if self_message or muc_pm: return self._get_stanza_id(stanza), None
return stanza.getOriginID()
return self._get_stanza_id(stanza)
else:
if muc_pm:
return stanza.getOriginID()
return self._get_stanza_id(stanza)
# Normal Message
if self_message or muc_pm:
return stanza.getOriginID()
return self._get_stanza_id(stanza)
def _get_stanza_id(self, stanza): def _get_stanza_id(self, stanza):
stanza_id, by = stanza.getStanzaIDAttrs() stanza_id, by = stanza.getStanzaIDAttrs()

View File

@ -139,6 +139,9 @@ class ChatControlSession(object):
additional_data=obj.additional_data, additional_data=obj.additional_data,
stanza_id=obj.unique_id) stanza_id=obj.unique_id)
self.conn.get_module('MAM').save_archive_id(
None, obj.stanza_id, obj.timestamp)
if obj.muc_pm and not obj.gc_control: if obj.muc_pm and not obj.gc_control:
# This is a carbon of a PM from a MUC we are not currently # This is a carbon of a PM from a MUC we are not currently
# joined. We log it silently without notification. # joined. We log it silently without notification.