gajim-plural/gajim/common/message_archiving.py

374 lines
15 KiB
Python

# -*- coding:utf-8 -*-
## gajim/common/message_archiving.py
##
## Copyright (C) 2009 Anaël Verrier <elghinn AT free.fr>
##
## This file is part of Gajim.
##
## Gajim is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 3 only.
##
## Gajim is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Gajim. If not, see <http://www.gnu.org/licenses/>.
##
import logging
from datetime import datetime, timedelta
import nbxmpp
from gajim.common import app
from gajim.common import ged
from gajim.common import helpers
from gajim.common.logger import KindConstant, JIDConstant
from gajim.common.const import ArchiveState
from gajim.common.caps_cache import muc_caps_cache
import gajim.common.connection_handlers_events as ev
# Module-level logger shared by everything in this file.
log = logging.getLogger('gajim.c.message_archiving')
class ConnectionArchive313:
    """Message archive (MAM, "313") queries and preferences for a connection.

    The class reads ``self.name``, ``self.connection``,
    ``self.archiving_namespace`` and calls ``self.get_own_jid()``, none of
    which are defined here, so it is presumably meant to be combined with a
    connection class that provides them.
    """

    def __init__(self):
        """Initialise the MAM state and hook into the event dispatchers."""
        self.archiving_313_supported = False
        # Keyed by JID; each value is iterated as a collection of message
        # event objects once the disco answer for that JID arrives.
        # NOTE(review): the entries are added outside this file.
        self.mam_awaiting_disco_result = {}
        # IDs of the preference "set" IQs sent by set_archive_preferences()
        self.iq_answer = []
        # IDs handed out by get_query_id() whose IQ result is still pending
        self.mam_query_ids = []

        app.nec.register_incoming_event(ev.MamMessageReceivedEvent)
        app.nec.register_incoming_event(ev.MamGcMessageReceivedEvent)
        for event_name, handler in self._ged_handlers():
            app.ged.register_event_handler(event_name, ged.CORE, handler)

    def cleanup(self):
        """Unregister every ged handler that ``__init__`` registered."""
        for event_name, handler in self._ged_handlers():
            app.ged.remove_event_handler(event_name, ged.CORE, handler)

    def _ged_handlers(self):
        """Return the (event name, handler) pairs this class hooks into ged.

        Kept in one place so that ``__init__`` and ``cleanup`` cannot drift
        apart.
        """
        return (
            ('agent-info-error-received', self._nec_agent_info_error),
            ('agent-info-received', self._nec_agent_info),
            ('mam-decrypted-message-received',
             self._nec_mam_decrypted_message_received),
            ('archiving-313-preferences-changed-received',
             self._nec_archiving_313_preferences_changed_received),
        )

    def _nec_archiving_313_preferences_changed_received(self, obj):
        """Flag *obj* as an answer if its id belongs to an IQ we sent."""
        if obj.id in self.iq_answer:
            obj.answer = True

    def _nec_agent_info_error(self, obj):
        """Discard the messages that were waiting for a failed disco."""
        if obj.jid in self.mam_awaiting_disco_result:
            log.warning('Unable to discover %s, ignoring those logs', obj.jid)
            del self.mam_awaiting_disco_result[obj.jid]

    def _nec_agent_info(self, obj):
        """Release the messages that were waiting for this disco result.

        The JID counts as a groupchat when one of its identities has the
        category ``conference``; in that case the JID is stored with
        ``JIDConstant.ROOM_TYPE``. Every waiting message is then re-pushed
        as a ``MamDecryptedMessageReceivedEvent`` with ``disco=True``.
        """
        if obj.jid not in self.mam_awaiting_disco_result:
            return

        is_groupchat = any(identity['category'] == 'conference'
                           for identity in obj.identities)

        for msg_obj in self.mam_awaiting_disco_result[obj.jid]:
            stripped_jid = msg_obj.with_.getStripped()
            if is_groupchat:
                app.logger.insert_jid(stripped_jid,
                                      type_=JIDConstant.ROOM_TYPE)
            else:
                app.logger.insert_jid(stripped_jid)
            app.nec.push_incoming_event(
                ev.MamDecryptedMessageReceivedEvent(
                    None, disco=True, **vars(msg_obj)))
        del self.mam_awaiting_disco_result[obj.jid]

    @staticmethod
    def parse_iq(stanza):
        """Extract the ``fin`` node and its RSM ``set`` child from *stanza*.

        Returns:
            The tuple ``(fin, set_)``.

        Raises:
            InvalidMamIQ: if *stanza* is not a result, has no ``fin`` child,
                or the ``fin`` has no ``set`` child in the RSM namespace.
        """
        if not nbxmpp.isResultNode(stanza):
            log.error('Error on MAM query: %s', stanza.getError())
            raise InvalidMamIQ

        fin = stanza.getTag('fin')
        if fin is None:
            log.error('Malformed MAM query result received: %s', stanza)
            raise InvalidMamIQ

        set_ = fin.getTag('set', namespace=nbxmpp.NS_RSM)
        if set_ is None:
            log.error(
                'Malformed MAM query result received (no "set" Node): %s',
                stanza)
            raise InvalidMamIQ
        return fin, set_

    def parse_from_jid(self, stanza):
        """Return the stripped JID that *stanza* came from.

        A stanza without a ``from`` is treated as coming from our own
        archive, so our own stripped JID is returned instead.
        """
        jid = stanza.getFrom()
        if jid is None:
            # No from means, iq from our own archive
            return self.get_own_jid().getStripped()
        return jid.getStripped()

    def _forget_query_id(self, query_id):
        """Drop *query_id* from the pending MAM queries, if it is there."""
        if query_id in self.mam_query_ids:
            self.mam_query_ids.remove(query_id)

    @staticmethod
    def _last_mam_id(archive):
        """Return the last MAM id stored in *archive*, or None.

        None is returned both when there is no archive row at all and when
        the row exists but carries no (or an empty) ``last_mam_id``.
        """
        if archive is None:
            return None
        return archive.last_mam_id or None

    def _result_finished(self, conn, stanza, query_id, start_date, groupchat):
        """Handle the IQ result of a query sent by ``_send_archive_query``.

        Stores the id of the last received item and, while the server does
        not report the result as complete, requests the next page after
        that id. When the result is complete and *start_date* is set, the
        timestamp of *start_date* is stored as oldest MAM timestamp.
        """
        # The IQ answer ends this query whatever its outcome, so the id must
        # not stay in the pending list on the error / empty paths either.
        self._forget_query_id(query_id)

        try:
            fin, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        last = set_.getTagData('last')
        if last is None:
            log.info('End of MAM query, no items retrieved')
            return

        jid = self.parse_from_jid(stanza)
        complete = fin.getAttr('complete')
        app.logger.set_archive_timestamp(jid, last_mam_id=last)

        if complete != 'true':
            query_id = self.get_query_id()
            query = self.get_archive_query(query_id, jid=jid, after=last)
            # Hand start_date on to the next page, otherwise a paged
            # first-start query would never record its oldest timestamp.
            self._send_archive_query(
                query, query_id, start_date, groupchat=groupchat)
            return

        if start_date is not None:
            app.logger.set_archive_timestamp(
                jid,
                last_mam_id=last,
                oldest_mam_timestamp=start_date.timestamp())
        log.info('End of MAM query, last mam id: %s', last)

    def _intervall_result_finished(self, conn, stanza, query_id,
                                   start_date, end_date, event_id):
        """Handle the IQ result of ``request_archive_interval``.

        Requests the next page while the result is incomplete; otherwise
        stores the oldest MAM timestamp (``ArchiveState.ALL`` when no
        *start_date* was given) and pushes ``ArchivingIntervalFinished``.
        """
        # The answer arrived, so the query id is finished even if the
        # answer turns out to be an error.
        self._forget_query_id(query_id)

        try:
            fin, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        jid = self.parse_from_jid(stanza)
        timestamp = start_date.timestamp() if start_date else ArchiveState.ALL

        last = set_.getTagData('last')
        if last is None:
            app.nec.push_incoming_event(ev.ArchivingIntervalFinished(
                None, event_id=event_id))
            app.logger.set_archive_timestamp(
                jid, oldest_mam_timestamp=timestamp)
            log.info('End of MAM query, no items retrieved')
            return

        complete = fin.getAttr('complete')
        if complete != 'true':
            self.request_archive_interval(event_id, start_date, end_date, last)
            return

        log.info('query finished')
        app.logger.set_archive_timestamp(
            jid, oldest_mam_timestamp=timestamp)
        app.nec.push_incoming_event(ev.ArchivingIntervalFinished(
            None, event_id=event_id, stanza=stanza))

    def _received_count(self, conn, stanza, query_id, event_id):
        """Handle the IQ result of ``request_archive_count``.

        Pushes ``ArchivingCountReceived`` with the ``count`` value of the
        result set.
        """
        # Finished either way; do not leak the id when the answer is invalid.
        self._forget_query_id(query_id)

        try:
            _, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        count = set_.getTagData('count')
        log.info('message count received: %s', count)
        app.nec.push_incoming_event(ev.ArchivingCountReceived(
            None, event_id=event_id, count=count))

    def _nec_mam_decrypted_message_received(self, obj):
        """Write a decrypted MAM message to the logs.

        Events of other connections are ignored. When the relevant MAM
        namespace is not ``nbxmpp.NS_MAM_2``, or the room is on the MAM
        blacklist, the logs are searched for a duplicate first.

        Returns:
            True if a duplicate was found (the event must not be propagated
            further), otherwise None.
        """
        if obj.conn.name != self.name:
            return

        namespace = self.archiving_namespace
        blacklisted = False
        if obj.groupchat:
            namespace = muc_caps_cache.get_mam_namespace(obj.room_jid)
            blacklisted = obj.room_jid in helpers.get_mam_blacklist()

        if namespace != nbxmpp.NS_MAM_2 or blacklisted:
            # Fallback duplicate search without stanza-id
            duplicate = app.logger.search_for_duplicate(
                self.name, obj.with_, obj.timestamp, obj.msgtxt)
            if duplicate:
                # dont propagate the event further
                return True

        app.logger.insert_into_logs(self.name,
                                    obj.with_,
                                    obj.timestamp,
                                    obj.kind,
                                    unread=False,
                                    message=obj.msgtxt,
                                    contact_name=obj.nick,
                                    additional_data=obj.additional_data,
                                    stanza_id=obj.unique_id)

    def get_query_id(self):
        """Create a new query id, remember it as pending and return it."""
        query_id = self.connection.getAnID()
        self.mam_query_ids.append(query_id)
        return query_id

    def request_archive_on_signin(self):
        """Query our own archive after sign-in.

        Continues after the stored last MAM id when there is one, otherwise
        requests the last seven days.
        """
        own_jid = self.get_own_jid().getStripped()
        archive = app.logger.get_archive_timestamp(own_jid)

        # Migration of last_mam_id from config to DB
        if archive is not None:
            mam_id = archive.last_mam_id
        else:
            mam_id = app.config.get_per('accounts', self.name, 'last_mam_id')

        start_date = None
        query_id = self.get_query_id()
        if mam_id:
            log.info('MAM query after: %s', mam_id)
            query = self.get_archive_query(query_id, after=mam_id)
        else:
            # First Start, we request the last week
            start_date = datetime.utcnow() - timedelta(days=7)
            log.info('First start: query archive start: %s', start_date)
            query = self.get_archive_query(query_id, start=start_date)
        self._send_archive_query(query, query_id, start_date)

    def request_archive_on_muc_join(self, jid):
        """Query the archive of the groupchat *jid* after joining it.

        Continues after the stored last MAM id when there is one, otherwise
        requests only the last day.
        """
        archive = app.logger.get_archive_timestamp(
            jid, type_=JIDConstant.ROOM_TYPE)
        # An archive row may exist without a last_mam_id; querying "after
        # nothing" would send a query with neither 'after' nor 'start'.
        last_mam_id = self._last_mam_id(archive)

        query_id = self.get_query_id()
        start_date = None
        if last_mam_id is not None:
            log.info('Query Groupchat MAM Archive %s after %s:',
                     jid, last_mam_id)
            query = self.get_archive_query(
                query_id, jid=jid, after=last_mam_id)
        else:
            # No sync point yet: request just the last day. Depending on
            # what a MUC saves, there could be thousands of messages even
            # in that single day.
            start_date = datetime.utcnow() - timedelta(days=1)
            log.info('First join: query archive %s from: %s', jid, start_date)
            query = self.get_archive_query(query_id, jid=jid, start=start_date)
        self._send_archive_query(query, query_id, start_date, groupchat=True)

    def request_archive_count(self, event_id, start_date, end_date):
        """Ask how many messages lie between *start_date* and *end_date*.

        The query is sent with a page size of 0; the answer is handled by
        ``_received_count``.
        """
        query_id = self.get_query_id()
        query = self.get_archive_query(
            query_id, start=start_date, end=end_date, max_=0)
        self.connection.SendAndCallForResponse(
            query, self._received_count, {'query_id': query_id,
                                          'event_id': event_id})

    def request_archive_interval(self, event_id, start_date,
                                 end_date, after=None):
        """Request one page (30 items) of the given interval.

        Pushes ``ArchivingQueryID`` with the new query id before sending;
        the answer is handled by ``_intervall_result_finished``.
        """
        query_id = self.get_query_id()
        query = self.get_archive_query(query_id, start=start_date,
                                       end=end_date, after=after, max_=30)
        app.nec.push_incoming_event(ev.ArchivingQueryID(
            None, event_id=event_id, query_id=query_id))
        self.connection.SendAndCallForResponse(
            query, self._intervall_result_finished, {'query_id': query_id,
                                                     'start_date': start_date,
                                                     'end_date': end_date,
                                                     'event_id': event_id})

    def _send_archive_query(self, query, query_id, start_date=None,
                            groupchat=False):
        """Send *query* and route its answer to ``_result_finished``."""
        self.connection.SendAndCallForResponse(
            query, self._result_finished, {'query_id': query_id,
                                           'start_date': start_date,
                                           'groupchat': groupchat})

    def get_archive_query(self, query_id, jid=None, start=None, end=None,
                          with_=None, after=None, max_=30):
        """Build the IQ for a MAM query.

        Args:
            query_id: value of the ``queryid`` attribute.
            jid: recipient of the IQ. If ``muc_caps_cache`` knows a MAM
                namespace for it, that namespace is used; otherwise the
                query uses ``self.archiving_namespace``.
            start, end: optional datetimes bounding the query. They are
                formatted with a literal ``Z`` suffix, so callers are
                presumably expected to pass UTC values.
            with_: optional JID to filter on.
            after: optional item id; only set when truthy.
            max_: page size written to the RSM ``max`` element.

        Returns:
            The ``nbxmpp.Iq`` ready to be sent.
        """
        # Muc archive query?
        namespace = muc_caps_cache.get_mam_namespace(jid)
        if namespace is None:
            # Query to our own archive
            namespace = self.archiving_namespace

        iq = nbxmpp.Iq('set', to=jid)
        query = iq.addChild('query', namespace=namespace)
        form = query.addChild(node=nbxmpp.DataForm(typ='submit'))
        form.addChild(node=nbxmpp.DataField(typ='hidden',
                                            name='FORM_TYPE',
                                            value=namespace))

        for field_name, moment in (('start', start), ('end', end)):
            if moment:
                form.addChild(node=nbxmpp.DataField(
                    typ='text-single',
                    name=field_name,
                    value=moment.strftime('%Y-%m-%dT%H:%M:%SZ')))

        if with_:
            form.addChild(node=nbxmpp.DataField(
                typ='jid-single', name='with', value=with_))

        set_ = query.setTag('set', namespace=nbxmpp.NS_RSM)
        set_.setTagData('max', max_)
        if after:
            set_.setTagData('after', after)
        query.setAttr('queryid', query_id)
        return iq

    def request_archive_preferences(self):
        """Send a ``prefs`` "get" IQ; does nothing while not connected."""
        if not app.account_is_connected(self.name):
            return
        iq = nbxmpp.Iq(typ='get')
        id_ = self.connection.getAnID()
        iq.setID(id_)
        iq.addChild(name='prefs', namespace=self.archiving_namespace)
        self.connection.send(iq)

    def set_archive_preferences(self, items, default):
        """Send a ``prefs`` "set" IQ; does nothing while not connected.

        Args:
            items: iterable of ``(jid, preference)`` pairs. A JID goes into
                the ``always`` list when its preference is ``'always'`` and
                into the ``never`` list for any other value.
            default: value of the ``default`` attribute of ``prefs``.
        """
        if not app.account_is_connected(self.name):
            return
        iq = nbxmpp.Iq(typ='set')
        id_ = self.connection.getAnID()
        # Remembered so the answer can be recognised as ours later on
        self.iq_answer.append(id_)
        iq.setID(id_)
        prefs = iq.addChild(name='prefs',
                            namespace=self.archiving_namespace,
                            attrs={'default': default})
        always = prefs.addChild(name='always')
        never = prefs.addChild(name='never')
        for jid, preference in items:
            target = always if preference == 'always' else never
            target.addChild(name='jid').setData(jid)
        self.connection.send(iq)

    def _ArchiveCB(self, con, iq_obj):
        """Push ``ArchivingReceivedEvent`` for *iq_obj* and stop processing.

        Raises:
            nbxmpp.NodeProcessed: always, after the event has been pushed.
        """
        app.nec.push_incoming_event(ev.ArchivingReceivedEvent(None, conn=self,
                                                              stanza=iq_obj))
        raise nbxmpp.NodeProcessed
class InvalidMamIQ(Exception):
    """Raised when the IQ answering a MAM query is an error or malformed."""