# This file is part of Gajim. # # Gajim is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published # by the Free Software Foundation; version 3 only. # # Gajim is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Gajim. If not, see . # XEP-0163: Personal Eventing Protocol from typing import Any from typing import Dict from typing import List from typing import Tuple import logging import nbxmpp from gajim.common import app from gajim.common.exceptions import StanzaMalformed from gajim.common.nec import NetworkIncomingEvent from gajim.common.const import PEPEventType from gajim.common.types import ConnectionT from gajim.common.types import PEPHandlersDict # pylint: disable=unused-import from gajim.common.types import PEPNotifyCallback log = logging.getLogger('gajim.c.m.pep') class PEP: def __init__(self, con: ConnectionT) -> None: self._con = con self._account = con.name self.handlers = [ ('message', self._pep_event_received, 'headline', nbxmpp.NS_PUBSUB_EVENT) ] self.supported = False self._pep_handlers = {} # type: PEPHandlersDict self._store_publish_modules = [] # type: List[Any] def pass_disco(self, from_: nbxmpp.JID, identities: List[Dict[str, str]], _features: List[str], _data: List[nbxmpp.DataForm], _node: str) -> None: for identity in identities: if identity['category'] == 'pubsub': if identity.get('type') == 'pep': log.info('Discovered PEP support: %s', from_) self.supported = True def register_pep_handler( self, namespace: str, notify_handler: PEPNotifyCallback) -> None: if namespace in self._pep_handlers: self._pep_handlers[namespace].append(notify_handler) else: self._pep_handlers[namespace] = [notify_handler] module_instance = notify_handler.__self__ # type: ignore if module_instance.store_publish: if module_instance not in self._store_publish_modules: self._store_publish_modules.append(module_instance) def _pep_event_received(self, _con: ConnectionT, stanza: nbxmpp.Message) -> None: jid = stanza.getFrom() event = stanza.getTag('event', namespace=nbxmpp.NS_PUBSUB_EVENT) items = event.getTag('items') if items is None: log.warning('Malformed PEP event (no items node): %s', stanza) raise nbxmpp.NodeProcessed namespace = items.getAttr('node') if namespace is None: log.warning('Malformed PEP event (no node attr): %s', stanza) raise nbxmpp.NodeProcessed log.info('PEP notification received: %s %s', jid, namespace) handlers = self._pep_handlers.get(namespace, None) if handlers is None: raise nbxmpp.NodeProcessed # Check if this is a retraction retract = items.getTag('retract') if retract is not None: id_ = retract.getAttr('id') log.info('Received retract of id: %s', id_) raise nbxmpp.NodeProcessed # Check if we have items items_ = items.getTags('item') if items_ is None: log.warning('Malformed PEP event received: %s', stanza) raise nbxmpp.NodeProcessed for handler in handlers: handler(jid, items_[0]) raise nbxmpp.NodeProcessed def send_stored_publish(self) -> None: for module in self._store_publish_modules: module.send_stored_publish() def reset_stored_publish(self) -> None: for module in self._store_publish_modules: module.reset_stored_publish() class AbstractPEPData: type_ = PEPEventType.ABSTRACT def __init__(self, data: Any) -> None: self.data = data def as_markup_text(self) -> str: # pylint: disable=no-self-use '''SHOULD be implemented by subclasses''' return '' def __eq__(self, other: Any) -> bool: return other == self.type_ def __bool__(self) -> bool: return self.data is not None def __str__(self) -> str: return str(self.data) class AbstractPEPModule: name = '' namespace = '' pep_class = AbstractPEPData store_publish = True _log = log def __init__(self, con: ConnectionT) -> None: self._con = con self._account = con.name self.handlers = [] # type: List[Tuple[Any, ...]] self._stored_publish = None self._con.get_module('PEP').register_pep_handler( self.namespace, self._pep_notify_received) def _pep_notify_received(self, jid: nbxmpp.JID, item: nbxmpp.Node) -> None: try: data = self._extract_info(item) except StanzaMalformed as error: log.warning('%s, %s: %s', jid, error, item) return self._log.info('Received: %s %s', jid, data) user_pep = self.pep_class(data) self._notification_received(jid, user_pep) app.nec.push_incoming_event( PEPReceivedEvent(None, conn=self._con, jid=str(jid), pep_type=self.name, user_pep=user_pep)) def _extract_info(self, item: nbxmpp.Node) -> Any: '''To be implemented by subclasses''' raise NotImplementedError def _build_node(self, data: Any) -> nbxmpp.Node: '''To be implemented by subclasses''' raise NotImplementedError def _notification_received(self, jid: nbxmpp.JID, user_pep: Any) -> None: for contact in app.contacts.get_contacts(self._account, str(jid)): if user_pep: contact.pep[self.name] = user_pep else: contact.pep.pop(self.name, None) if jid == self._con.get_own_jid().getStripped(): if user_pep: self._con.pep[self.name] = user_pep else: self._con.pep.pop(self.name, None) def send_stored_publish(self) -> None: if self._stored_publish is not None: self._log.info('Send stored publish') self.send(self._stored_publish) self._stored_publish = None def reset_stored_publish(self) -> None: self._log.info('Reset stored publish') self._stored_publish = None def send(self, data: Any) -> None: if not self._con.get_module('PEP').supported: return if self._con.connected == 1: # We are connecting, save activity and send it later self._stored_publish = data return if data: self._log.info('Send: %s', data) else: self._log.info('Remove') item = self._build_node(data) self._con.get_module('PubSub').send_pb_publish( '', self.namespace, item, 'current') class PEPReceivedEvent(NetworkIncomingEvent): name = 'pep-received' def get_instance(*args: Any, **kwargs: Any) -> Tuple[PEP, str]: return PEP(*args, **kwargs), 'PEP'