diff --git a/gajim/common/connection_handlers_events.py b/gajim/common/connection_handlers_events.py index d47abe915..63e4c90eb 100644 --- a/gajim/common/connection_handlers_events.py +++ b/gajim/common/connection_handlers_events.py @@ -1860,6 +1860,27 @@ class ArchivingErrorReceivedEvent(nec.NetworkIncomingEvent): self.error_msg = self.stanza.getErrorMsg() return True +class ArchivingCountReceived(nec.NetworkIncomingEvent): + name = 'archiving-count-received' + base_network_events = [] + + def generate(self): + return True + +class ArchivingIntervalFinished(nec.NetworkIncomingEvent): + name = 'archiving-interval-finished' + base_network_events = [] + + def generate(self): + return True + +class ArchivingQueryID(nec.NetworkIncomingEvent): + name = 'archiving-query-id' + base_network_events = [] + + def generate(self): + return True + class Archiving313PreferencesChangedReceivedEvent(nec.NetworkIncomingEvent): name = 'archiving-313-preferences-changed-received' base_network_events = ['archiving-received'] diff --git a/gajim/common/const.py b/gajim/common/const.py index 2c8cb1ce2..5f48810d6 100644 --- a/gajim/common/const.py +++ b/gajim/common/const.py @@ -36,6 +36,10 @@ class AvatarSize(IntEnum): TOOLTIP = 125 VCARD = 200 +class ArchiveState(IntEnum): + NEVER = 0 + ALL = 1 + THANKS = u"""\ Alexander Futász diff --git a/gajim/common/message_archiving.py b/gajim/common/message_archiving.py index 60312f07c..6a18d9f0e 100644 --- a/gajim/common/message_archiving.py +++ b/gajim/common/message_archiving.py @@ -26,6 +26,7 @@ import nbxmpp from gajim.common import app from gajim.common import ged from gajim.common.logger import KindConstant, JIDConstant +from gajim.common.const import ArchiveState import gajim.common.connection_handlers_events as ev log = logging.getLogger('gajim.c.message_archiving') @@ -93,25 +94,33 @@ class ConnectionArchive313: None, disco=True, **vars(msg_obj))) del self.mam_awaiting_disco_result[obj.jid] - def _result_finished(self, conn, stanza, query_id, start_date, groupchat): + @staticmethod + def parse_iq(stanza, query_id): if not nbxmpp.isResultNode(stanza): log.error('Error on MAM query: %s', stanza.getError()) - return + raise InvalidMamIQ fin = stanza.getTag('fin') if fin is None: log.error('Malformed MAM query result received: %s', stanza) - return + raise InvalidMamIQ if fin.getAttr('queryid') != query_id: log.error('Result with unknown query id received') - return + raise InvalidMamIQ set_ = fin.getTag('set', namespace=nbxmpp.NS_RSM) if set_ is None: log.error( 'Malformed MAM query result received (no "set" Node): %s', stanza) + raise InvalidMamIQ + return fin, set_ + + def _result_finished(self, conn, stanza, query_id, start_date, groupchat): + try: + fin, set_ = self.parse_iq(stanza, query_id) + except InvalidMamIQ: return last = set_.getTagData('last') @@ -123,9 +132,10 @@ class ConnectionArchive313: complete = fin.getAttr('complete') app.logger.set_archive_timestamp(jid, last_mam_id=last) if complete != 'true': + self.mam_query_ids.remove(query_id) query_id = self.get_query_id() query = self.get_archive_query(query_id, after=last) - self.send_archive_query(query, query_id, groupchat=groupchat) + self._send_archive_query(query, query_id, groupchat=groupchat) else: self.mam_query_ids.remove(query_id) if start_date is not None: @@ -135,6 +145,52 @@ class ConnectionArchive313: oldest_mam_timestamp=start_date.timestamp()) log.info('End of MAM query, last mam id: %s', last) + def _intervall_result_finished(self, conn, stanza, query_id, + start_date, end_date, event_id): + try: + fin, set_ = self.parse_iq(stanza, query_id) + except InvalidMamIQ: + return + + self.mam_query_ids.remove(query_id) + jid = str(stanza.getFrom()) + if start_date: + timestamp = start_date.timestamp() + else: + timestamp = ArchiveState.ALL + + last = set_.getTagData('last') + if last is None: + app.nec.push_incoming_event(ev.ArchivingIntervalFinished( + None, event_id=event_id)) + app.logger.set_archive_timestamp( + jid, oldest_mam_timestamp=timestamp) + log.info('End of MAM query, no items retrieved') + return + + complete = fin.getAttr('complete') + if complete != 'true': + self.request_archive_interval(event_id, start_date, end_date, last) + else: + log.info('query finished') + app.logger.set_archive_timestamp( + jid, oldest_mam_timestamp=timestamp) + app.nec.push_incoming_event(ev.ArchivingIntervalFinished( + None, event_id=event_id, stanza=stanza)) + + def _received_count(self, conn, stanza, query_id, event_id): + try: + _, set_ = self.parse_iq(stanza, query_id) + except InvalidMamIQ: + return + + self.mam_query_ids.remove(query_id) + + count = set_.getTagData('count') + log.info('message count received: %s', count) + app.nec.push_incoming_event(ev.ArchivingCountReceived( + None, event_id=event_id, count=count)) + def _nec_mam_decrypted_message_received(self, obj): if obj.conn.name != self.name: return @@ -173,17 +229,18 @@ class ConnectionArchive313: start_date = None query_id = self.get_query_id() if mam_id: - log.info('MAM query after %s:', mam_id) + log.info('MAM query after: %s', mam_id) query = self.get_archive_query(query_id, after=mam_id) else: # First Start, we request the last week start_date = datetime.utcnow() - timedelta(days=7) log.info('First start: query archive start: %s', start_date) query = self.get_archive_query(query_id, start=start_date) - self.send_archive_query(query, query_id, start_date) + self._send_archive_query(query, query_id, start_date) def request_archive_on_muc_join(self, jid): - archive = app.logger.get_archive_timestamp(jid) + archive = app.logger.get_archive_timestamp( + jid, type_=JIDConstant.ROOM_TYPE) query_id = self.get_query_id() start_date = None if archive is not None: @@ -196,10 +253,31 @@ class ConnectionArchive313: start_date = datetime.utcnow() - timedelta(days=30) log.info('First join: query archive %s from: %s', jid, start_date) query = self.get_archive_query(query_id, jid=jid, start=start_date) - self.send_archive_query(query, query_id, start_date, groupchat=True) + self._send_archive_query(query, query_id, start_date, groupchat=True) - def send_archive_query(self, query, query_id, start_date=None, - groupchat=False): + def request_archive_count(self, event_id, start_date, end_date): + query_id = self.get_query_id() + query = self.get_archive_query( + query_id, start=start_date, end=end_date, max_=0) + self.connection.SendAndCallForResponse( + query, self._received_count, {'query_id': query_id, + 'event_id': event_id}) + + def request_archive_interval(self, event_id, start_date, + end_date, after=None): + query_id = self.get_query_id() + query = self.get_archive_query(query_id, start=start_date, + end=end_date, after=after, max_=30) + app.nec.push_incoming_event(ev.ArchivingQueryID( + None, event_id=event_id, query_id=query_id)) + self.connection.SendAndCallForResponse( + query, self._intervall_result_finished, {'query_id': query_id, + 'start_date': start_date, + 'end_date': end_date, + 'event_id': event_id}) + + def _send_archive_query(self, query, query_id, start_date=None, + groupchat=False): self.connection.SendAndCallForResponse( query, self._result_finished, {'query_id': query_id, 'start_date': start_date, @@ -267,3 +345,7 @@ class ConnectionArchive313: app.nec.push_incoming_event(ev.ArchivingReceivedEvent(None, conn=self, stanza=iq_obj)) raise nbxmpp.NodeProcessed + + +class InvalidMamIQ(Exception): + pass diff --git a/gajim/history_sync.py b/gajim/history_sync.py index 51d0e7dad..248339e55 100644 --- a/gajim/history_sync.py +++ b/gajim/history_sync.py @@ -19,7 +19,7 @@ import logging from enum import IntEnum -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta import nbxmpp from gi.repository import Gtk, GLib @@ -27,17 +27,16 @@ from gi.repository import Gtk, GLib from gajim.common import app from gajim.common import ged from gajim.gtkgui_helpers import get_icon_pixmap +from gajim.common.const import ArchiveState log = logging.getLogger('gajim.c.message_archiving') + class Pages(IntEnum): TIME = 0 SYNC = 1 SUMMARY = 2 -class ArchiveState(IntEnum): - NEVER = 0 - ALL = 1 class HistorySyncAssistant(Gtk.Assistant): def __init__(self, account, parent): @@ -52,13 +51,22 @@ class HistorySyncAssistant(Gtk.Assistant): self.timedelta = None self.now = datetime.utcnow() self.query_id = None - self.count_query_id = None self.start = None self.end = None self.next = None self.hide_buttons() + self.event_id = id(self) + + own_jid = self.con.get_own_jid().getStripped() + archive = app.logger.get_archive_timestamp(own_jid) + + if archive is not None: + mam_start = float(archive.oldest_mam_timestamp) + else: + # Migration from old config value + mam_start = app.config.get_per( + 'accounts', account, 'mam_start_date') - mam_start = app.config.get_per('accounts', account, 'mam_start_date') if not mam_start or mam_start == ArchiveState.NEVER: self.current_start = self.now elif mam_start == ArchiveState.ALL: @@ -72,7 +80,8 @@ class HistorySyncAssistant(Gtk.Assistant): self.download_history = DownloadHistoryPage(self) self.append_page(self.download_history) - self.set_page_type(self.download_history, Gtk.AssistantPageType.PROGRESS) + self.set_page_type(self.download_history, + Gtk.AssistantPageType.PROGRESS) self.set_page_complete(self.download_history, True) self.summary = SummaryPage(self) @@ -80,12 +89,18 @@ class HistorySyncAssistant(Gtk.Assistant): self.set_page_type(self.summary, Gtk.AssistantPageType.SUMMARY) self.set_page_complete(self.summary, True) - app.ged.register_event_handler('archiving-finished', - ged.PRECORE, - self._nec_archiving_finished) + app.ged.register_event_handler('archiving-count-received', + ged.GUI1, + self._received_count) + app.ged.register_event_handler('archiving-query-id', + ged.GUI1, + self._new_query_id) + app.ged.register_event_handler('archiving-interval-finished', + ged.GUI1, + self._received_finished) app.ged.register_event_handler('raw-mam-message-received', - ged.PRECORE, - self._nec_mam_message_received) + ged.PRECORE, + self._nec_mam_message_received) self.connect('prepare', self.on_page_change) self.connect('destroy', self.on_destroy) @@ -96,9 +111,9 @@ class HistorySyncAssistant(Gtk.Assistant): self.set_current_page(Pages.SUMMARY) self.summary.nothing_to_do() - if self.con.mam_query_id: - self.set_current_page(Pages.SUMMARY) - self.summary.query_already_running() + # if self.con.mam_query_ids: + # self.set_current_page(Pages.SUMMARY) + # self.summary.query_already_running() self.show_all() @@ -129,27 +144,42 @@ class HistorySyncAssistant(Gtk.Assistant): self.start = self.now - self.timedelta self.end = self.current_start - log.info('config: get mam_start_date: %s', self.current_start) + log.info('get mam_start_date: %s', self.current_start) log.info('now: %s', self.now) log.info('start: %s', self.start) log.info('end: %s', self.end) - self.query_count() + self.con.request_archive_count(self.event_id, self.start, self.end) - def query_count(self): - self.count_query_id = self.con.connection.getAnID() - self.con.request_archive(self.count_query_id, - start=self.start, - end=self.end, - max_=0) + def _received_count(self, event): + if event.event_id != self.event_id: + return + if event.count is not None: + self.download_history.count = int(event.count) + self.con.request_archive_interval(self.event_id, self.start, self.end) - def query_messages(self, last=None): - self.query_id = self.con.connection.getAnID() - self.con.request_archive(self.query_id, - start=self.start, - end=self.end, - after=last, - max_=30) + def _received_finished(self, event): + if event.event_id != self.event_id: + return + log.info('query finished') + GLib.idle_add(self.download_history.finished) + self.set_current_page(Pages.SUMMARY) + self.summary.finished() + + def _new_query_id(self, event): + if event.event_id != self.event_id: + return + self.query_id = event.query_id + + def _nec_mam_message_received(self, obj): + if obj.conn.name != self.account: + return + + if obj.result.getAttr('queryid') != self.query_id: + return + + log.debug('received message') + GLib.idle_add(self.download_history.set_fraction) def on_row_selected(self, listbox, row): self.timedelta = row.get_child().get_delta() @@ -164,71 +194,23 @@ class HistorySyncAssistant(Gtk.Assistant): self.prepare_query() def on_destroy(self, *args): - app.ged.remove_event_handler('archiving-finished', - ged.PRECORE, - self._nec_archiving_finished) + app.ged.remove_event_handler('archiving-count-received', + ged.GUI1, + self._received_count) + app.ged.remove_event_handler('archiving-query-id', + ged.GUI1, + self._new_query_id) + app.ged.remove_event_handler('archiving-interval-finished', + ged.GUI1, + self._received_finished) app.ged.remove_event_handler('raw-mam-message-received', - ged.PRECORE, - self._nec_mam_message_received) + ged.PRECORE, + self._nec_mam_message_received) del app.interface.instances[self.account]['history_sync'] def on_close_clicked(self, *args): self.destroy() - def _nec_mam_message_received(self, obj): - if obj.conn.name != self.account: - return - - if obj.result.getAttr('queryid') != self.query_id: - return - - log.debug('received message') - GLib.idle_add(self.download_history.set_fraction) - - def _nec_archiving_finished(self, obj): - if obj.conn.name != self.account: - return - - if obj.query_id not in (self.query_id, self.count_query_id): - return - - set_ = obj.fin.getTag('set', namespace=nbxmpp.NS_RSM) - if not set_: - log.error('invalid result') - log.error(obj.fin) - return - - if obj.query_id == self.count_query_id: - count = set_.getTagData('count') - log.info('message count received: %s', count) - if count: - self.download_history.count = int(count) - self.query_messages() - return - - if obj.query_id == self.query_id: - last = set_.getTagData('last') - complete = obj.fin.getAttr('complete') - if not last and complete != 'true': - log.error('invalid result') - log.error(obj.fin) - return - - if complete != 'true': - self.query_messages(last) - else: - log.info('query finished') - GLib.idle_add(self.download_history.finished) - if self.start: - timestamp = self.start.timestamp() - else: - timestamp = ArchiveState.ALL - app.config.set_per('accounts', self.account, - 'mam_start_date', timestamp) - log.debug('config: set mam_start_date: %s', timestamp) - self.set_current_page(Pages.SUMMARY) - self.summary.finished() - class SelectTimePage(Gtk.Box): def __init__(self, assistant): @@ -257,6 +239,7 @@ class SelectTimePage(Gtk.Box): self.pack_start(label, True, True, 0) self.pack_start(listbox, False, False, 0) + class DownloadHistoryPage(Gtk.Box): def __init__(self, assistant): super().__init__(orientation=Gtk.Orientation.VERTICAL) @@ -292,6 +275,7 @@ class DownloadHistoryPage(Gtk.Box): def finished(self): self.progress.set_fraction(1) + class SummaryPage(Gtk.Box): def __init__(self, assistant): super().__init__(orientation=Gtk.Orientation.VERTICAL) @@ -307,25 +291,26 @@ class SummaryPage(Gtk.Box): def finished(self): received = self.assistant.download_history.received finished = _(''' - Finshed synchronising your History. + Finshed synchronising your History. {received} Messages downloaded. '''.format(received=received)) self.label.set_text(finished) def nothing_to_do(self): nothing_to_do = _(''' - Gajim is fully synchronised + Gajim is fully synchronised with the Archive. ''') self.label.set_text(nothing_to_do) def query_already_running(self): already_running = _(''' - There is already a synchronisation in + There is already a synchronisation in progress. Please try later. ''') self.label.set_text(already_running) + class TimeOption(Gtk.Label): def __init__(self, label, months=None): super().__init__(label=label)