Refactor message handlers

- Adapt to nbxmpp now unwraping MAM messages
- Use nbxmpp properties
- Save message-id to database
This commit is contained in:
Philipp Hörist 2019-01-04 00:03:35 +01:00
parent cdb37828e4
commit b600328639
5 changed files with 125 additions and 219 deletions

View File

@ -72,6 +72,7 @@ LOGS_SQL_STATEMENT = '''
subject TEXT,
additional_data TEXT,
stanza_id TEXT,
message_id TEXT,
encryption TEXT,
encryption_state TEXT,
marker INTEGER
@ -228,6 +229,13 @@ class Logger:
]
self._execute_multiple(con, statements)
if self._get_user_version(con) < 3:
statements = [
'ALTER TABLE logs ADD COLUMN "message_id" TEXT',
'PRAGMA user_version=3'
]
self._execute_multiple(con, statements)
def _migrate_cache(self, con):
if self._get_user_version(con) == 0:
# All migrations from 0.16.9 until 1.0.0

View File

@ -19,6 +19,7 @@ import time
from datetime import datetime, timedelta
import nbxmpp
from nbxmpp.structs import StanzaHandler
from gajim.common import app
from gajim.common.nec import NetworkEvent
@ -33,8 +34,6 @@ from gajim.common.modules.misc import parse_delay
from gajim.common.modules.misc import parse_oob
from gajim.common.modules.misc import parse_correction
from gajim.common.modules.misc import parse_eme
from gajim.common.modules.util import is_self_message
from gajim.common.modules.util import is_muc_pm
log = logging.getLogger('gajim.c.m.archiving')
@ -45,8 +44,9 @@ class MAM:
self._account = con.name
self.handlers = [
('message', self._mam_message_received, '', nbxmpp.NS_MAM_1),
('message', self._mam_message_received, '', nbxmpp.NS_MAM_2),
StanzaHandler(name='message',
callback=self._mam_message_received,
priority=51),
]
self.available = False
@ -72,132 +72,91 @@ class MAM:
account=self._account,
feature=self.archiving_namespace))
def _from_valid_archive(self, stanza, message, groupchat):
if groupchat:
expected_archive = message.getFrom()
def _from_valid_archive(self, stanza, properties):
if properties.type.is_groupchat:
expected_archive = properties.jid
else:
expected_archive = self._con.get_own_jid()
archive_jid = stanza.getFrom()
if archive_jid is None:
if groupchat:
return
# Message from our own archive
return self._con.get_own_jid()
return properties.mam.archive.bareMatch(expected_archive)
if archive_jid.bareMatch(expected_archive):
return archive_jid
def _get_unique_id(self, properties):
if properties.type.is_groupchat:
return properties.mam.id, None
def _get_unique_id(self, result, message, groupchat, self_message, muc_pm):
stanza_id = result.getAttr('id')
if groupchat:
return stanza_id, None
if properties.is_self_message:
return None, properties.id
origin_id = message.getOriginID()
if self_message:
return None, origin_id
if properties.is_muc_pm:
return properties.mam.id, properties.id
if muc_pm:
return stanza_id, origin_id
if self._con.get_own_jid().bareMatch(message.getFrom()):
if self._con.get_own_jid().bareMatch(properties.jid):
# message we sent
return stanza_id, origin_id
return properties.mam.id, properties.id
# A message we received
return stanza_id, None
return properties.mam.id, None
def _mam_message_received(self, _con, stanza, properties):
if not properties.is_mam_message:
return
def _mam_message_received(self, _con, stanza):
app.nec.push_incoming_event(
NetworkIncomingEvent('raw-mam-message-received',
conn=self._con,
stanza=stanza))
result = stanza.getTag('result', protocol=True)
queryid = result.getAttr('queryid')
forwarded = result.getTag('forwarded',
namespace=nbxmpp.NS_FORWARD,
protocol=True)
message = forwarded.getTag('message', protocol=True)
groupchat = message.getType() == 'groupchat'
archive_jid = self._from_valid_archive(stanza, message, groupchat)
if archive_jid is None:
log.warning('Message from invalid archive %s', stanza)
if not self._from_valid_archive(stanza, properties):
log.warning('Message from invalid archive %s',
properties.mam.archive)
raise nbxmpp.NodeProcessed
log.info('Received message from archive: %s', archive_jid)
if not self._is_valid_request(archive_jid, queryid):
log.warning('Invalid MAM Message: unknown query id')
log.info('Received message from archive: %s', properties.mam.archive)
if not self._is_valid_request(properties):
log.warning('Invalid MAM Message: unknown query id %s',
properties.mam.query_id)
log.debug(stanza)
raise nbxmpp.NodeProcessed
# Timestamp parsing
# Most servers dont set the 'from' attr, so we cant check for it
delay_timestamp = parse_delay(forwarded)
if delay_timestamp is None:
log.warning('No timestamp on MAM message')
log.warning(stanza)
raise nbxmpp.NodeProcessed
# Fix for self messaging
if not groupchat:
to = message.getTo()
if to is None:
# Some servers dont set the 'to' attribute when
# we send a message to ourself
message.setTo(self._con.get_own_jid())
event_attrs = {}
groupchat = properties.type.is_groupchat
if groupchat:
event_attrs.update(self._parse_gc_attrs(message))
event_attrs.update(self._parse_gc_attrs(properties))
else:
event_attrs.update(self._parse_chat_attrs(message))
event_attrs.update(self._parse_chat_attrs(stanza, properties))
self_message = is_self_message(message, groupchat)
muc_pm = is_muc_pm(message, event_attrs['with_'], groupchat)
stanza_id, message_id = self._get_unique_id(properties)
stanza_id, origin_id = self._get_unique_id(
result, message, groupchat, self_message, muc_pm)
message_id = message.getID()
# Check for duplicates
namespace = self.archiving_namespace
if groupchat:
namespace = muc_caps_cache.get_mam_namespace(
archive_jid.getStripped())
if namespace == nbxmpp.NS_MAM_2:
if properties.mam.is_ver_2:
# Search only with stanza-id for duplicates on mam:2
if app.logger.find_stanza_id(self._account,
archive_jid.getStripped(),
str(properties.mam.archive),
stanza_id,
origin_id,
message_id,
groupchat=groupchat):
log.info('Found duplicate with stanza-id')
log.info('Found duplicate with stanza-id: %s, message-id: %s',
stanza_id, message_id)
raise nbxmpp.NodeProcessed
msgtxt = message.getTagData('body')
event_attrs.update(
{'conn': self._con,
'additional_data': AdditionalDataDict(),
'encrypted': False,
'timestamp': delay_timestamp,
'self_message': self_message,
'timestamp': properties.mam.timestamp,
'self_message': properties.is_self_message,
'groupchat': groupchat,
'muc_pm': muc_pm,
'muc_pm': properties.is_muc_pm,
'stanza_id': stanza_id,
'origin_id': origin_id,
'message_id': message_id,
'origin_id': message_id,
'message_id': properties.id,
'correct_id': None,
'archive_jid': archive_jid,
'msgtxt': msgtxt,
'message': message,
'stanza': message,
'namespace': namespace,
'archive_jid': properties.mam.archive,
'msgtxt': properties.body,
'message': stanza,
'stanza': stanza,
'namespace': properties.mam.namespace,
})
if groupchat:
@ -217,26 +176,18 @@ class MAM:
raise nbxmpp.NodeProcessed
@staticmethod
def _parse_gc_attrs(message):
with_ = message.getFrom()
nick = message.getFrom().getResource()
# Get the real jid if we have it
def _parse_gc_attrs(properties):
real_jid = None
muc_user = message.getTag('x', namespace=nbxmpp.NS_MUC_USER)
if muc_user is not None:
real_jid = muc_user.getTagAttr('item', 'jid')
if real_jid is not None:
real_jid = nbxmpp.JID(real_jid)
return {'with_': with_,
'nick': nick,
if properties.muc_user is not None:
real_jid = properties.muc_user.jid
return {'with_': properties.jid,
'nick': properties.muc_nickname,
'real_jid': real_jid,
'kind': KindConstant.GC_MSG}
def _parse_chat_attrs(self, message):
frm = message.getFrom()
to = message.getTo()
def _parse_chat_attrs(self, stanza, properties):
frm = properties.jid
to = stanza.getTo()
if frm.bareMatch(self._con.get_own_jid()):
with_ = to
kind = KindConstant.CHAT_MSG_SENT
@ -290,17 +241,15 @@ class MAM:
message=event.msgtxt,
contact_name=event.nick,
additional_data=event.additional_data,
stanza_id=stanza_id)
stanza_id=stanza_id,
message_id=event.message_id)
app.nec.push_incoming_event(
MamDecryptedMessageReceived(None, **vars(event)))
def _is_valid_request(self, jid, query_id):
if query_id is None:
return False
valid_id = self._mam_query_ids.get(jid.getStripped(), None)
return valid_id == query_id
def _is_valid_request(self, properties):
valid_id = self._mam_query_ids.get(str(properties.mam.archive), None)
return valid_id == properties.mam.query_id
def _get_query_id(self, jid):
query_id = self._con.connection.getAnID()

View File

@ -18,9 +18,10 @@ import time
import logging
import nbxmpp
from nbxmpp.structs import StanzaHandler
from nbxmpp.const import MessageType
from gajim.common import app
from gajim.common import helpers
from gajim.common import caps_cache
from gajim.common.i18n import _
from gajim.common.nec import NetworkIncomingEvent
@ -36,8 +37,6 @@ from gajim.common.modules.misc import parse_attention
from gajim.common.modules.misc import parse_form
from gajim.common.modules.misc import parse_oob
from gajim.common.modules.misc import parse_xhtml
from gajim.common.modules.util import is_self_message
from gajim.common.modules.util import is_muc_pm
from gajim.common.connection_handlers_events import MessageErrorEvent
@ -49,30 +48,26 @@ class Message:
self._con = con
self._account = con.name
self.handlers = [('message', self._message_received)]
self.handlers = [
StanzaHandler(name='message',
callback=self._message_received,
priority=50),
]
# XEPs for which this message module should not be executed
self._message_namespaces = set([nbxmpp.NS_PUBSUB_EVENT,
nbxmpp.NS_ROSTERX,
nbxmpp.NS_MAM_1,
nbxmpp.NS_MAM_2,
nbxmpp.NS_CONFERENCE,
nbxmpp.NS_IBB,
nbxmpp.NS_CAPTCHA,])
nbxmpp.NS_IBB])
def _message_received(self, _con, stanza, properties):
if properties.is_mam_message:
return
# Check if a child of the message contains any
# namespaces that we handle in other modules.
# nbxmpp executes less common handlers last
if self._message_namespaces & set(stanza.getProperties()):
return
muc_user = stanza.getTag('x', namespace=nbxmpp.NS_MUC_USER)
if muc_user is not None and (stanza.getType() != 'error'):
if muc_user.getChildren():
# Not a PM, handled by MUC module
return
log.info('Received from %s', stanza.getFrom())
app.nec.push_incoming_event(NetworkEvent(
@ -87,49 +82,36 @@ class Message:
# Ugly, we treat the from attr as the remote jid,
# to make that work with sent carbons we have to do this.
# TODO: Check where in Gajim and plugins we depend on that behavior
stanza.setFrom(stanza.getTo().getBare())
stanza.setFrom(stanza.getTo())
from_ = stanza.getFrom()
type_ = stanza.getType()
if type_ is None:
type_ = 'normal'
fjid = str(from_)
jid = from_.getBare()
resource = from_.getResource()
self_message = is_self_message(stanza, type_ == 'groupchat')
muc_pm = is_muc_pm(stanza, from_, type_ == 'groupchat')
id_ = stanza.getID()
fjid = None
if from_ is not None:
try:
fjid = helpers.parse_jid(str(from_))
except helpers.InvalidFormat:
log.warning('Invalid JID: %s, ignoring it',
stanza.getFrom())
return
jid, resource = app.get_room_and_nick_from_fjid(fjid)
type_ = properties.type
# Check for duplicates
stanza_id, origin_id = self._get_unique_id(
stanza, forwarded, sent, self_message, muc_pm)
stanza_id, message_id = self._get_unique_id(properties)
# Check groupchat messages for duplicates,
# We do this because of MUC History messages
if type_ == 'groupchat' or self_message or muc_pm:
if type_ == 'groupchat':
if (properties.type.is_groupchat or
properties.is_self_message or
properties.is_muc_pm):
if properties.type.is_groupchat:
archive_jid = stanza.getFrom().getStripped()
else:
archive_jid = self._con.get_own_jid().getStripped()
if app.logger.find_stanza_id(self._account,
archive_jid,
stanza_id,
origin_id,
type_ == 'groupchat'):
message_id,
properties.type.is_groupchat):
return
thread_id = stanza.getThread()
msgtxt = stanza.getBody()
thread_id = properties.thread
msgtxt = properties.body
# TODO: remove all control UI stuff
gc_control = app.interface.msg_win_mgr.get_gc_control(
@ -139,7 +121,7 @@ class Message:
gc_control = minimized.get(jid)
if gc_control and jid == fjid:
if type_ == 'error':
if properties.type.is_error:
if msgtxt:
msgtxt = _('error while sending %(message)s ( %(error)s )') % {
'message': msgtxt, 'error': stanza.getErrorMsg()}
@ -148,11 +130,11 @@ class Message:
# TODO: why is this here?
if stanza.getTag('html'):
stanza.delChild('html')
type_ = 'groupchat'
type_ = MessageType.GROUPCHAT
session = None
if type_ != 'groupchat':
if muc_pm and type_ == 'error':
if not properties.type.is_groupchat:
if properties.is_muc_pm and properties.type.is_error:
session = self._con.find_session(fjid, thread_id)
if not session:
session = self._con.get_latest_session(fjid)
@ -171,7 +153,7 @@ class Message:
'conn': self._con,
'stanza': stanza,
'account': self._account,
'id_': id_,
'id_': properties.id,
'encrypted': False,
'additional_data': AdditionalDataDict(),
'forwarded': forwarded,
@ -180,13 +162,14 @@ class Message:
'jid': jid,
'resource': resource,
'stanza_id': stanza_id,
'unique_id': stanza_id or origin_id,
'mtype': type_,
'unique_id': stanza_id or message_id,
'message_id': properties.id,
'mtype': type_.value,
'msgtxt': msgtxt,
'thread_id': thread_id,
'session': session,
'self_message': self_message,
'muc_pm': muc_pm,
'self_message': properties.is_self_message,
'muc_pm': properties.is_muc_pm,
'gc_control': gc_control
}
@ -313,7 +296,8 @@ class Message:
message=event.msgtxt,
contact_name=event.nick,
additional_data=event.additional_data,
stanza_id=event.stanza_id)
stanza_id=event.stanza_id,
message_id=event.message_id)
app.logger.set_room_last_message_time(event.room_jid, event.timestamp)
self._con.get_module('MAM').save_archive_id(
event.room_jid, event.stanza_id, event.timestamp)
@ -324,36 +308,29 @@ class Message:
if stanza_id is None and namespace == nbxmpp.NS_MAM_2:
log.warning('%s announces mam:2 without stanza-id', room_jid)
def _get_unique_id(self, stanza, _forwarded, _sent, self_message, _muc_pm):
if stanza.getType() == 'groupchat':
# TODO: Disco the MUC check if 'urn:xmpp:mam:2' is announced
return self._get_stanza_id(stanza), None
def _get_unique_id(self, properties):
if properties.is_self_message:
# Deduplicate self message with message-id
return None, properties.id
if stanza.getType() != 'chat':
if properties.stanza_id.by is None:
# We can not verify who sent this stanza-id, ignore it.
return None, None
# Messages we receive live
if self._con.get_module('MAM').archiving_namespace != nbxmpp.NS_MAM_2:
if properties.type.is_groupchat:
namespace = caps_cache.muc_caps_cache.get_mam_namespace(
properties.jid.getBare())
archive = properties.jid
else:
namespace = self._con.get_module('MAM').archiving_namespace
archive = self._con.get_own_jid()
if namespace != nbxmpp.NS_MAM_2:
# Only mam:2 ensures valid stanza-id
return None, None
if self_message:
return self._get_stanza_id(stanza), stanza.getOriginID()
return self._get_stanza_id(stanza), None
def _get_stanza_id(self, stanza):
stanza_id, by = stanza.getStanzaIDAttrs()
if by is None:
# We can not verify who set this stanza-id, ignore it.
return
if stanza.getType() == 'groupchat':
if stanza.getFrom().bareMatch(by):
# by attribute must match the server
return stanza_id
elif self._con.get_own_jid().bareMatch(by):
# by attribute must match the server
return stanza_id
return
if archive.bareMatch(properties.stanza_id.by):
return properties.stanza_id.id, None
class MessageReceivedEvent(NetworkIncomingEvent):

View File

@ -16,35 +16,6 @@
from typing import Union
import nbxmpp
from gajim.common import app
def is_self_message(message: nbxmpp.Node,
groupchat: bool = False) -> bool:
if groupchat:
return False
frm = message.getFrom()
to = message.getTo()
return frm.bareMatch(to)
def is_muc_pm(message: nbxmpp.Node,
jid: nbxmpp.JID,
groupchat: bool = False) -> bool:
if groupchat:
return False
muc_user = message.getTag('x', namespace=nbxmpp.NS_MUC_USER)
if muc_user is not None:
return muc_user.getChildren() == []
# muc#user namespace was added in MUC 1.28 so we need a fallback
# Check if we know the jid
if app.logger.jid_is_room_jid(jid.getStripped()):
return True
return False
def from_xs_boolean(value: Union[str, bool]) -> bool:
if isinstance(value, bool):

View File

@ -134,7 +134,8 @@ class ChatControlSession:
message=msg_to_log,
subject=obj.subject,
additional_data=obj.additional_data,
stanza_id=obj.unique_id)
stanza_id=obj.unique_id,
message_id=obj.message_id)
self.conn.get_module('MAM').save_archive_id(
None, obj.stanza_id, obj.timestamp)