# -*- coding:utf-8 -*-
## src/common/message_archiving.py
##
## Copyright (C) 2009 Anaël Verrier
##
## This file is part of Gajim.
##
## Gajim is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 3 only.
##
## Gajim is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Gajim. If not, see <http://www.gnu.org/licenses/>.
##

import logging
from datetime import datetime, timedelta

import nbxmpp

from gajim.common import app
from gajim.common import ged
from gajim.common import helpers
from gajim.common.logger import KindConstant, JIDConstant
from gajim.common.const import ArchiveState
from gajim.common.caps_cache import muc_caps_cache
import gajim.common.connection_handlers_events as ev

log = logging.getLogger('gajim.c.message_archiving')


class ConnectionArchive313:
    """Message Archive Management (XEP-0313) support for a connection."""

    def __init__(self):
        self.archiving_313_supported = False
        self.mam_awaiting_disco_result = {}
        self.iq_answer = []
        self.mam_query_ids = []
        app.nec.register_incoming_event(ev.MamMessageReceivedEvent)
        app.nec.register_incoming_event(ev.MamGcMessageReceivedEvent)
        app.ged.register_event_handler('agent-info-error-received', ged.CORE,
                                       self._nec_agent_info_error)
        app.ged.register_event_handler('agent-info-received', ged.CORE,
                                       self._nec_agent_info)
        app.ged.register_event_handler(
            'mam-decrypted-message-received', ged.CORE,
            self._nec_mam_decrypted_message_received)
        app.ged.register_event_handler(
            'archiving-313-preferences-changed-received', ged.CORE,
            self._nec_archiving_313_preferences_changed_received)

    def cleanup(self):
        app.ged.remove_event_handler('agent-info-error-received', ged.CORE,
                                     self._nec_agent_info_error)
        app.ged.remove_event_handler('agent-info-received', ged.CORE,
                                     self._nec_agent_info)
        app.ged.remove_event_handler(
            'mam-decrypted-message-received', ged.CORE,
            self._nec_mam_decrypted_message_received)
        app.ged.remove_event_handler(
            'archiving-313-preferences-changed-received', ged.CORE,
            self._nec_archiving_313_preferences_changed_received)

    def _nec_archiving_313_preferences_changed_received(self, obj):
        if obj.id in self.iq_answer:
            obj.answer = True

    def _nec_agent_info_error(self, obj):
        if obj.jid in self.mam_awaiting_disco_result:
            log.warning('Unable to discover %s, ignoring those logs', obj.jid)
            del self.mam_awaiting_disco_result[obj.jid]

    def _nec_agent_info(self, obj):
        # Disco info arrived for a JID whose MAM messages are queued
        if obj.jid not in self.mam_awaiting_disco_result:
            return

        for identity in obj.identities:
            if identity['category'] != 'conference':
                continue
            # it's a groupchat
            for msg_obj in self.mam_awaiting_disco_result[obj.jid]:
                app.logger.insert_jid(msg_obj.with_.getStripped(),
                                      type_=JIDConstant.ROOM_TYPE)
                app.nec.push_incoming_event(
                    ev.MamDecryptedMessageReceivedEvent(
                        None, disco=True, **vars(msg_obj)))
            del self.mam_awaiting_disco_result[obj.jid]
            return
        # it's not a groupchat
        for msg_obj in self.mam_awaiting_disco_result[obj.jid]:
            app.logger.insert_jid(msg_obj.with_.getStripped())
            app.nec.push_incoming_event(
                ev.MamDecryptedMessageReceivedEvent(
                    None, disco=True, **vars(msg_obj)))
        del self.mam_awaiting_disco_result[obj.jid]

    @staticmethod
    def parse_iq(stanza):
        if not nbxmpp.isResultNode(stanza):
            log.error('Error on MAM query: %s', stanza.getError())
            raise InvalidMamIQ

        fin = stanza.getTag('fin')
        if fin is None:
            log.error('Malformed MAM query result received: %s', stanza)
            raise InvalidMamIQ

        set_ = fin.getTag('set', namespace=nbxmpp.NS_RSM)
        if set_ is None:
            log.error(
                'Malformed MAM query result received (no "set" Node): %s',
                stanza)
            raise InvalidMamIQ
        return fin, set_

    def parse_from_jid(self, stanza):
        jid = stanza.getFrom()
        if jid is None:
            # No 'from' means the iq comes from our own archive
            jid = self.get_own_jid().getStripped()
        else:
            jid = jid.getStripped()
        return jid

    def _result_finished(self, conn, stanza, query_id, start_date, groupchat):
        try:
            fin, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        last = set_.getTagData('last')
        if last is None:
            log.info('End of MAM query, no items retrieved')
            return

        jid = self.parse_from_jid(stanza)
        complete = fin.getAttr('complete')
        app.logger.set_archive_timestamp(jid, last_mam_id=last)
        if complete != 'true':
            # Archive not exhausted yet, page further with a new query
            self.mam_query_ids.remove(query_id)
            query_id = self.get_query_id()
            query = self.get_archive_query(query_id, jid=jid, after=last)
            self._send_archive_query(query, query_id, groupchat=groupchat)
        else:
            self.mam_query_ids.remove(query_id)
            if start_date is not None:
                app.logger.set_archive_timestamp(
                    jid,
                    last_mam_id=last,
                    oldest_mam_timestamp=start_date.timestamp())
            log.info('End of MAM query, last mam id: %s', last)

    def _intervall_result_finished(self, conn, stanza, query_id,
                                   start_date, end_date, event_id):
        try:
            fin, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        self.mam_query_ids.remove(query_id)
        jid = self.parse_from_jid(stanza)
        if start_date:
            timestamp = start_date.timestamp()
        else:
            timestamp = ArchiveState.ALL

        last = set_.getTagData('last')
        if last is None:
            app.nec.push_incoming_event(ev.ArchivingIntervalFinished(
                None, event_id=event_id))
            app.logger.set_archive_timestamp(
                jid, oldest_mam_timestamp=timestamp)
            log.info('End of MAM query, no items retrieved')
            return

        complete = fin.getAttr('complete')
        if complete != 'true':
            self.request_archive_interval(event_id, start_date, end_date,
                                          last)
        else:
            log.info('query finished')
            app.logger.set_archive_timestamp(
                jid, oldest_mam_timestamp=timestamp)
            app.nec.push_incoming_event(ev.ArchivingIntervalFinished(
                None, event_id=event_id, stanza=stanza))

    def _received_count(self, conn, stanza, query_id, event_id):
        try:
            _, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        self.mam_query_ids.remove(query_id)
        count = set_.getTagData('count')
        log.info('message count received: %s', count)
        app.nec.push_incoming_event(ev.ArchivingCountReceived(
            None, event_id=event_id, count=count))

    def _nec_mam_decrypted_message_received(self, obj):
        if obj.conn.name != self.name:
            return

        namespace = self.archiving_namespace
        if obj.groupchat:
            namespace = muc_caps_cache.get_mam_namespace(obj.room_jid)

        blacklisted = obj.room_jid in helpers.get_mam_blacklist()
        if namespace != nbxmpp.NS_MAM_2 or blacklisted:
            # Fallback duplicate search without stanza-id
            duplicate = app.logger.search_for_duplicate(
                self.name, obj.with_, obj.timestamp, obj.msgtxt)
            if duplicate:
                # don't propagate the event further
                return True

        app.logger.insert_into_logs(self.name,
                                    obj.with_,
                                    obj.timestamp,
                                    obj.kind,
                                    unread=False,
                                    message=obj.msgtxt,
                                    contact_name=obj.nick,
                                    additional_data=obj.additional_data,
                                    stanza_id=obj.unique_id)

    def get_query_id(self):
        query_id = self.connection.getAnID()
        self.mam_query_ids.append(query_id)
        return query_id

    def request_archive_on_signin(self):
        """Fetch our own archive since the last known mam-id
        (one week back on first start).
        """
        own_jid = self.get_own_jid().getStripped()
        archive = app.logger.get_archive_timestamp(own_jid)

        # Migration of last_mam_id from config to DB
        if archive is not None:
            mam_id = archive.last_mam_id
        else:
            mam_id = app.config.get_per(
                'accounts', self.name, 'last_mam_id')

        start_date = None
        query_id = self.get_query_id()
        if mam_id:
            log.info('MAM query after: %s', mam_id)
            query = self.get_archive_query(query_id, after=mam_id)
        else:
            # First start: we only request the last week
            start_date = datetime.utcnow() - timedelta(days=7)
            log.info('First start: query archive start: %s', start_date)
            query = self.get_archive_query(query_id, start=start_date)
        self._send_archive_query(query, query_id, start_date)

    def request_archive_on_muc_join(self, jid):
        """Fetch the groupchat archive since the last known mam-id
        (one day back on first join).
        """
        archive = app.logger.get_archive_timestamp(
            jid, type_=JIDConstant.ROOM_TYPE)
        query_id = self.get_query_id()
        start_date = None
        if archive is not None:
            log.info('Query Groupchat MAM Archive %s after %s:',
                     jid, archive.last_mam_id)
            query = self.get_archive_query(
                query_id, jid=jid, after=archive.last_mam_id)
        else:
            # First join: we don't request the whole history.
            # Depending on what a MUC saves, there could be thousands
            # of messages even in just one day.
            start_date = datetime.utcnow() - timedelta(days=1)
            log.info('First join: query archive %s from: %s', jid, start_date)
            query = self.get_archive_query(query_id, jid=jid,
                                           start=start_date)
        self._send_archive_query(query, query_id, start_date, groupchat=True)

    def request_archive_count(self, event_id, start_date, end_date):
        query_id = self.get_query_id()
        query = self.get_archive_query(
            query_id, start=start_date, end=end_date, max_=0)
        self.connection.SendAndCallForResponse(
            query, self._received_count,
            {'query_id': query_id, 'event_id': event_id})

    def request_archive_interval(self, event_id, start_date, end_date,
                                 after=None):
        query_id = self.get_query_id()
        query = self.get_archive_query(query_id, start=start_date,
                                       end=end_date, after=after, max_=30)
        app.nec.push_incoming_event(ev.ArchivingQueryID(
            None, event_id=event_id, query_id=query_id))
        self.connection.SendAndCallForResponse(
            query, self._intervall_result_finished,
            {'query_id': query_id, 'start_date': start_date,
             'end_date': end_date, 'event_id': event_id})

    def _send_archive_query(self, query, query_id, start_date=None,
                            groupchat=False):
        self.connection.SendAndCallForResponse(
            query, self._result_finished,
            {'query_id': query_id, 'start_date': start_date,
             'groupchat': groupchat})

    def get_archive_query(self, query_id, jid=None, start=None, end=None,
                          with_=None, after=None, max_=30):
        """Build a MAM query IQ with an optional data form filter
        and RSM paging.
        """
        # MUC archive query?
        namespace = muc_caps_cache.get_mam_namespace(jid)
        if namespace is None:
            # Query to our own archive
            namespace = self.archiving_namespace

        iq = nbxmpp.Iq('set', to=jid)
        query = iq.addChild('query', namespace=namespace)
        form = query.addChild(node=nbxmpp.DataForm(typ='submit'))
        field = nbxmpp.DataField(typ='hidden',
                                 name='FORM_TYPE',
                                 value=namespace)
        form.addChild(node=field)
        if start:
            field = nbxmpp.DataField(
                typ='text-single',
                name='start',
                value=start.strftime('%Y-%m-%dT%H:%M:%SZ'))
            form.addChild(node=field)
        if end:
            field = nbxmpp.DataField(
                typ='text-single',
                name='end',
                value=end.strftime('%Y-%m-%dT%H:%M:%SZ'))
            form.addChild(node=field)
        if with_:
            field = nbxmpp.DataField(typ='jid-single',
                                     name='with',
                                     value=with_)
            form.addChild(node=field)

        set_ = query.setTag('set', namespace=nbxmpp.NS_RSM)
        set_.setTagData('max', max_)
        if after:
            set_.setTagData('after', after)
        query.setAttr('queryid', query_id)
        return iq

    def request_archive_preferences(self):
        if not app.account_is_connected(self.name):
            return
        iq = nbxmpp.Iq(typ='get')
        id_ = self.connection.getAnID()
        iq.setID(id_)
        iq.addChild(name='prefs', namespace=self.archiving_namespace)
        self.connection.send(iq)

    def set_archive_preferences(self, items, default):
        if not app.account_is_connected(self.name):
            return
        iq = nbxmpp.Iq(typ='set')
        id_ = self.connection.getAnID()
        self.iq_answer.append(id_)
        iq.setID(id_)
        prefs = iq.addChild(name='prefs',
                            namespace=self.archiving_namespace,
                            attrs={'default': default})
        always = prefs.addChild(name='always')
        never = prefs.addChild(name='never')
        for item in items:
            jid, preference = item
            if preference == 'always':
                always.addChild(name='jid').setData(jid)
            else:
                never.addChild(name='jid').setData(jid)
        self.connection.send(iq)

    def _ArchiveCB(self, con, iq_obj):
        app.nec.push_incoming_event(
            ev.ArchivingReceivedEvent(None, conn=self, stanza=iq_obj))
        raise nbxmpp.NodeProcessed


class InvalidMamIQ(Exception):
    pass
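
# For reference, the IQ built by get_archive_query() has roughly the shape
# below. This is illustrative only: it assumes the queried archive advertises
# urn:xmpp:mam:2, and the JID, queryid and date values are placeholders.
#
#   <iq type='set' to='room@conference.example.org'>
#     <query xmlns='urn:xmpp:mam:2' queryid='47'>
#       <x xmlns='jabber:x:data' type='submit'>
#         <field var='FORM_TYPE' type='hidden'>
#           <value>urn:xmpp:mam:2</value>
#         </field>
#         <field var='start' type='text-single'>
#           <value>2018-01-01T00:00:00Z</value>
#         </field>
#       </x>
#       <set xmlns='http://jabber.org/protocol/rsm'>
#         <max>30</max>
#       </set>
#     </query>
#   </iq>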