Merge branch 'logger' into 'master'
New logging methods and more MAM refactoring See merge request !112
This commit is contained in:
commit
bc7ce12932
|
@ -40,7 +40,7 @@ from common import i18n
|
|||
from common import dataforms
|
||||
from common import exceptions
|
||||
from common.zeroconf.zeroconf import Constant
|
||||
from common.logger import LOG_DB_PATH
|
||||
from common.logger import LOG_DB_PATH, KindConstant
|
||||
from common.pep import SUPPORTED_PERSONAL_USER_EVENTS
|
||||
from common.jingle_transport import JingleTransportSocks5
|
||||
from common.file_props import FilesProp
|
||||
|
@ -1051,15 +1051,19 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
self.query_id = self.result.getAttr('queryid')
|
||||
|
||||
frm = self.msg_.getFrom()
|
||||
# Some servers dont set the 'to' attribute when
|
||||
# we send a message to ourself
|
||||
to = self.msg_.getTo()
|
||||
if to is None:
|
||||
to = own_jid
|
||||
|
||||
if frm.bareMatch(own_jid):
|
||||
self.stanza_id = self.msg_.getOriginID()
|
||||
if not self.stanza_id:
|
||||
if self.stanza_id is None:
|
||||
self.stanza_id = self.msg_.getID()
|
||||
|
||||
self.with_ = to
|
||||
self.direction = 'to'
|
||||
self.kind = KindConstant.CHAT_MSG_SENT
|
||||
else:
|
||||
if self.result.getNamespace() == nbxmpp.NS_MAM_2:
|
||||
self.stanza_id = self.result.getID()
|
||||
|
@ -1067,22 +1071,32 @@ class MamMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
self.stanza_id = self.msg_.getID()
|
||||
|
||||
self.with_ = frm
|
||||
self.direction = 'from'
|
||||
self.kind = KindConstant.CHAT_MSG_RECV
|
||||
|
||||
if not self.stanza_id:
|
||||
if self.stanza_id is None:
|
||||
log.debug('Could not retrieve stanza-id')
|
||||
|
||||
# Use timestamp provided by archive,
|
||||
# Fallback: Use timestamp provided by user and issue a warning
|
||||
delay = self.forwarded.getTag('delay', namespace=nbxmpp.NS_DELAY2)
|
||||
if not delay:
|
||||
log.warning('No timestamp on archive Message, try fallback')
|
||||
delay = self.msg_.getTag('delay', namespace=nbxmpp.NS_DELAY2)
|
||||
if not delay:
|
||||
delay = self.forwarded.getTagAttr(
|
||||
'delay', 'stamp', namespace=nbxmpp.NS_DELAY2)
|
||||
if delay is None:
|
||||
log.error('Received MAM message without timestamp')
|
||||
return
|
||||
|
||||
self.timestamp = helpers.parse_delay(delay)
|
||||
self.timestamp = helpers.parse_datetime(
|
||||
delay, check_utc=True, epoch=True)
|
||||
if self.timestamp is None:
|
||||
log.error('Received MAM message with invalid timestamp: %s', delay)
|
||||
return
|
||||
|
||||
# Save timestamp added by the user
|
||||
user_delay = self.msg_.getTagAttr(
|
||||
'delay', 'stamp', namespace=nbxmpp.NS_DELAY2)
|
||||
if user_delay is not None:
|
||||
self.user_timestamp = helpers.parse_datetime(
|
||||
user_delay, check_utc=True, epoch=True)
|
||||
if self.user_timestamp is None:
|
||||
log.warning('Received MAM message with '
|
||||
'invalid user timestamp: %s', user_delay)
|
||||
|
||||
log.debug('Received mam-message: stanza id: %s', self.stanza_id)
|
||||
return True
|
||||
|
@ -1095,19 +1109,27 @@ class MamDecryptedMessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
|||
if not self.msgtxt:
|
||||
# For example Chatstates, Receipts, Chatmarkers
|
||||
log.debug('Received MAM message without text')
|
||||
return False
|
||||
return
|
||||
self.is_pm = gajim.logger.jid_is_room_jid(self.with_.getStripped())
|
||||
if self.is_pm is None:
|
||||
# Check if this event is triggered after a disco, so we dont
|
||||
# run into an endless loop
|
||||
if hasattr(self, 'disco'):
|
||||
log.error('JID not known even after sucessful disco')
|
||||
return
|
||||
# we don't know this JID, we need to disco it.
|
||||
server = self.with_.getDomain()
|
||||
if server not in self.conn.mam_awaiting_disco_result:
|
||||
self.conn.mam_awaiting_disco_result[server] = [
|
||||
[self.with_, self.direction, self.timestamp, self.msgtxt]]
|
||||
self.conn.mam_awaiting_disco_result[server] = [self]
|
||||
self.conn.discoverInfo(server)
|
||||
else:
|
||||
self.conn.mam_awaiting_disco_result[server].append(
|
||||
[self.with_, self.direction, self.timestamp, self.msgtxt])
|
||||
self.conn.mam_awaiting_disco_result[server].append(self)
|
||||
return
|
||||
|
||||
if self.is_pm:
|
||||
self.with_ = str(self.with_)
|
||||
else:
|
||||
self.with_ = self.with_.getStripped()
|
||||
return True
|
||||
|
||||
class MessageReceivedEvent(nec.NetworkIncomingEvent, HelperEvent):
|
||||
|
|
|
@ -673,34 +673,6 @@ def datetime_tuple(timestamp):
|
|||
tim = tim.timetuple()
|
||||
return tim
|
||||
|
||||
def parse_delay(timestamp):
|
||||
'''
|
||||
Parse a timestamp
|
||||
https://xmpp.org/extensions/xep-0203.html
|
||||
Note: Not all delay tags should be parsed with this method
|
||||
see https://xmpp.org/extensions/xep-0082.html for more information
|
||||
|
||||
:param timestamp: a XEP-0203 fomated timestring string or a delay Node
|
||||
|
||||
Examples:
|
||||
'2017-11-05T01:41:20Z'
|
||||
'2017-11-05T01:41:20.123Z'
|
||||
|
||||
return epoch UTC timestamp
|
||||
'''
|
||||
if isinstance(timestamp, nbxmpp.protocol.Node):
|
||||
timestamp = timestamp.getAttr('stamp')
|
||||
timestamp += '+0000'
|
||||
try:
|
||||
datetime_ = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ%z')
|
||||
except ValueError:
|
||||
try:
|
||||
datetime_ = datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ%z')
|
||||
except ValueError:
|
||||
log.error('Could not parse delay timestamp: %s', timestamp)
|
||||
raise
|
||||
return datetime_.timestamp()
|
||||
|
||||
def parse_datetime(timestring, check_utc=False, convert='utc', epoch=False):
|
||||
'''
|
||||
Parse a XEP-0082 DateTime Profile String
|
||||
|
|
|
@ -1139,25 +1139,23 @@ class Logger:
|
|||
(account_jid_id,))
|
||||
self._timeout_commit()
|
||||
|
||||
def save_if_not_exists(self, with_, direction, tim, msg, is_pm=False, additional_data=None):
|
||||
if additional_data is None:
|
||||
additional_data = {}
|
||||
def search_for_duplicate(self, jid, timestamp, msg):
|
||||
"""
|
||||
Check if a message is already in the `logs` table
|
||||
|
||||
if is_pm:
|
||||
with_ = str(with_)
|
||||
type_ = 'gc_msg'
|
||||
else:
|
||||
with_ = with_.getStripped()
|
||||
if direction == 'from':
|
||||
type_ = 'chat_msg_recv'
|
||||
elif direction == 'to':
|
||||
type_ = 'chat_msg_sent'
|
||||
:param jid: The jid as string
|
||||
|
||||
start_time = tim - 300 # 5 minutes arrount given time
|
||||
end_time = tim + 300 # 5 minutes arrount given time
|
||||
:param timestamp: The timestamp in UTC epoch
|
||||
|
||||
:param msg: The message text
|
||||
"""
|
||||
|
||||
# Add 5 minutes around the timestamp
|
||||
start_time = timestamp - 300
|
||||
end_time = timestamp + 300
|
||||
|
||||
log.debug('start: %s, end: %s, jid: %s, message: %s',
|
||||
start_time, end_time, with_, msg)
|
||||
start_time, end_time, jid, msg)
|
||||
|
||||
sql = '''
|
||||
SELECT * FROM logs
|
||||
|
@ -1165,14 +1163,63 @@ class Logger:
|
|||
AND time BETWEEN ? AND ?
|
||||
'''
|
||||
|
||||
result = self.con.execute(sql, (with_, msg, start_time, end_time)).fetchone()
|
||||
result = self.con.execute(sql, (jid, msg, start_time, end_time)).fetchone()
|
||||
|
||||
if result:
|
||||
log.debug('Log already in DB, ignoring it')
|
||||
return
|
||||
log.debug('New log received from server archives, storing it')
|
||||
self.write(type_, with_, message=msg, tim=tim,
|
||||
additional_data=additional_data, mam_query=True)
|
||||
if result is not None:
|
||||
log.debug('Message already in DB')
|
||||
return True
|
||||
return False
|
||||
|
||||
def insert_jid(self, jid, kind=None, type_=JIDConstant.NORMAL_TYPE):
|
||||
"""
|
||||
Insert a new jid into the `jids` table.
|
||||
|
||||
:param jid: The jid as string
|
||||
|
||||
:param kind: A KindConstant
|
||||
|
||||
:param type_: A JIDConstant
|
||||
"""
|
||||
if kind == KindConstant.GC_MSG:
|
||||
type_ = JIDConstant.ROOM_TYPE
|
||||
sql = 'INSERT OR IGNORE INTO jids (jid, type) VALUES (?, ?)'
|
||||
self.con.execute(sql, (jid, type_))
|
||||
self._timeout_commit()
|
||||
|
||||
def insert_into_logs(self, jid, time_, kind, unread=True, **kwargs):
|
||||
"""
|
||||
Insert a new message into the `logs` table
|
||||
|
||||
:param jid: The jid as string
|
||||
|
||||
:param time_: The timestamp in UTC epoch
|
||||
|
||||
:param kind: A KindConstant
|
||||
|
||||
:param unread: If True the message is added to the`unread_messages`
|
||||
table. Only if kind == CHAT_MSG_RECV
|
||||
|
||||
:param kwargs: Every additional named argument must correspond to
|
||||
a field in the `logs` table
|
||||
"""
|
||||
self.insert_jid(jid, kind=kind)
|
||||
|
||||
sql = '''
|
||||
INSERT INTO logs (jid_id, time, kind, {columns})
|
||||
VALUES ((SELECT jid_id FROM jids WHERE jid = ?), ?, ?, {values})
|
||||
'''.format(columns=', '.join(kwargs.keys()),
|
||||
values=', '.join('?' * len(kwargs)))
|
||||
|
||||
lastrowid = self.con.execute(sql, (jid, time_, kind, *kwargs.values())).lastrowid
|
||||
|
||||
if unread and kind == KindConstant.CHAT_MSG_RECV:
|
||||
sql = '''INSERT INTO unread_messages (message_id, jid_id)
|
||||
VALUES (?, (SELECT jid_id FROM jids WHERE jid = ?))'''
|
||||
self.con.execute(sql, (lastrowid, jid))
|
||||
|
||||
self._timeout_commit()
|
||||
|
||||
return lastrowid
|
||||
|
||||
def _nec_gc_message_received(self, obj):
|
||||
tim_f = float(obj.timestamp)
|
||||
|
|
|
@ -25,6 +25,7 @@ import nbxmpp
|
|||
|
||||
from common import gajim
|
||||
from common import ged
|
||||
from common.logger import KindConstant, JIDConstant
|
||||
import common.connection_handlers_events as ev
|
||||
|
||||
log = logging.getLogger('gajim.c.message_archiving')
|
||||
|
@ -77,23 +78,28 @@ class ConnectionArchive313:
|
|||
del self.mam_awaiting_disco_result[obj.jid]
|
||||
|
||||
def _nec_agent_info(self, obj):
|
||||
if obj.jid in self.mam_awaiting_disco_result:
|
||||
for identity in obj.identities:
|
||||
if identity['category'] == 'conference':
|
||||
# it's a groupchat
|
||||
for with_, direction, tim, msg_txt in \
|
||||
self.mam_awaiting_disco_result[obj.jid]:
|
||||
gajim.logger.get_jid_id(with_.getStripped(), 'ROOM')
|
||||
gajim.logger.save_if_not_exists(with_, direction, tim,
|
||||
msg_txt, is_pm=True)
|
||||
del self.mam_awaiting_disco_result[obj.jid]
|
||||
return
|
||||
# it's not a groupchat
|
||||
for with_, direction, tim, msg_txt in \
|
||||
self.mam_awaiting_disco_result[obj.jid]:
|
||||
gajim.logger.get_jid_id(with_.getStripped())
|
||||
gajim.logger.save_if_not_exists(with_, direction, tim, msg_txt)
|
||||
if obj.jid not in self.mam_awaiting_disco_result:
|
||||
return
|
||||
|
||||
for identity in obj.identities:
|
||||
if identity['category'] != 'conference':
|
||||
continue
|
||||
# it's a groupchat
|
||||
for msg_obj in self.mam_awaiting_disco_result[obj.jid]:
|
||||
gajim.logger.insert_jid(msg_obj.with_.getStripped(),
|
||||
type_=JIDConstant.ROOM_TYPE)
|
||||
gajim.nec.push_incoming_event(
|
||||
ev.MamDecryptedMessageReceivedEvent(
|
||||
None, disco=True, **vars(msg_obj)))
|
||||
del self.mam_awaiting_disco_result[obj.jid]
|
||||
return
|
||||
# it's not a groupchat
|
||||
for msg_obj in self.mam_awaiting_disco_result[obj.jid]:
|
||||
gajim.logger.insert_jid(msg_obj.with_.getStripped())
|
||||
gajim.nec.push_incoming_event(
|
||||
ev.MamDecryptedMessageReceivedEvent(
|
||||
None, disco=True, **vars(msg_obj)))
|
||||
del self.mam_awaiting_disco_result[obj.jid]
|
||||
|
||||
def _nec_result_finished(self, obj):
|
||||
if obj.conn.name != self.name:
|
||||
|
@ -121,8 +127,13 @@ class ConnectionArchive313:
|
|||
def _nec_mam_decrypted_message_received(self, obj):
|
||||
if obj.conn.name != self.name:
|
||||
return
|
||||
gajim.logger.save_if_not_exists(obj.with_, obj.direction, obj.timestamp,
|
||||
obj.msgtxt, is_pm=obj.is_pm, additional_data=obj.additional_data)
|
||||
duplicate = gajim.logger.search_for_duplicate(
|
||||
obj.with_, obj.timestamp, obj.msgtxt)
|
||||
if not duplicate:
|
||||
gajim.logger.insert_into_logs(
|
||||
obj.with_, obj.timestamp, obj.kind,
|
||||
unread=False,
|
||||
message=obj.msgtxt)
|
||||
|
||||
def get_query_id(self):
|
||||
self.mam_query_id = self.connection.getAnID()
|
||||
|
|
Loading…
Reference in New Issue