# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.

# XEP-0313: Message Archive Management

import logging
from datetime import datetime, timedelta, timezone

import nbxmpp

from gajim.common import app
from gajim.common.nec import NetworkIncomingEvent
from gajim.common.const import ArchiveState, JIDConstant, KindConstant
from gajim.common.caps_cache import muc_caps_cache
from gajim.common.modules.misc import parse_delay
from gajim.common.modules.misc import parse_oob
from gajim.common.modules.misc import parse_correction
from gajim.common.modules.misc import parse_eme
from gajim.common.modules.util import is_self_message
from gajim.common.modules.util import is_muc_pm

log = logging.getLogger('gajim.c.m.archiving')


class MAM:
    """Message Archive Management (XEP-0313) module of one connection.

    Sends archive queries, validates and stores the forwarded messages
    that come back, and reads/writes the archiving preferences.
    """

    def __init__(self, con):
        """Bind the module to the connection object *con*."""
        self._con = con
        self._account = con.name

        # Both supported MAM namespaces are routed to the same handler
        self.handlers = [
            ('message', self._mam_message_received, '', nbxmpp.NS_MAM_1),
            ('message', self._mam_message_received, '', nbxmpp.NS_MAM_2),
        ]

        self.available = False
        self.archiving_namespace = None

        # Maps a bare archive jid (str) to the id of the query that is
        # currently running against that archive
        self._mam_query_ids = {}

        # Holds archive jids where catch up was successful
        self._catch_up_finished = []

    def pass_disco(self, from_, _identities, features, _data, _node):
        """Enable MAM if *features* advertises a supported namespace.

        mam:2 is preferred over mam:1. Nothing happens when neither
        namespace is present.
        """
        if nbxmpp.NS_MAM_2 in features:
            self.archiving_namespace = nbxmpp.NS_MAM_2
        elif nbxmpp.NS_MAM_1 in features:
            self.archiving_namespace = nbxmpp.NS_MAM_1
        else:
            return

        self.available = True
        log.info('Discovered MAM %s: %s', self.archiving_namespace, from_)

        # TODO: Move this GUI code out
        action = app.app.lookup_action('%s-archive' % self._account)
        action.set_enabled(True)

    def _from_valid_archive(self, stanza, message, groupchat):
        """Return the jid of the archive that sent *stanza*.

        For groupchat messages the archive must be the sender of the
        forwarded *message*, otherwise it must be our own account.
        Returns None if the stanza does not come from that archive.
        """
        if groupchat:
            expected_archive = message.getFrom()
        else:
            expected_archive = self._con.get_own_jid()

        archive_jid = stanza.getFrom()
        if archive_jid is None:
            if groupchat:
                return None
            # Message from our own archive
            return self._con.get_own_jid()

        if archive_jid.bareMatch(expected_archive):
            return archive_jid
        return None

    def _get_unique_id(self, result, message, groupchat,
                       self_message, muc_pm):
        """Return the ``(stanza_id, origin_id)`` pair used for dedup.

        Which of the two ids is usable depends on the kind of message;
        the unusable one is returned as None.
        """
        stanza_id = result.getAttr('id')
        if groupchat:
            return stanza_id, None

        origin_id = message.getOriginID()
        if self_message:
            return None, origin_id

        if muc_pm:
            return stanza_id, origin_id

        if self._con.get_own_jid().bareMatch(message.getFrom()):
            # message we sent
            return stanza_id, origin_id

        # A message we received
        return stanza_id, None

    def _mam_message_received(self, _con, stanza):
        """Handle a message stanza that carries a MAM result.

        Validates archive and query id, extracts the forwarded message,
        drops mam:2 duplicates and hands the message to the decryption
        extension point / ``_decryption_finished``.

        Raises nbxmpp.NodeProcessed once the stanza has been consumed.
        """
        app.nec.push_incoming_event(
            NetworkIncomingEvent('raw-mam-message-received',
                                 conn=self._con,
                                 stanza=stanza))

        result = stanza.getTag('result', protocol=True)
        if result is None:
            # Nothing we can parse, leave the stanza to other handlers
            log.warning('MAM message without result element: %s', stanza)
            return

        queryid = result.getAttr('queryid')
        forwarded = result.getTag('forwarded',
                                  namespace=nbxmpp.NS_FORWARD,
                                  protocol=True)
        message = None
        if forwarded is not None:
            message = forwarded.getTag('message', protocol=True)
        if message is None:
            # The archive content comes from the network, never assume
            # that it is well-formed
            log.warning('Malformed MAM message: %s', stanza)
            raise nbxmpp.NodeProcessed

        groupchat = message.getType() == 'groupchat'

        archive_jid = self._from_valid_archive(stanza, message, groupchat)
        if archive_jid is None:
            log.warning('Message from invalid archive %s', stanza)
            raise nbxmpp.NodeProcessed

        log.info('Received message from archive: %s', archive_jid)
        if not self._is_valid_request(archive_jid, queryid):
            log.warning('Invalid MAM Message: unknown query id')
            log.debug(stanza)
            raise nbxmpp.NodeProcessed

        # Timestamp parsing
        timestamp = parse_delay(forwarded)
        if timestamp is None:
            raise nbxmpp.NodeProcessed

        user_timestamp = parse_delay(message)

        # Fix for self messaging
        if not groupchat:
            to = message.getTo()
            if to is None:
                # Some servers dont set the 'to' attribute when
                # we send a message to ourself
                message.setTo(self._con.get_own_jid())

        event_attrs = {}

        if groupchat:
            event_attrs.update(self._parse_gc_attrs(message))
        else:
            event_attrs.update(self._parse_chat_attrs(message))

        self_message = is_self_message(message, groupchat)
        muc_pm = is_muc_pm(message, event_attrs['with_'], groupchat)

        stanza_id, origin_id = self._get_unique_id(
            result, message, groupchat, self_message, muc_pm)
        message_id = message.getID()

        # Check for duplicates
        namespace = self.archiving_namespace
        if groupchat:
            namespace = muc_caps_cache.get_mam_namespace(
                archive_jid.getStripped())

        if namespace == nbxmpp.NS_MAM_2:
            # Search only with stanza-id for duplicates on mam:2
            if app.logger.find_stanza_id(self._account,
                                         archive_jid.getStripped(),
                                         stanza_id,
                                         origin_id,
                                         groupchat=groupchat):
                log.info('Found duplicate with stanza-id')
                raise nbxmpp.NodeProcessed

        msgtxt = message.getTagData('body')

        event_attrs.update(
            {'conn': self._con,
             'additional_data': {},
             'encrypted': False,
             'timestamp': timestamp,
             'user_timestamp': user_timestamp,
             'self_message': self_message,
             'groupchat': groupchat,
             'muc_pm': muc_pm,
             'stanza_id': stanza_id,
             'origin_id': origin_id,
             'message_id': message_id,
             'correct_id': None,
             'archive_jid': archive_jid,
             'msgtxt': msgtxt,
             'message': message,
             'namespace': namespace,
             })

        if groupchat:
            event = MamGcMessageReceivedEvent(None, **event_attrs)
        else:
            event = MamMessageReceivedEvent(None, **event_attrs)

        app.plugin_manager.extension_point(
            'decrypt', self._con, event, self._decryption_finished)

        if not event.encrypted:
            # No plugin claimed the message; if it announces an
            # encryption via EME, use that result as message text
            eme = parse_eme(event.message)
            if eme is not None:
                event.msgtxt = eme
            self._decryption_finished(event)

        raise nbxmpp.NodeProcessed

    @staticmethod
    def _parse_gc_attrs(message):
        """Return the event attributes of a groupchat *message*."""
        with_ = message.getFrom()
        nick = message.getFrom().getResource()

        # Get the real jid if we have it
        real_jid = None
        muc_user = message.getTag('x', namespace=nbxmpp.NS_MUC_USER)
        if muc_user is not None:
            real_jid = muc_user.getTagAttr('item', 'jid')
            if real_jid is not None:
                real_jid = nbxmpp.JID(real_jid)

        return {'with_': with_,
                'nick': nick,
                'real_jid': real_jid,
                'kind': KindConstant.GC_MSG}

    def _parse_chat_attrs(self, message):
        """Return the event attributes of a one-to-one *message*.

        The conversation partner is the recipient for messages we sent
        and the sender for messages we received.
        """
        frm = message.getFrom()
        to = message.getTo()
        if frm.bareMatch(self._con.get_own_jid()):
            with_ = to
            kind = KindConstant.CHAT_MSG_SENT
        else:
            with_ = frm
            kind = KindConstant.CHAT_MSG_RECV

        return {'with_': with_,
                'nick': None,
                'kind': kind}

    def _decryption_finished(self, event):
        """Store a (decrypted) archive message and announce it.

        Messages without text are ignored. On mam:1 a fallback
        duplicate search is done before the message is written.
        """
        if not event.msgtxt:
            # For example Chatstates, Receipts, Chatmarkers
            log.debug(event.message.getProperties())
            return
        log.debug(event.msgtxt)

        event.correct_id = parse_correction(event.message)
        parse_oob(event.message, event.additional_data)

        with_ = event.with_.getStripped()
        if event.muc_pm:
            # we store the message with the full JID
            with_ = str(event.with_)

        stanza_id = event.stanza_id
        if event.self_message:
            # Self messages can only be deduped with origin-id
            if event.origin_id is None:
                log.warning('Self message without origin-id found')
                return
            stanza_id = event.origin_id

        if event.namespace == nbxmpp.NS_MAM_1:
            if app.logger.search_for_duplicate(
                    self._account, with_, event.timestamp, event.msgtxt):
                log.info('Found duplicate with fallback for mam:1')
                return

        app.logger.insert_into_logs(self._account,
                                    with_,
                                    event.timestamp,
                                    event.kind,
                                    unread=False,
                                    message=event.msgtxt,
                                    contact_name=event.nick,
                                    additional_data=event.additional_data,
                                    stanza_id=stanza_id)

        app.nec.push_incoming_event(
            MamDecryptedMessageReceived(None, **vars(event)))

    def _is_valid_request(self, jid, query_id):
        """Return True if *query_id* is the running query for *jid*."""
        if query_id is None:
            return False

        valid_id = self._mam_query_ids.get(jid.getStripped(), None)
        return valid_id == query_id

    def _get_query_id(self, jid):
        """Create a new query id and register it for archive *jid*."""
        query_id = self._con.connection.getAnID()
        self._mam_query_ids[jid] = query_id
        return query_id

    def _remove_query_id(self, jid, query_id=None):
        """Forget the running query of archive *jid*.

        If *query_id* is given the entry is only removed when it still
        belongs to that query, so a late reply can not drop the id of a
        newer query. An unknown *jid* is ignored.
        """
        if (query_id is not None and
                self._mam_query_ids.get(jid) != query_id):
            return
        self._mam_query_ids.pop(jid, None)

    @staticmethod
    def _parse_iq(stanza):
        """Return the ``(fin, set)`` nodes of a MAM query result.

        Raises InvalidMamIQ if *stanza* is not a result or if one of
        the two nodes is missing.
        """
        if not nbxmpp.isResultNode(stanza):
            log.error('Error on MAM query: %s', stanza.getError())
            raise InvalidMamIQ

        fin = stanza.getTag('fin')
        if fin is None:
            log.error('Malformed MAM query result received: %s', stanza)
            raise InvalidMamIQ

        set_ = fin.getTag('set', namespace=nbxmpp.NS_RSM)
        if set_ is None:
            log.error(
                'Malformed MAM query result received (no "set" Node): %s',
                stanza)
            raise InvalidMamIQ
        return fin, set_

    def _get_from_jid(self, stanza):
        """Return the bare jid (str) of the archive that sent *stanza*."""
        jid = stanza.getFrom()
        if jid is None:
            # No from means, iq from our own archive
            jid = self._con.get_own_jid().getStripped()
        else:
            jid = jid.getStripped()
        return jid

    def request_archive_count(self, start_date, end_date):
        """Ask our own archive how many messages an interval holds.

        Returns the query id; the answer is announced with an
        ArchivingCountReceived event.
        """
        jid = self._con.get_own_jid().getStripped()
        log.info('Request archive count from: %s', jid)
        query_id = self._get_query_id(jid)
        # max=0 requests no messages, only the result set metadata
        query = self._get_archive_query(
            query_id, start=start_date, end=end_date, max_=0)
        self._con.connection.SendAndCallForResponse(
            query, self._received_count, {'query_id': query_id})
        return query_id

    def _received_count(self, _con, stanza, query_id):
        """Handle the reply to ``request_archive_count``."""
        jid = self._get_from_jid(stanza)
        try:
            _, set_ = self._parse_iq(stanza)
        except InvalidMamIQ:
            # Dont keep a failed query registered as running
            self._remove_query_id(jid, query_id)
            return

        self._remove_query_id(jid)

        count = set_.getTagData('count')
        log.info('Received archive count: %s', count)
        app.nec.push_incoming_event(ArchivingCountReceived(
            None, query_id=query_id, count=count))

    def request_archive_on_signin(self):
        """Catch up with our own archive after signing in.

        Continues after the last known archive id, or requests the
        last week if no id is known. Does nothing while a query for
        our own archive is still running.
        """
        own_jid = self._con.get_own_jid().getStripped()

        if own_jid in self._mam_query_ids:
            log.warning('MAM request for %s already running', own_jid)
            return

        archive = app.logger.get_archive_timestamp(own_jid)

        # Migration of last_mam_id from config to DB
        if archive is not None:
            mam_id = archive.last_mam_id
        else:
            mam_id = app.config.get_per(
                'accounts', self._account, 'last_mam_id')
            if mam_id:
                app.config.del_per('accounts', self._account, 'last_mam_id')

        start_date = None
        query_id = self._get_query_id(own_jid)
        if mam_id:
            log.info('MAM query after: %s', mam_id)
            query = self._get_archive_query(query_id, after=mam_id)
        else:
            # First Start, we request the last week
            # Timezone-aware on purpose: .timestamp() on a naive
            # datetime would interpret the UTC value as local time
            start_date = datetime.now(timezone.utc) - timedelta(days=7)
            log.info('First start: query archive start: %s', start_date)
            query = self._get_archive_query(query_id, start=start_date)

        if own_jid in self._catch_up_finished:
            self._catch_up_finished.remove(own_jid)
        self._send_archive_query(query, query_id, start_date)

    def request_archive_on_muc_join(self, jid):
        """Catch up with the archive of the MUC *jid* after joining."""
        archive = app.logger.get_archive_timestamp(
            jid, type_=JIDConstant.ROOM_TYPE)
        query_id = self._get_query_id(jid)
        start_date = None
        if archive is not None:
            log.info('Request from archive %s after %s:',
                     jid, archive.last_mam_id)
            query = self._get_archive_query(
                query_id, jid=jid, after=archive.last_mam_id)
        else:
            # First join, we only request the last day.
            # Depending on what a MUC saves, there could be thousands
            # of Messages even in just one day.
            # Timezone-aware on purpose: .timestamp() on a naive
            # datetime would interpret the UTC value as local time
            start_date = datetime.now(timezone.utc) - timedelta(days=1)
            log.info('First join: query archive %s from: %s',
                     jid, start_date)
            query = self._get_archive_query(
                query_id, jid=jid, start=start_date)

        if jid in self._catch_up_finished:
            self._catch_up_finished.remove(jid)
        self._send_archive_query(query, query_id, start_date, groupchat=True)

    def _send_archive_query(self, query, query_id, start_date=None,
                            groupchat=False):
        """Send a catch up *query*; ``_result_finished`` gets the reply."""
        self._con.connection.SendAndCallForResponse(
            query, self._result_finished, {'query_id': query_id,
                                           'start_date': start_date,
                                           'groupchat': groupchat})

    def _result_finished(self, _con, stanza, query_id, start_date,
                         groupchat):
        """Handle the end of one page of a catch up query.

        Saves the last received archive id and requests the next page
        until the archive reports the result as complete.
        """
        jid = self._get_from_jid(stanza)
        try:
            fin, set_ = self._parse_iq(stanza)
        except InvalidMamIQ:
            # Dont keep a failed query registered as running, it would
            # block every later sign in request for this archive
            self._remove_query_id(jid, query_id)
            return

        # This page is done; a follow-up page registers a new id below
        self._remove_query_id(jid)

        last = set_.getTagData('last')
        if last is None:
            log.info('End of MAM query, no items retrieved')
            self._catch_up_finished.append(jid)
            return

        complete = fin.getAttr('complete')
        app.logger.set_archive_timestamp(
            jid, last_mam_id=last, last_muc_timestamp=None)
        if complete != 'true':
            query_id = self._get_query_id(jid)
            query = self._get_archive_query(query_id, jid=jid, after=last)
            self._send_archive_query(query, query_id, groupchat=groupchat)
        else:
            if start_date is not None:
                app.logger.set_archive_timestamp(
                    jid,
                    last_mam_id=last,
                    oldest_mam_timestamp=start_date.timestamp())
            self._catch_up_finished.append(jid)
            log.info('End of MAM query, last mam id: %s', last)

    def request_archive_interval(self, start_date, end_date, after=None,
                                 query_id=None):
        """Request one page of an interval of our own archive.

        *after* and *query_id* are passed when a further page of an
        already running request is fetched. Returns the query id.
        """
        jid = self._con.get_own_jid().getStripped()
        if after is None:
            log.info('Request intervall from %s to %s from %s',
                     start_date, end_date, jid)
        else:
            log.info('Query page after %s from %s', after, jid)
        if query_id is None:
            query_id = self._get_query_id(jid)
        # Also (re-)registers an id that was passed in by the caller
        self._mam_query_ids[jid] = query_id
        query = self._get_archive_query(query_id, start=start_date,
                                        end=end_date, after=after, max_=30)

        self._con.connection.SendAndCallForResponse(
            query, self._intervall_result, {'query_id': query_id,
                                            'start_date': start_date,
                                            'end_date': end_date})
        return query_id

    def _intervall_result(self, _con, stanza, query_id,
                          start_date, end_date):
        """Handle the end of one page of an interval request.

        Requests the next page until the result is complete, then
        stores the oldest known timestamp and pushes an
        ArchivingIntervalFinished event.
        """
        jid = self._get_from_jid(stanza)
        try:
            fin, set_ = self._parse_iq(stanza)
        except InvalidMamIQ:
            # Dont keep a failed query registered as running
            self._remove_query_id(jid, query_id)
            return

        self._remove_query_id(jid)

        # NOTE(review): start_date is supplied by the caller; a naive
        # datetime is interpreted as local time by .timestamp() -
        # confirm what the callers pass
        if start_date:
            timestamp = start_date.timestamp()
        else:
            timestamp = ArchiveState.ALL

        last = set_.getTagData('last')
        if last is None:
            app.nec.push_incoming_event(ArchivingIntervalFinished(
                None, query_id=query_id))
            app.logger.set_archive_timestamp(
                jid, oldest_mam_timestamp=timestamp)
            log.info('End of MAM request, no items retrieved')
            return

        complete = fin.getAttr('complete')
        if complete != 'true':
            self.request_archive_interval(start_date, end_date,
                                          last, query_id)
        else:
            log.info('Request finished')
            app.logger.set_archive_timestamp(
                jid, oldest_mam_timestamp=timestamp)
            app.nec.push_incoming_event(ArchivingIntervalFinished(
                None, query_id=query_id))

    def _get_archive_query(self, query_id, jid=None, start=None, end=None,
                           with_=None, after=None, max_=70):
        """Build the iq stanza of an archive query.

        *start*/*end* are datetimes that are written with a ``Z``
        suffix, *with_* filters by jid, *after* and *max_* fill the
        result set management element.
        """
        # Muc archive query?
        namespace = muc_caps_cache.get_mam_namespace(jid)
        if namespace is None:
            # Query to our own archive
            namespace = self.archiving_namespace

        iq = nbxmpp.Iq('set', to=jid)
        query = iq.addChild('query', namespace=namespace)
        form = query.addChild(node=nbxmpp.DataForm(typ='submit'))
        field = nbxmpp.DataField(typ='hidden',
                                 name='FORM_TYPE',
                                 value=namespace)
        form.addChild(node=field)
        if start:
            field = nbxmpp.DataField(
                typ='text-single',
                name='start',
                value=start.strftime('%Y-%m-%dT%H:%M:%SZ'))
            form.addChild(node=field)
        if end:
            field = nbxmpp.DataField(
                typ='text-single',
                name='end',
                value=end.strftime('%Y-%m-%dT%H:%M:%SZ'))
            form.addChild(node=field)
        if with_:
            field = nbxmpp.DataField(typ='jid-single',
                                     name='with',
                                     value=with_)
            form.addChild(node=field)

        set_ = query.setTag('set', namespace=nbxmpp.NS_RSM)
        set_.setTagData('max', max_)
        if after:
            set_.setTagData('after', after)
        query.setAttr('queryid', query_id)
        return iq

    def save_archive_id(self, jid, stanza_id, timestamp):
        """Remember *stanza_id* as the last known id of archive *jid*.

        A *jid* of None means our own archive. Nothing is saved while
        the catch up for that archive has not finished.
        """
        if stanza_id is None:
            return
        if jid is None:
            jid = self._con.get_own_jid().getStripped()
        if jid not in self._catch_up_finished:
            return
        log.info('Save: %s: %s, %s', jid, stanza_id, timestamp)
        app.logger.set_archive_timestamp(
            jid, last_mam_id=stanza_id, last_muc_timestamp=timestamp)

    def request_mam_preferences(self):
        """Request the archiving preferences from the server."""
        log.info('Request MAM preferences')
        iq = nbxmpp.Iq('get', self.archiving_namespace)
        iq.setQuery('prefs')
        self._con.connection.SendAndCallForResponse(
            iq, self._preferences_received)

    def _preferences_received(self, stanza):
        """Handle the reply to ``request_mam_preferences``.

        Pushes MAMPreferenceReceived with a list of ``(jid, rule)``
        tuples, or MAMPreferenceError if the server returned an error.
        """
        if not nbxmpp.isResultNode(stanza):
            log.info('Error: %s', stanza.getError())
            app.nec.push_incoming_event(MAMPreferenceError(
                None, conn=self._con, error=stanza.getError()))
            return

        log.info('Received MAM preferences')
        prefs = stanza.getTag('prefs', namespace=self.archiving_namespace)
        if prefs is None:
            log.error('Malformed stanza (no prefs node): %s', stanza)
            return

        rules = []
        default = prefs.getAttr('default')
        for tag_name, rule in (('always', 'Always'), ('never', 'Never')):
            tag = prefs.getTag(tag_name)
            if tag is None:
                # A server may leave out a list it has no entries for
                continue
            for item in tag.getTags('jid'):
                rules.append((item.getData(), rule))

        app.nec.push_incoming_event(MAMPreferenceReceived(
            None, conn=self._con, rules=rules, default=default))

    def set_mam_preferences(self, rules, default):
        """Send new archiving preferences to the server.

        *rules* is an iterable of ``(jid, archive)`` pairs; a true
        *archive* puts the jid on the always list, otherwise it goes
        on the never list.
        """
        iq = nbxmpp.Iq(typ='set')
        prefs = iq.addChild(name='prefs',
                            namespace=self.archiving_namespace,
                            attrs={'default': default})
        always = prefs.addChild(name='always')
        never = prefs.addChild(name='never')
        for item in rules:
            jid, archive = item
            if archive:
                always.addChild(name='jid').setData(jid)
            else:
                never.addChild(name='jid').setData(jid)

        self._con.connection.SendAndCallForResponse(
            iq, self._preferences_saved)

    def _preferences_saved(self, stanza):
        """Handle the reply to ``set_mam_preferences``."""
        if not nbxmpp.isResultNode(stanza):
            log.info('Error: %s', stanza.getError())
            app.nec.push_incoming_event(MAMPreferenceError(
                None, conn=self._con, error=stanza.getError()))
        else:
            log.info('Preferences saved')
            app.nec.push_incoming_event(
                MAMPreferenceSaved(None, conn=self._con))


class MamMessageReceivedEvent(NetworkIncomingEvent):
    name = 'mam-message-received'


class MamGcMessageReceivedEvent(NetworkIncomingEvent):
    name = 'mam-gc-message-received'


class MamDecryptedMessageReceived(NetworkIncomingEvent):
    name = 'mam-decrypted-message-received'


class MAMPreferenceError(NetworkIncomingEvent):
    name = 'mam-prefs-error'


class MAMPreferenceReceived(NetworkIncomingEvent):
    name = 'mam-prefs-received'


class MAMPreferenceSaved(NetworkIncomingEvent):
    name = 'mam-prefs-saved'


class ArchivingCountReceived(NetworkIncomingEvent):
    name = 'archiving-count-received'


class ArchivingIntervalFinished(NetworkIncomingEvent):
    name = 'archiving-interval-finished'


class ArchivingErrorReceived(NetworkIncomingEvent):
    name = 'archiving-error-received'


class InvalidMamIQ(Exception):
    """Raised when a MAM query result is an error or malformed."""


def get_instance(*args, **kwargs):
    """Return a new MAM module together with its module name."""
    return MAM(*args, **kwargs), 'MAM'