gajim-plural/gajim/common/modules/privacylists.py

467 lines
17 KiB
Python

# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
# XEP-0016: Privacy Lists
import nbxmpp
from nbxmpp.structs import StanzaHandler
from gajim.common import app
from gajim.common import helpers
from gajim.common.nec import NetworkEvent
from gajim.common.nec import NetworkIncomingEvent
from gajim.common.modules.base import BaseModule
from gajim.common.connection_handlers_events import InformationEvent
class PrivacyLists(BaseModule):
def __init__(self, con):
BaseModule.__init__(self, con)
self.default_list = None
self.active_list = None
self.blocked_contacts = []
self.blocked_groups = []
self.blocked_list = []
self.blocked_all = False
self.handlers = [
StanzaHandler(name='iq',
callback=self._list_push_received,
typ='set',
ns=nbxmpp.NS_PRIVACY),
]
self.supported = False
def pass_disco(self, from_, _identities, features, _data, _node):
if nbxmpp.NS_PRIVACY not in features:
return
self.supported = True
self._log.info('Discovered XEP-0016: Privacy Lists: %s', from_)
app.nec.push_incoming_event(
NetworkEvent('feature-discovered',
account=self._account,
feature=nbxmpp.NS_PRIVACY))
def _list_push_received(self, _con, stanza, _properties):
result = stanza.buildReply('result')
result.delChild(result.getTag('query'))
self._con.connection.send(result)
for list_ in stanza.getQueryPayload():
if list_.getName() == 'list':
name = list_.getAttr('name')
self._log.info('Received Push: %s', name)
self.get_privacy_list(name)
raise nbxmpp.NodeProcessed
def get_privacy_lists(self, callback=None):
self._log.info('Request lists')
iq = nbxmpp.Iq('get', nbxmpp.NS_PRIVACY)
self._con.connection.SendAndCallForResponse(
iq, self._privacy_lists_received, {'callback': callback})
def _privacy_lists_received(self, _con, stanza, callback):
lists = []
new_default = None
result = nbxmpp.isResultNode(stanza)
if not result:
self._log.warning('List not available: %s', stanza.getError())
else:
for list_ in stanza.getQueryPayload():
name = list_.getAttr('name')
if list_.getName() == 'active':
self.active_list = name
elif list_.getName() == 'default':
new_default = name
else:
lists.append(name)
self._log.info('Received lists: %s', lists)
# Download default list if we dont have it
if self.default_list != new_default:
self.default_list = new_default
if new_default is not None:
self._log.info('Found new default list: %s', new_default)
self.get_privacy_list(new_default)
if callback:
callback(result)
else:
app.nec.push_incoming_event(
PrivacyListsReceivedEvent(None,
conn=self._con,
active_list=self.active_list,
default_list=self.default_list,
lists=lists))
def get_privacy_list(self, name):
self._log.info('Request list: %s', name)
list_ = nbxmpp.Node('list', {'name': name})
iq = nbxmpp.Iq('get', nbxmpp.NS_PRIVACY, payload=[list_])
self._con.connection.SendAndCallForResponse(
iq, self._privacy_list_received)
def _privacy_list_received(self, stanza):
if not nbxmpp.isResultNode(stanza):
self._log.warning('List not available: %s', stanza.getError())
return
rules = []
list_ = stanza.getQueryPayload()[0]
name = list_.getAttr('name')
for child in list_.getChildren():
item = child.getAttrs()
childs = []
for scnd_child in child.getChildren():
childs.append(scnd_child.getName())
item['child'] = childs
if len(item) not in (3, 5):
self._log.warning('Wrong count of attrs: %s', stanza)
continue
rules.append(item)
self._log.info('Received list: %s', name)
if name == self.default_list:
self._default_list_received(rules)
app.nec.push_incoming_event(PrivacyListReceivedEvent(
None, conn=self._con, list_name=name, rules=rules))
def del_privacy_list(self, name):
self._log.info('Remove list: %s', name)
def _del_privacy_list_result(stanza):
if not nbxmpp.isResultNode(stanza):
self._log.warning('List deletion failed: %s', stanza.getError())
app.nec.push_incoming_event(InformationEvent(
None, dialog_name='privacy-list-error', args=name))
else:
app.nec.push_incoming_event(PrivacyListRemovedEvent(
None, conn=self._con, list_name=name))
node = nbxmpp.Node('list', {'name': name})
iq = nbxmpp.Iq('set', nbxmpp.NS_PRIVACY, payload=[node])
self._con.connection.SendAndCallForResponse(
iq, _del_privacy_list_result)
def set_privacy_list(self, name, rules):
node = nbxmpp.Node('list', {'name': name})
iq = nbxmpp.Iq('set', nbxmpp.NS_PRIVACY, payload=[node])
for item in rules:
childs = item.get('child', [])
for child in childs:
node.setTag(child)
item.pop('child', None)
node.setTag('item', item)
self._log.info('Update list: %s %s', name, rules)
self._con.connection.SendAndCallForResponse(
iq, self._default_result_handler, {})
def _default_list_received(self, rules):
roster = app.interface.roster
for rule in rules:
if rule['action'] == 'allow':
if 'type' not in rule:
self.blocked_all = False
elif rule['type'] == 'jid':
if rule['value'] in self.blocked_contacts:
self.blocked_contacts.remove(rule['value'])
elif rule['type'] == 'group':
if rule['value'] in self.blocked_groups:
self.blocked_groups.remove(rule['value'])
elif rule['action'] == 'deny':
if 'type' not in rule:
self.blocked_all = True
elif rule['type'] == 'jid':
if rule['value'] not in self.blocked_contacts:
self.blocked_contacts.append(rule['value'])
elif rule['type'] == 'group':
if rule['value'] not in self.blocked_groups:
self.blocked_groups.append(rule['value'])
self.blocked_list.append(rule)
if 'type' in rule:
if rule['type'] == 'jid':
roster.draw_contact(rule['value'], self._account)
if rule['type'] == 'group':
roster.draw_group(rule['value'], self._account)
def set_active_list(self, name=None):
self._log.info('Set active list: %s', name)
attr = {}
if name:
attr['name'] = name
node = nbxmpp.Node('active', attr)
iq = nbxmpp.Iq('set', nbxmpp.NS_PRIVACY, payload=[node])
self._con.connection.SendAndCallForResponse(
iq, self._default_result_handler, {})
def set_default_list(self, name=None):
self._log.info('Set default list: %s', name)
attr = {}
if name:
attr['name'] = name
node = nbxmpp.Node('default', attr)
iq = nbxmpp.Iq('set', nbxmpp.NS_PRIVACY, payload=[node])
self._con.connection.SendAndCallForResponse(
iq, self._default_result_handler, {})
def _default_result_handler(self, _con, stanza):
if not nbxmpp.isResultNode(stanza):
self._log.warning('Operation failed: %s', stanza.getError())
def _build_invisible_rule(self):
node = nbxmpp.Node('list', {'name': 'invisible'})
iq = nbxmpp.Iq('set', nbxmpp.NS_PRIVACY, payload=[node])
if (self._account in app.interface.status_sent_to_groups and
app.interface.status_sent_to_groups[self._account]):
for group in app.interface.status_sent_to_groups[self._account]:
item = node.setTag('item', {'type': 'group',
'value': group,
'action': 'allow',
'order': '1'})
item.setTag('presence-out')
if (self._account in app.interface.status_sent_to_users and
app.interface.status_sent_to_users[self._account]):
for jid in app.interface.status_sent_to_users[self._account]:
item = node.setTag('item', {'type': 'jid',
'value': jid,
'action': 'allow',
'order': '2'})
item.setTag('presence-out')
item = node.setTag('item', {'action': 'deny', 'order': '3'})
item.setTag('presence-out')
return iq
def set_invisible_rule(self, callback=None, **kwargs):
self._log.info('Update invisible list')
iq = self._build_invisible_rule()
if callback is None:
callback = self._default_result_handler
self._con.connection.SendAndCallForResponse(
iq, callback, kwargs)
def _get_max_blocked_list_order(self):
max_order = 0
for rule in self.blocked_list:
order = int(rule['order'])
if order > max_order:
max_order = order
return max_order
def block_gc_contact(self, jid):
if jid in self.blocked_contacts:
return
self._log.info('Block GC contact: %s', jid)
if self.default_list is None:
self.default_list = 'block'
max_order = self._get_max_blocked_list_order()
new_rule = {'order': str(max_order + 1),
'type': 'jid',
'action': 'deny',
'value': jid,
'child': ['message', 'iq', 'presence-out']}
self.blocked_list.append(new_rule)
self.blocked_contacts.append(jid)
self.set_privacy_list(self.default_list, self.blocked_list)
if len(self.blocked_list) == 1:
self.set_default_list(self.default_list)
def block_contacts(self, contact_list, message):
if not self.supported:
jid_list = [contact.jid for contact in contact_list]
self._con.get_module('Blocking').block(jid_list)
return
if self.default_list is None:
self.default_list = 'block'
for contact in contact_list:
self._log.info('Block contacts: %s', contact.jid)
contact.show = 'offline'
self._con.send_custom_status('offline', message, contact.jid)
max_order = self._get_max_blocked_list_order()
new_rule = {'order': str(max_order + 1),
'type': 'jid',
'action': 'deny',
'value': contact.jid}
self.blocked_list.append(new_rule)
self.blocked_contacts.append(contact.jid)
self.set_privacy_list(self.default_list, self.blocked_list)
if len(self.blocked_list) == 1:
self.set_default_list(self.default_list)
def unblock_gc_contact(self, jid):
new_blocked_list = []
# needed for draw_contact:
if jid not in self.blocked_contacts:
return
self.blocked_contacts.remove(jid)
self._log.info('Unblock GC contact: %s', jid)
for rule in self.blocked_list:
if (rule['action'] != 'deny' or
rule['type'] != 'jid' or
rule['value'] != jid):
new_blocked_list.append(rule)
if not new_blocked_list:
self.blocked_list = []
self.blocked_contacts = []
self.blocked_groups = []
self.set_default_list(None)
self.del_privacy_list(self.default_list)
else:
self.set_privacy_list(self.default_list, new_blocked_list)
def unblock_contacts(self, contact_list):
if not self.supported:
jid_list = [contact.jid for contact in contact_list]
self._con.get_module('Blocking').unblock(jid_list)
return
new_blocked_list = []
to_unblock = []
for contact in contact_list:
self._log.info('Unblock contacts: %s', contact.jid)
to_unblock.append(contact.jid)
if contact.jid in self.blocked_contacts:
self.blocked_contacts.remove(contact.jid)
for rule in self.blocked_list:
if (rule['action'] != 'deny' or
rule['type'] != 'jid' or
rule['value'] not in to_unblock):
new_blocked_list.append(rule)
if not new_blocked_list:
self.blocked_list = []
self.blocked_contacts = []
self.blocked_groups = []
self.set_default_list(None)
self.del_privacy_list(self.default_list)
else:
self.set_privacy_list(self.default_list, new_blocked_list)
if not app.interface.roster.regroup:
show = app.SHOW_LIST[self._con.connected]
else: # accounts merged
show = helpers.get_global_show()
if show == 'invisible':
return
for contact in contact_list:
self._con.send_custom_status(show, self._con.status, contact.jid)
self._presence_probe(contact.jid)
def block_group(self, group, contact_list, message):
if not self.supported:
return
if group in self.blocked_groups:
return
self.blocked_groups.append(group)
self._log.info('Block group: %s', group)
if self.default_list is None:
self.default_list = 'block'
for contact in contact_list:
self._con.send_custom_status('offline', message, contact.jid)
max_order = self._get_max_blocked_list_order()
new_rule = {'order': str(max_order + 1),
'type': 'group',
'action': 'deny',
'value': group}
self.blocked_list.append(new_rule)
self.set_privacy_list(self.default_list, self.blocked_list)
if len(self.blocked_list) == 1:
self.set_default_list(self.default_list)
def unblock_group(self, group, contact_list):
if not self.supported:
return
if group not in self.blocked_groups:
return
self.blocked_groups.remove(group)
self._log.info('Unblock group: %s', group)
new_blocked_list = []
for rule in self.blocked_list:
if (rule['action'] != 'deny' or
rule['type'] != 'group' or
rule['value'] != group):
new_blocked_list.append(rule)
if not new_blocked_list:
self.blocked_list = []
self.blocked_contacts = []
self.blocked_groups = []
self.set_default_list('')
self.del_privacy_list(self.default_list)
else:
self.set_privacy_list(self.default_list, new_blocked_list)
if not app.interface.roster.regroup:
show = app.SHOW_LIST[self._con.connected]
else: # accounts merged
show = helpers.get_global_show()
if show == 'invisible':
return
for contact in contact_list:
self._con.send_custom_status(show, self._con.status, contact.jid)
def _presence_probe(self, jid):
self._log.info('Presence probe: %s', jid)
# Send a presence Probe to get the current Status
probe = nbxmpp.Presence(jid, 'probe', frm=self._con.get_own_jid())
self._con.connection.send(probe)
class PrivacyListsReceivedEvent(NetworkIncomingEvent):
name = 'privacy-lists-received'
class PrivacyListReceivedEvent(NetworkIncomingEvent):
name = 'privacy-list-received'
class PrivacyListRemovedEvent(NetworkIncomingEvent):
name = 'privacy-list-removed'
def get_instance(*args, **kwargs):
return PrivacyLists(*args, **kwargs), 'PrivacyLists'