Prosody includes the supplied queryid in the `<fin>` tag, but this is not XEP compliant. We set the queryid attr so we can match the MAM messages to our query. That is its only purpose; it is not used to identify the iq result at the end of the query. The `id` attr on the `<iq>` node serves that purpose. So don't try to parse `queryid` from the `<fin>` tag.
364 lines
14 KiB
Python
364 lines
14 KiB
Python
# -*- coding:utf-8 -*-
|
|
## src/common/message_archiving.py
|
|
##
|
|
## Copyright (C) 2009 Anaël Verrier <elghinn AT free.fr>
|
|
##
|
|
## This file is part of Gajim.
|
|
##
|
|
## Gajim is free software; you can redistribute it and/or modify
|
|
## it under the terms of the GNU General Public License as published
|
|
## by the Free Software Foundation; version 3 only.
|
|
##
|
|
## Gajim is distributed in the hope that it will be useful,
|
|
## but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
## GNU General Public License for more details.
|
|
##
|
|
## You should have received a copy of the GNU General Public License
|
|
## along with Gajim. If not, see <http://www.gnu.org/licenses/>.
|
|
##
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
import nbxmpp
|
|
|
|
from gajim.common import app
|
|
from gajim.common import ged
|
|
from gajim.common.logger import KindConstant, JIDConstant
|
|
from gajim.common.const import ArchiveState
|
|
from gajim.common.caps_cache import muc_caps_cache
|
|
import gajim.common.connection_handlers_events as ev
|
|
|
|
# Module-level logger shared by the message archiving code in this file.
log = logging.getLogger('gajim.c.message_archiving')
|
|
|
|
|
|
class ConnectionArchive313:
    """Message Archive Management (MAM) queries and preferences.

    Besides the state created in ``__init__`` the methods read
    ``self.name``, ``self.connection`` and ``self.archiving_namespace`` and
    call ``self.get_own_jid()``. None of these are defined in this class, so
    they are presumably provided by the class it is combined with.
    """

    def __init__(self):
        # Not changed by any method of this class
        self.archiving_313_supported = False
        # jid -> message objects waiting for the disco result of that jid;
        # entries are consumed (never added) by the handlers of this class
        self.mam_awaiting_disco_result = {}
        # Ids of the preference iqs sent by set_archive_preferences()
        self.iq_answer = []
        # Query ids of the archive queries that are still running
        self.mam_query_ids = []

        app.nec.register_incoming_event(ev.MamMessageReceivedEvent)
        app.nec.register_incoming_event(ev.MamGcMessageReceivedEvent)
        app.ged.register_event_handler(
            'agent-info-error-received', ged.CORE,
            self._nec_agent_info_error)
        app.ged.register_event_handler(
            'agent-info-received', ged.CORE,
            self._nec_agent_info)
        app.ged.register_event_handler(
            'mam-decrypted-message-received', ged.CORE,
            self._nec_mam_decrypted_message_received)
        app.ged.register_event_handler(
            'archiving-313-preferences-changed-received', ged.CORE,
            self._nec_archiving_313_preferences_changed_received)

    def cleanup(self):
        """Remove the event handlers registered in ``__init__``."""
        app.ged.remove_event_handler(
            'agent-info-error-received', ged.CORE,
            self._nec_agent_info_error)
        app.ged.remove_event_handler(
            'agent-info-received', ged.CORE,
            self._nec_agent_info)
        app.ged.remove_event_handler(
            'mam-decrypted-message-received', ged.CORE,
            self._nec_mam_decrypted_message_received)
        app.ged.remove_event_handler(
            'archiving-313-preferences-changed-received', ged.CORE,
            self._nec_archiving_313_preferences_changed_received)

    def _nec_archiving_313_preferences_changed_received(self, obj):
        """Flag the event as the answer to one of our preference iqs."""
        if obj.id in self.iq_answer:
            obj.answer = True

    def _nec_agent_info_error(self, obj):
        """Drop the messages that waited for a disco result that failed."""
        if obj.jid in self.mam_awaiting_disco_result:
            # Logger.warn() is a deprecated alias of Logger.warning()
            log.warning('Unable to discover %s, ignoring those logs', obj.jid)
            del self.mam_awaiting_disco_result[obj.jid]

    def _nec_agent_info(self, obj):
        """Release the messages that waited for the disco result of a jid.

        The sender jid of every waiting message is stored in the logger, as
        a room if the disco result contains a 'conference' identity, and
        the message is pushed again as MamDecryptedMessageReceivedEvent.
        """
        if obj.jid not in self.mam_awaiting_disco_result:
            return

        is_groupchat = any(identity['category'] == 'conference'
                           for identity in obj.identities)

        for msg_obj in self.mam_awaiting_disco_result[obj.jid]:
            if is_groupchat:
                app.logger.insert_jid(msg_obj.with_.getStripped(),
                                      type_=JIDConstant.ROOM_TYPE)
            else:
                app.logger.insert_jid(msg_obj.with_.getStripped())
            app.nec.push_incoming_event(
                ev.MamDecryptedMessageReceivedEvent(
                    None, disco=True, **vars(msg_obj)))
        del self.mam_awaiting_disco_result[obj.jid]

    @staticmethod
    def parse_iq(stanza):
        """Return the ``(fin, set_)`` nodes of a MAM query result.

        Raises InvalidMamIQ if the stanza is not a result, has no <fin>
        child, or the <fin> has no RSM <set> child.
        """
        if not nbxmpp.isResultNode(stanza):
            log.error('Error on MAM query: %s', stanza.getError())
            raise InvalidMamIQ

        fin = stanza.getTag('fin')
        if fin is None:
            log.error('Malformed MAM query result received: %s', stanza)
            raise InvalidMamIQ

        set_ = fin.getTag('set', namespace=nbxmpp.NS_RSM)
        if set_ is None:
            log.error(
                'Malformed MAM query result received (no "set" Node): %s',
                stanza)
            raise InvalidMamIQ
        return fin, set_

    def parse_from_jid(self, stanza):
        """Return the bare jid of the archive that sent the stanza."""
        jid = stanza.getFrom()
        if jid is None:
            # No from means, iq from our own archive
            jid = self.get_own_jid()
        return jid.getStripped()

    @staticmethod
    def _utc_timestamp(date):
        """Return the POSIX timestamp of date, reading naive values as UTC.

        The dates handled here are created with datetime.utcnow() and sent
        to the server with a 'Z' suffix, i.e. they are naive UTC values.
        datetime.timestamp() would read a naive value as *local* time and
        shift the result by the UTC offset of the machine.
        """
        if date.tzinfo is None:
            return (date - datetime(1970, 1, 1)).total_seconds()
        return date.timestamp()

    def _forget_query_id(self, query_id):
        """Remove query_id from the running queries, if it is still there."""
        if query_id in self.mam_query_ids:
            self.mam_query_ids.remove(query_id)

    def _result_finished(self, conn, stanza, query_id, start_date, groupchat):
        """Handle the iq result that ends a page of an archive query.

        Stores the id of the last received message and requests the next
        page until the server reports the result as complete.
        start_date is the start of the very first page, or None if the
        query was not bound to a start date.
        """
        # The query is over however the result looks, forget its id also on
        # the early returns below so finished ids do not pile up
        self._forget_query_id(query_id)

        try:
            fin, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        last = set_.getTagData('last')
        if last is None:
            log.info('End of MAM query, no items retrieved')
            return

        jid = self.parse_from_jid(stanza)
        app.logger.set_archive_timestamp(jid, last_mam_id=last)

        if fin.getAttr('complete') != 'true':
            query_id = self.get_query_id()
            query = self.get_archive_query(query_id, jid=jid, after=last)
            # Hand start_date on to the next page, otherwise it is lost and
            # never stored when the result spans more than one page
            self._send_archive_query(
                query, query_id, start_date, groupchat=groupchat)
            return

        if start_date is not None:
            app.logger.set_archive_timestamp(
                jid,
                last_mam_id=last,
                oldest_mam_timestamp=self._utc_timestamp(start_date))
        log.info('End of MAM query, last mam id: %s', last)

    def _intervall_result_finished(self, conn, stanza, query_id,
                                   start_date, end_date, event_id):
        """Handle the iq result that ends a page of an interval query.

        Requests the next page until the result is complete (or empty),
        then stores the start of the interval as oldest archive timestamp
        and pushes ArchivingIntervalFinished.
        """
        # Forget the id first so it is also dropped for invalid results
        self._forget_query_id(query_id)

        try:
            fin, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        jid = self.parse_from_jid(stanza)
        if start_date:
            timestamp = self._utc_timestamp(start_date)
        else:
            timestamp = ArchiveState.ALL

        last = set_.getTagData('last')
        if last is None:
            app.nec.push_incoming_event(ev.ArchivingIntervalFinished(
                None, event_id=event_id))
            app.logger.set_archive_timestamp(
                jid, oldest_mam_timestamp=timestamp)
            log.info('End of MAM query, no items retrieved')
            return

        if fin.getAttr('complete') != 'true':
            self.request_archive_interval(event_id, start_date, end_date, last)
            return

        log.info('query finished')
        app.logger.set_archive_timestamp(
            jid, oldest_mam_timestamp=timestamp)
        app.nec.push_incoming_event(ev.ArchivingIntervalFinished(
            None, event_id=event_id, stanza=stanza))

    def _received_count(self, conn, stanza, query_id, event_id):
        """Push the message count of a count query as ArchivingCountReceived."""
        # Forget the id first so it is also dropped for invalid results
        self._forget_query_id(query_id)

        try:
            _, set_ = self.parse_iq(stanza)
        except InvalidMamIQ:
            return

        count = set_.getTagData('count')
        log.info('message count received: %s', count)
        app.nec.push_incoming_event(ev.ArchivingCountReceived(
            None, event_id=event_id, count=count))

    def _nec_mam_decrypted_message_received(self, obj):
        """Write a decrypted MAM message of this account to the logs.

        Returns True, which stops further propagation of the event, if the
        message is already stored.
        """
        if obj.conn.name != self.name:
            return

        # Duplicate search without stanza-id, done for every message
        duplicate = app.logger.search_for_duplicate(
            obj.with_, obj.timestamp, obj.msgtxt)
        if duplicate:
            # dont propagate the event further
            return True

        app.logger.insert_into_logs(self.name,
                                    obj.with_,
                                    obj.timestamp,
                                    obj.kind,
                                    unread=False,
                                    message=obj.msgtxt,
                                    contact_name=obj.nick,
                                    additional_data=obj.additional_data,
                                    stanza_id=obj.unique_id)

    def get_query_id(self):
        """Create a new query id and remember it as a running query."""
        query_id = self.connection.getAnID()
        self.mam_query_ids.append(query_id)
        return query_id

    def request_archive_on_signin(self):
        """Query our own archive for everything missed since the last sync.

        Without a known last message id the last week is requested.
        """
        own_jid = self.get_own_jid().getStripped()
        archive = app.logger.get_archive_timestamp(own_jid)

        # Migration of last_mam_id from config to DB
        if archive is not None:
            mam_id = archive.last_mam_id
        else:
            mam_id = app.config.get_per('accounts', self.name, 'last_mam_id')

        start_date = None
        query_id = self.get_query_id()
        if mam_id:
            log.info('MAM query after: %s', mam_id)
            query = self.get_archive_query(query_id, after=mam_id)
        else:
            # First Start, we request the last week
            start_date = datetime.utcnow() - timedelta(days=7)
            log.info('First start: query archive start: %s', start_date)
            query = self.get_archive_query(query_id, start=start_date)
        self._send_archive_query(query, query_id, start_date)

    def request_archive_on_muc_join(self, jid):
        """Query the archive of the room jid for everything missed.

        Without a stored archive state for the room the last day is
        requested.
        """
        archive = app.logger.get_archive_timestamp(
            jid, type_=JIDConstant.ROOM_TYPE)
        query_id = self.get_query_id()
        start_date = None
        if archive is not None:
            log.info('Query Groupchat MAM Archive %s after %s:',
                     jid, archive.last_mam_id)
            query = self.get_archive_query(
                query_id, jid=jid, after=archive.last_mam_id)
        else:
            # First join, we request only the last day.
            # Depending on what a MUC saves, there could be thousands
            # of Messages even in just one day.
            start_date = datetime.utcnow() - timedelta(days=1)
            log.info('First join: query archive %s from: %s', jid, start_date)
            query = self.get_archive_query(query_id, jid=jid, start=start_date)
        self._send_archive_query(query, query_id, start_date, groupchat=True)

    def request_archive_count(self, event_id, start_date, end_date):
        """Ask our archive how many messages lie in the given interval.

        The query requests zero items (max_=0); the answer is handled by
        _received_count().
        """
        query_id = self.get_query_id()
        query = self.get_archive_query(
            query_id, start=start_date, end=end_date, max_=0)
        self.connection.SendAndCallForResponse(
            query, self._received_count, {'query_id': query_id,
                                          'event_id': event_id})

    def request_archive_interval(self, event_id, start_date,
                                 end_date, after=None):
        """Request one page (30 messages) of the given interval.

        Pushes ArchivingQueryID with the id of the new query; the answer is
        handled by _intervall_result_finished(), which calls this method
        again with after set for the following page.
        """
        query_id = self.get_query_id()
        query = self.get_archive_query(query_id, start=start_date,
                                       end=end_date, after=after, max_=30)
        app.nec.push_incoming_event(ev.ArchivingQueryID(
            None, event_id=event_id, query_id=query_id))
        self.connection.SendAndCallForResponse(
            query, self._intervall_result_finished, {'query_id': query_id,
                                                     'start_date': start_date,
                                                     'end_date': end_date,
                                                     'event_id': event_id})

    def _send_archive_query(self, query, query_id, start_date=None,
                            groupchat=False):
        """Send query and let _result_finished() handle the answer."""
        self.connection.SendAndCallForResponse(
            query, self._result_finished, {'query_id': query_id,
                                           'start_date': start_date,
                                           'groupchat': groupchat})

    def get_archive_query(self, query_id, jid=None, start=None, end=None,
                          with_=None, after=None, max_=30):
        """Build the iq of an archive query.

        query_id is set as 'queryid' attribute of the query, jid is the
        archive to ask (None addresses the iq to no jid, i.e. our own
        archive), start/end/with_ become form fields and after/max_ become
        RSM children. Falsy filter values are left out.
        """
        # Muc archive query?
        namespace = muc_caps_cache.get_mam_namespace(jid)
        if namespace is None:
            # Query to our own archive
            namespace = self.archiving_namespace

        iq = nbxmpp.Iq('set', to=jid)
        query = iq.addChild('query', namespace=namespace)
        form = query.addChild(node=nbxmpp.DataForm(typ='submit'))
        form.addChild(node=nbxmpp.DataField(typ='hidden',
                                            name='FORM_TYPE',
                                            value=namespace))

        # NOTE: the dates are formatted as they are and marked as UTC ('Z'),
        # so callers are expected to pass UTC values
        for name, date in (('start', start), ('end', end)):
            if date:
                form.addChild(node=nbxmpp.DataField(
                    typ='text-single',
                    name=name,
                    value=date.strftime('%Y-%m-%dT%H:%M:%SZ')))
        if with_:
            form.addChild(node=nbxmpp.DataField(
                typ='jid-single', name='with', value=with_))

        set_ = query.setTag('set', namespace=nbxmpp.NS_RSM)
        set_.setTagData('max', max_)
        if after:
            set_.setTagData('after', after)
        query.setAttr('queryid', query_id)
        return iq

    def request_archive_preferences(self):
        """Request the archiving preferences, if the account is connected."""
        if not app.account_is_connected(self.name):
            return
        iq = nbxmpp.Iq(typ='get')
        id_ = self.connection.getAnID()
        iq.setID(id_)
        iq.addChild(name='prefs', namespace=self.archiving_namespace)
        self.connection.send(iq)

    def set_archive_preferences(self, items, default):
        """Send new archiving preferences, if the account is connected.

        items is an iterable of (jid, preference) pairs; a jid is listed
        under <always> if its preference is 'always' and under <never>
        otherwise. default becomes the 'default' attribute of <prefs>.
        """
        if not app.account_is_connected(self.name):
            return
        iq = nbxmpp.Iq(typ='set')
        id_ = self.connection.getAnID()
        # Remembered to recognize the answer to this iq later
        self.iq_answer.append(id_)
        iq.setID(id_)
        prefs = iq.addChild(name='prefs',
                            namespace=self.archiving_namespace,
                            attrs={'default': default})
        always = prefs.addChild(name='always')
        never = prefs.addChild(name='never')
        for jid, preference in items:
            target = always if preference == 'always' else never
            target.addChild(name='jid').setData(jid)
        self.connection.send(iq)

    def _ArchiveCB(self, con, iq_obj):
        """Push the stanza as ArchivingReceivedEvent and mark it processed."""
        app.nec.push_incoming_event(ev.ArchivingReceivedEvent(None, conn=self,
                                                              stanza=iq_obj))
        raise nbxmpp.NodeProcessed
|
|
|
|
|
|
class InvalidMamIQ(Exception):
    """Raised by ``parse_iq`` for an error or malformed MAM query result."""
|