1488 lines
50 KiB
Python
1488 lines
50 KiB
Python
# Copyright (C) 2003-2014 Yann Leboulanger <asterix AT lagaule.org>
|
|
# Copyright (C) 2005-2006 Dimitur Kirov <dkirov AT gmail.com>
|
|
# Nikos Kouremenos <kourem AT gmail.com>
|
|
# Copyright (C) 2006 Alex Mauer <hawke AT hawkesnest.net>
|
|
# Copyright (C) 2006-2007 Travis Shirk <travis AT pobox.com>
|
|
# Copyright (C) 2006-2008 Jean-Marie Traissard <jim AT lapin.org>
|
|
# Copyright (C) 2007 Lukas Petrovicky <lukas AT petrovicky.net>
|
|
# James Newton <redshodan AT gmail.com>
|
|
# Julien Pivotto <roidelapluie AT gmail.com>
|
|
# Copyright (C) 2007-2008 Stephan Erb <steve-e AT h3c.de>
|
|
# Copyright (C) 2008 Brendan Taylor <whateley AT gmail.com>
|
|
# Jonathan Schleifer <js-gajim AT webkeks.org>
|
|
#
|
|
# This file is part of Gajim.
|
|
#
|
|
# Gajim is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published
|
|
# by the Free Software Foundation; version 3 only.
|
|
#
|
|
# Gajim is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import sys
|
|
import re
|
|
import os
|
|
import subprocess
|
|
import urllib
|
|
import webbrowser
|
|
import errno
|
|
import select
|
|
import base64
|
|
import hashlib
|
|
import shlex
|
|
import socket
|
|
import time
|
|
import logging
|
|
import json
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from distutils.version import LooseVersion as V
|
|
from encodings.punycode import punycode_encode
|
|
from string import Template
|
|
|
|
import nbxmpp
|
|
from nbxmpp.stringprepare import nameprep
|
|
import precis_i18n.codec # pylint: disable=unused-import
|
|
|
|
from gajim.common import caps_cache
|
|
from gajim.common import configpaths
|
|
from gajim.common.i18n import Q_
|
|
from gajim.common.i18n import _
|
|
from gajim.common.i18n import ngettext
|
|
from gajim.common.caps_cache import muc_caps_cache
|
|
|
|
log = logging.getLogger('gajim.c.helpers')
|
|
|
|
special_groups = (_('Transports'), _('Not in Roster'), _('Observers'), _('Groupchats'))
|
|
|
|
|
|
class InvalidFormat(Exception):
|
|
pass
|
|
|
|
|
|
def decompose_jid(jidstring):
|
|
user = None
|
|
server = None
|
|
resource = None
|
|
|
|
# Search for delimiters
|
|
user_sep = jidstring.find('@')
|
|
res_sep = jidstring.find('/')
|
|
|
|
if user_sep == -1:
|
|
if res_sep == -1:
|
|
# host
|
|
server = jidstring
|
|
else:
|
|
# host/resource
|
|
server = jidstring[0:res_sep]
|
|
resource = jidstring[res_sep + 1:]
|
|
else:
|
|
if res_sep == -1:
|
|
# user@host
|
|
user = jidstring[0:user_sep]
|
|
server = jidstring[user_sep + 1:]
|
|
else:
|
|
if user_sep < res_sep:
|
|
# user@host/resource
|
|
user = jidstring[0:user_sep]
|
|
server = jidstring[user_sep + 1:user_sep + (res_sep - user_sep)]
|
|
resource = jidstring[res_sep + 1:]
|
|
else:
|
|
# server/resource (with an @ in resource)
|
|
server = jidstring[0:res_sep]
|
|
resource = jidstring[res_sep + 1:]
|
|
return user, server, resource
|
|
|
|
def parse_jid(jidstring):
|
|
"""
|
|
Perform stringprep on all JID fragments from a string and return the full
|
|
jid
|
|
"""
|
|
# This function comes from http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py
|
|
|
|
return prep(*decompose_jid(jidstring))
|
|
|
|
def idn_to_ascii(host):
|
|
"""
|
|
Convert IDN (Internationalized Domain Names) to ACE (ASCII-compatible
|
|
encoding)
|
|
"""
|
|
from encodings import idna
|
|
labels = idna.dots.split(host)
|
|
converted_labels = []
|
|
for label in labels:
|
|
if label:
|
|
converted_labels.append(idna.ToASCII(label).decode('utf-8'))
|
|
else:
|
|
converted_labels.append('')
|
|
return ".".join(converted_labels)
|
|
|
|
def ascii_to_idn(host):
|
|
"""
|
|
Convert ACE (ASCII-compatible encoding) to IDN (Internationalized Domain
|
|
Names)
|
|
"""
|
|
from encodings import idna
|
|
labels = idna.dots.split(host)
|
|
converted_labels = []
|
|
for label in labels:
|
|
converted_labels.append(idna.ToUnicode(label))
|
|
return ".".join(converted_labels)
|
|
|
|
def puny_encode_url(url):
|
|
_url = url
|
|
if '//' not in _url:
|
|
_url = '//' + _url
|
|
try:
|
|
o = urllib.parse.urlparse(_url)
|
|
p_loc = idn_to_ascii(o.netloc)
|
|
except Exception:
|
|
log.debug('urlparse failed: %s', url)
|
|
return False
|
|
return url.replace(o.netloc, p_loc)
|
|
|
|
def parse_resource(resource):
|
|
"""
|
|
Perform stringprep on resource and return it
|
|
"""
|
|
if resource:
|
|
try:
|
|
return resource.encode('Nickname').decode('utf-8')
|
|
except UnicodeError:
|
|
raise InvalidFormat('Invalid character in resource.')
|
|
|
|
def prep(user, server, resource):
|
|
"""
|
|
Perform stringprep on all JID fragments and return the full jid
|
|
"""
|
|
# This function comes from
|
|
#http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py
|
|
|
|
ip_address = False
|
|
|
|
try:
|
|
socket.inet_aton(server)
|
|
ip_address = True
|
|
except socket.error:
|
|
pass
|
|
|
|
if not ip_address and hasattr(socket, 'inet_pton'):
|
|
try:
|
|
socket.inet_pton(socket.AF_INET6, server.strip('[]'))
|
|
server = '[%s]' % server.strip('[]')
|
|
ip_address = True
|
|
except (socket.error, ValueError):
|
|
pass
|
|
|
|
if not ip_address:
|
|
if server is not None:
|
|
if server.endswith('.'): # RFC7622, 3.2
|
|
server = server[:-1]
|
|
if not server or len(server.encode('utf-8')) > 1023:
|
|
raise InvalidFormat(_('Server must be between 1 and 1023 bytes'))
|
|
try:
|
|
server = nameprep.prepare(server)
|
|
except UnicodeError:
|
|
raise InvalidFormat(_('Invalid character in hostname.'))
|
|
else:
|
|
raise InvalidFormat(_('Server address required.'))
|
|
|
|
if user is not None:
|
|
if not user or len(user.encode('utf-8')) > 1023:
|
|
raise InvalidFormat(_('Username must be between 1 and 1023 bytes'))
|
|
try:
|
|
user = user.encode('UsernameCaseMapped').decode('utf-8')
|
|
except UnicodeError:
|
|
raise InvalidFormat(_('Invalid character in username.'))
|
|
else:
|
|
user = None
|
|
|
|
if resource is not None:
|
|
if not resource or len(resource.encode('utf-8')) > 1023:
|
|
raise InvalidFormat(_('Resource must be between 1 and 1023 bytes'))
|
|
try:
|
|
resource = resource.encode('OpaqueString').decode('utf-8')
|
|
except UnicodeError:
|
|
raise InvalidFormat(_('Invalid character in resource.'))
|
|
else:
|
|
resource = None
|
|
|
|
if user:
|
|
if resource:
|
|
return '%s@%s/%s' % (user, server, resource)
|
|
return '%s@%s' % (user, server)
|
|
|
|
if resource:
|
|
return '%s/%s' % (server, resource)
|
|
return server
|
|
|
|
def windowsify(s):
|
|
if os.name == 'nt':
|
|
return s.capitalize()
|
|
return s
|
|
|
|
def temp_failure_retry(func, *args, **kwargs):
|
|
while True:
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except (os.error, IOError, select.error) as ex:
|
|
if ex.errno == errno.EINTR:
|
|
continue
|
|
else:
|
|
raise
|
|
|
|
def get_uf_show(show, use_mnemonic=False):
|
|
"""
|
|
Return a userfriendly string for dnd/xa/chat and make all strings
|
|
translatable
|
|
|
|
If use_mnemonic is True, it adds _ so GUI should call with True for
|
|
accessibility issues
|
|
"""
|
|
if show == 'dnd':
|
|
if use_mnemonic:
|
|
uf_show = _('_Busy')
|
|
else:
|
|
uf_show = _('Busy')
|
|
elif show == 'xa':
|
|
if use_mnemonic:
|
|
uf_show = _('_Not Available')
|
|
else:
|
|
uf_show = _('Not Available')
|
|
elif show == 'chat':
|
|
if use_mnemonic:
|
|
uf_show = _('_Free for Chat')
|
|
else:
|
|
uf_show = _('Free for Chat')
|
|
elif show == 'online':
|
|
if use_mnemonic:
|
|
uf_show = Q_('?user status:_Available')
|
|
else:
|
|
uf_show = Q_('?user status:Available')
|
|
elif show == 'connecting':
|
|
uf_show = _('Connecting')
|
|
elif show == 'away':
|
|
if use_mnemonic:
|
|
uf_show = _('A_way')
|
|
else:
|
|
uf_show = _('Away')
|
|
elif show == 'offline':
|
|
if use_mnemonic:
|
|
uf_show = _('_Offline')
|
|
else:
|
|
uf_show = _('Offline')
|
|
elif show == 'invisible':
|
|
if use_mnemonic:
|
|
uf_show = _('_Invisible')
|
|
else:
|
|
uf_show = _('Invisible')
|
|
elif show == 'not in roster':
|
|
uf_show = _('Not in Roster')
|
|
elif show == 'requested':
|
|
uf_show = Q_('?contact has status:Unknown')
|
|
else:
|
|
uf_show = Q_('?contact has status:Has errors')
|
|
return uf_show
|
|
|
|
def get_css_show_color(show):
|
|
if show in ('online', 'chat', 'invisible'):
|
|
return 'status-online'
|
|
if show in ('offline', 'not in roster', 'requested'):
|
|
return None
|
|
if show in ('xa', 'dnd'):
|
|
return 'status-dnd'
|
|
if show == 'away':
|
|
return 'status-away'
|
|
|
|
def get_uf_sub(sub):
|
|
if sub == 'none':
|
|
uf_sub = Q_('?Subscription we already have:None')
|
|
elif sub == 'to':
|
|
uf_sub = _('To')
|
|
elif sub == 'from':
|
|
uf_sub = _('From')
|
|
elif sub == 'both':
|
|
uf_sub = _('Both')
|
|
else:
|
|
uf_sub = _('Unknown')
|
|
|
|
return uf_sub
|
|
|
|
def get_uf_ask(ask):
|
|
if ask is None:
|
|
uf_ask = Q_('?Ask (for Subscription):None')
|
|
elif ask == 'subscribe':
|
|
uf_ask = _('Subscribe')
|
|
else:
|
|
uf_ask = ask
|
|
|
|
return uf_ask
|
|
|
|
def get_uf_role(role, plural=False):
|
|
''' plural determines if you get Moderators or Moderator'''
|
|
if role == 'none':
|
|
role_name = Q_('?Group Chat Contact Role:None')
|
|
elif role == 'moderator':
|
|
if plural:
|
|
role_name = _('Moderators')
|
|
else:
|
|
role_name = _('Moderator')
|
|
elif role == 'participant':
|
|
if plural:
|
|
role_name = _('Participants')
|
|
else:
|
|
role_name = _('Participant')
|
|
elif role == 'visitor':
|
|
if plural:
|
|
role_name = _('Visitors')
|
|
else:
|
|
role_name = _('Visitor')
|
|
return role_name
|
|
|
|
def get_uf_affiliation(affiliation):
|
|
'''Get a nice and translated affilition for muc'''
|
|
if affiliation == 'none':
|
|
affiliation_name = Q_('?Group Chat Contact Affiliation:None')
|
|
elif affiliation == 'owner':
|
|
affiliation_name = _('Owner')
|
|
elif affiliation == 'admin':
|
|
affiliation_name = _('Administrator')
|
|
elif affiliation == 'member':
|
|
affiliation_name = _('Member')
|
|
else: # Argl ! An unknown affiliation !
|
|
affiliation_name = affiliation.capitalize()
|
|
return affiliation_name
|
|
|
|
def get_sorted_keys(adict):
|
|
keys = sorted(adict.keys())
|
|
return keys
|
|
|
|
def to_one_line(msg):
|
|
msg = msg.replace('\\', '\\\\')
|
|
msg = msg.replace('\n', '\\n')
|
|
# s1 = 'test\ntest\\ntest'
|
|
# s11 = s1.replace('\\', '\\\\')
|
|
# s12 = s11.replace('\n', '\\n')
|
|
# s12
|
|
# 'test\\ntest\\\\ntest'
|
|
return msg
|
|
|
|
def from_one_line(msg):
|
|
# (?<!\\) is a lookbehind assertion which asks anything but '\'
|
|
# to match the regexp that follows it
|
|
|
|
# So here match '\\n' but not if you have a '\' before that
|
|
expr = re.compile(r'(?<!\\)\\n')
|
|
msg = expr.sub('\n', msg)
|
|
msg = msg.replace('\\\\', '\\')
|
|
# s12 = 'test\\ntest\\\\ntest'
|
|
# s13 = re.sub('\n', s12)
|
|
# s14 s13.replace('\\\\', '\\')
|
|
# s14
|
|
# 'test\ntest\\ntest'
|
|
return msg
|
|
|
|
def get_uf_chatstate(chatstate):
|
|
"""
|
|
Remove chatstate jargon and returns user friendly messages
|
|
"""
|
|
if chatstate == 'active':
|
|
return _('is paying attention to the conversation')
|
|
if chatstate == 'inactive':
|
|
return _('is doing something else')
|
|
if chatstate == 'composing':
|
|
return _('is composing a message…')
|
|
if chatstate == 'paused':
|
|
#paused means he or she was composing but has stopped for a while
|
|
return _('paused composing a message')
|
|
if chatstate == 'gone':
|
|
return _('has closed the chat window or tab')
|
|
return ''
|
|
|
|
def find_soundplayer():
|
|
if sys.platform in ('win32', 'darwin'):
|
|
return
|
|
|
|
if app.config.get('soundplayer') != '':
|
|
return
|
|
|
|
commands = ('aucat', 'paplay', 'aplay', 'play', 'ossplay')
|
|
for command in commands:
|
|
if shutil.which(command) is not None:
|
|
if command == 'paplay':
|
|
command += ' -n gajim --property=media.role=event'
|
|
elif command in ('aplay', 'play'):
|
|
command += ' -q'
|
|
elif command == 'ossplay':
|
|
command += ' -qq'
|
|
elif command == 'aucat':
|
|
command += ' -i'
|
|
app.config.set('soundplayer', command)
|
|
break
|
|
|
|
def exec_command(command, use_shell=False, posix=True):
|
|
"""
|
|
execute a command. if use_shell is True, we run the command as is it was
|
|
typed in a console. So it may be dangerous if you are not sure about what
|
|
is executed.
|
|
"""
|
|
if use_shell:
|
|
subprocess.Popen('%s &' % command, shell=True).wait()
|
|
else:
|
|
args = shlex.split(command, posix=posix)
|
|
p = subprocess.Popen(args)
|
|
app.thread_interface(p.wait)
|
|
|
|
def build_command(executable, parameter):
|
|
# we add to the parameter (can hold path with spaces)
|
|
# "" so we have good parsing from shell
|
|
parameter = parameter.replace('"', '\\"') # but first escape "
|
|
command = '%s "%s"' % (executable, parameter)
|
|
return command
|
|
|
|
def get_file_path_from_dnd_dropped_uri(uri):
|
|
path = urllib.parse.unquote(uri) # escape special chars
|
|
path = path.strip('\r\n\x00') # remove \r\n and NULL
|
|
# get the path to file
|
|
if re.match('^file:///[a-zA-Z]:/', path): # windows
|
|
path = path[8:] # 8 is len('file:///')
|
|
elif path.startswith('file://'): # nautilus, rox
|
|
path = path[7:] # 7 is len('file://')
|
|
elif path.startswith('file:'): # xffm
|
|
path = path[5:] # 5 is len('file:')
|
|
return path
|
|
|
|
def get_xmpp_show(show):
|
|
if show in ('online', 'offline'):
|
|
return None
|
|
return show
|
|
|
|
def sanitize_filename(filename):
|
|
"""
|
|
Make sure the filename we will write does contain only acceptable and latin
|
|
characters, and is not too long (in that case hash it)
|
|
"""
|
|
# 48 is the limit
|
|
if len(filename) > 48:
|
|
hash_ = hashlib.md5(filename.encode('utf-8'))
|
|
filename = base64.b64encode(hash_.digest()).decode('utf-8')
|
|
|
|
# make it latin chars only
|
|
filename = punycode_encode(filename).decode('utf-8')
|
|
filename = filename.replace('/', '_')
|
|
if os.name == 'nt':
|
|
filename = filename.replace('?', '_').replace(':', '_')\
|
|
.replace('\\', '_').replace('"', "'").replace('|', '_')\
|
|
.replace('*', '_').replace('<', '_').replace('>', '_')
|
|
|
|
return filename
|
|
|
|
def reduce_chars_newlines(text, max_chars=0, max_lines=0):
|
|
"""
|
|
Cut the chars after 'max_chars' on each line and show only the first
|
|
'max_lines'
|
|
|
|
If any of the params is not present (None or 0) the action on it is not
|
|
performed
|
|
"""
|
|
def _cut_if_long(string):
|
|
if len(string) > max_chars:
|
|
string = string[:max_chars - 3] + '…'
|
|
return string
|
|
|
|
if max_lines == 0:
|
|
lines = text.split('\n')
|
|
else:
|
|
lines = text.split('\n', max_lines)[:max_lines]
|
|
if max_chars > 0:
|
|
if lines:
|
|
lines = [_cut_if_long(e) for e in lines]
|
|
if lines:
|
|
reduced_text = '\n'.join(lines)
|
|
if reduced_text != text:
|
|
reduced_text += '…'
|
|
else:
|
|
reduced_text = ''
|
|
return reduced_text
|
|
|
|
def get_account_status(account):
|
|
status = reduce_chars_newlines(account['status_line'], 100, 1)
|
|
return status
|
|
|
|
def datetime_tuple(timestamp):
|
|
"""
|
|
Convert timestamp using strptime and the format: %Y%m%dT%H:%M:%S
|
|
|
|
Because of various datetime formats are used the following exceptions
|
|
are handled:
|
|
- Optional milliseconds appened to the string are removed
|
|
- Optional Z (that means UTC) appened to the string are removed
|
|
- XEP-082 datetime strings have all '-' cahrs removed to meet
|
|
the above format.
|
|
"""
|
|
date, tim = timestamp.split('T', 1)
|
|
date = date.replace('-', '')
|
|
tim = tim.replace('z', '')
|
|
tim = tim.replace('Z', '')
|
|
zone = None
|
|
if '+' in tim:
|
|
sign = -1
|
|
tim, zone = tim.split('+', 1)
|
|
if '-' in tim:
|
|
sign = 1
|
|
tim, zone = tim.split('-', 1)
|
|
tim = tim.split('.')[0]
|
|
tim = time.strptime(date + 'T' + tim, '%Y%m%dT%H:%M:%S')
|
|
if zone:
|
|
zone = zone.replace(':', '')
|
|
tim = datetime.fromtimestamp(time.mktime(tim))
|
|
if len(zone) > 2:
|
|
zone = time.strptime(zone, '%H%M')
|
|
else:
|
|
zone = time.strptime(zone, '%H')
|
|
zone = timedelta(hours=zone.tm_hour, minutes=zone.tm_min)
|
|
tim += zone * sign
|
|
tim = tim.timetuple()
|
|
return tim
|
|
|
|
|
|
from gajim.common import app
|
|
if app.is_installed('PYCURL'):
|
|
import pycurl
|
|
from io import StringIO
|
|
|
|
def convert_bytes(string):
|
|
suffix = ''
|
|
# IEC standard says KiB = 1024 bytes KB = 1000 bytes
|
|
# but do we use the standard?
|
|
use_kib_mib = app.config.get('use_kib_mib')
|
|
align = 1024.
|
|
bytes_ = float(string)
|
|
if bytes_ >= align:
|
|
bytes_ = round(bytes_/align, 1)
|
|
if bytes_ >= align:
|
|
bytes_ = round(bytes_/align, 1)
|
|
if bytes_ >= align:
|
|
bytes_ = round(bytes_/align, 1)
|
|
if use_kib_mib:
|
|
#GiB means gibibyte
|
|
suffix = _('%s GiB')
|
|
else:
|
|
#GB means gigabyte
|
|
suffix = _('%s GB')
|
|
else:
|
|
if use_kib_mib:
|
|
#MiB means mibibyte
|
|
suffix = _('%s MiB')
|
|
else:
|
|
#MB means megabyte
|
|
suffix = _('%s MB')
|
|
else:
|
|
if use_kib_mib:
|
|
#KiB means kibibyte
|
|
suffix = _('%s KiB')
|
|
else:
|
|
#KB means kilo bytes
|
|
suffix = _('%s KB')
|
|
else:
|
|
#B means bytes
|
|
suffix = _('%s B')
|
|
return suffix % str(bytes_)
|
|
|
|
def get_contact_dict_for_account(account):
|
|
"""
|
|
Create a dict of jid, nick -> contact with all contacts of account.
|
|
|
|
Can be used for completion lists
|
|
"""
|
|
contacts_dict = {}
|
|
for jid in app.contacts.get_jid_list(account):
|
|
contact = app.contacts.get_contact_with_highest_priority(account,
|
|
jid)
|
|
contacts_dict[jid] = contact
|
|
name = contact.name
|
|
if name in contacts_dict:
|
|
contact1 = contacts_dict[name]
|
|
del contacts_dict[name]
|
|
contacts_dict['%s (%s)' % (name, contact1.jid)] = contact1
|
|
contacts_dict['%s (%s)' % (name, jid)] = contact
|
|
elif contact.name:
|
|
if contact.name == app.get_nick_from_jid(jid):
|
|
del contacts_dict[jid]
|
|
contacts_dict[name] = contact
|
|
return contacts_dict
|
|
|
|
def launch_browser_mailer(kind, uri):
|
|
# kind = 'url' or 'mail'
|
|
if kind == 'url' and uri.startswith('file://'):
|
|
launch_file_manager(uri)
|
|
return
|
|
if kind in ('mail', 'sth_at_sth') and not uri.startswith('mailto:'):
|
|
uri = 'mailto:' + uri
|
|
|
|
if kind == 'url' and uri.startswith('www.'):
|
|
uri = 'http://' + uri
|
|
|
|
if not app.config.get('autodetect_browser_mailer'):
|
|
if kind == 'url':
|
|
command = app.config.get('custombrowser')
|
|
elif kind in ('mail', 'sth_at_sth'):
|
|
command = app.config.get('custommailapp')
|
|
if command == '': # if no app is configured
|
|
return
|
|
|
|
command = build_command(command, uri)
|
|
try:
|
|
exec_command(command)
|
|
except Exception:
|
|
pass
|
|
|
|
else:
|
|
webbrowser.open(uri)
|
|
|
|
|
|
def launch_file_manager(path_to_open):
|
|
if os.name == 'nt':
|
|
try:
|
|
os.startfile(path_to_open) # if pywin32 is installed we open
|
|
except Exception:
|
|
pass
|
|
else:
|
|
if not app.config.get('autodetect_browser_mailer'):
|
|
command = app.config.get('custom_file_manager')
|
|
if command == '': # if no app is configured
|
|
return
|
|
else:
|
|
command = 'xdg-open'
|
|
command = build_command(command, path_to_open)
|
|
try:
|
|
exec_command(command)
|
|
except Exception:
|
|
pass
|
|
|
|
def play_sound(event):
|
|
if not app.config.get('sounds_on'):
|
|
return
|
|
path_to_soundfile = app.config.get_per('soundevents', event, 'path')
|
|
play_sound_file(path_to_soundfile)
|
|
|
|
def check_soundfile_path(file_, dirs=None):
|
|
"""
|
|
Check if the sound file exists
|
|
|
|
:param file_: the file to check, absolute or relative to 'dirs' path
|
|
:param dirs: list of knows paths to fallback if the file doesn't exists
|
|
(eg: ~/.gajim/sounds/, DATADIR/sounds...).
|
|
:return the path to file or None if it doesn't exists.
|
|
"""
|
|
if dirs is None:
|
|
dirs = [configpaths.get('MY_DATA'),
|
|
configpaths.get('DATA')]
|
|
|
|
if not file_:
|
|
return None
|
|
if os.path.exists(file_):
|
|
return file_
|
|
|
|
for d in dirs:
|
|
d = os.path.join(d, 'sounds', file_)
|
|
if os.path.exists(d):
|
|
return d
|
|
return None
|
|
|
|
def strip_soundfile_path(file_, dirs=None, abs_=True):
|
|
"""
|
|
Remove knowns paths from a sound file
|
|
|
|
Filechooser returns absolute path. If path is a known fallback path, we remove it.
|
|
So config have no hardcoded path to DATA_DIR and text in textfield is shorther.
|
|
param: file_: the filename to strip.
|
|
param: dirs: list of knowns paths from which the filename should be stripped.
|
|
param: abs_: force absolute path on dirs
|
|
"""
|
|
if not file_:
|
|
return None
|
|
|
|
if dirs is None:
|
|
dirs = [configpaths.get('MY_DATA'),
|
|
configpaths.get('DATA')]
|
|
|
|
name = os.path.basename(file_)
|
|
for d in dirs:
|
|
d = os.path.join(d, 'sounds', name)
|
|
if abs_:
|
|
d = os.path.abspath(d)
|
|
if file_ == d:
|
|
return name
|
|
return file_
|
|
|
|
def play_sound_file(path_to_soundfile):
|
|
path_to_soundfile = check_soundfile_path(path_to_soundfile)
|
|
if path_to_soundfile is None:
|
|
return
|
|
|
|
if sys.platform == 'win32':
|
|
import winsound
|
|
try:
|
|
winsound.PlaySound(path_to_soundfile,
|
|
winsound.SND_FILENAME|winsound.SND_ASYNC)
|
|
except Exception:
|
|
log.exception('Sound Playback Error')
|
|
|
|
elif sys.platform == 'darwin':
|
|
try:
|
|
from AppKit import NSSound
|
|
except ImportError:
|
|
log.exception('Sound Playback Error')
|
|
return
|
|
|
|
sound = NSSound.alloc()
|
|
sound.initWithContentsOfFile_byReference_(path_to_soundfile, True)
|
|
sound.play()
|
|
|
|
elif app.config.get('soundplayer') == '':
|
|
try:
|
|
import wave
|
|
import ossaudiodev
|
|
except Exception:
|
|
log.exception('Sound Playback Error')
|
|
return
|
|
|
|
def _oss_play():
|
|
sndfile = wave.open(path_to_soundfile, 'rb')
|
|
nc, sw, fr, nf, _comptype, _compname = sndfile.getparams()
|
|
dev = ossaudiodev.open('/dev/dsp', 'w')
|
|
dev.setparameters(sw * 8, nc, fr)
|
|
dev.write(sndfile.readframes(nf))
|
|
sndfile.close()
|
|
dev.close()
|
|
app.thread_interface(_oss_play)
|
|
|
|
else:
|
|
player = app.config.get('soundplayer')
|
|
command = build_command(player, path_to_soundfile)
|
|
exec_command(command)
|
|
|
|
def get_global_show():
|
|
maxi = 0
|
|
for account in app.connections:
|
|
if not app.config.get_per('accounts', account,
|
|
'sync_with_global_status'):
|
|
continue
|
|
connected = app.connections[account].connected
|
|
if connected > maxi:
|
|
maxi = connected
|
|
return app.SHOW_LIST[maxi]
|
|
|
|
def get_global_status():
|
|
maxi = 0
|
|
for account in app.connections:
|
|
if not app.config.get_per('accounts', account,
|
|
'sync_with_global_status'):
|
|
continue
|
|
connected = app.connections[account].connected
|
|
if connected > maxi:
|
|
maxi = connected
|
|
status = app.connections[account].status
|
|
return status
|
|
|
|
|
|
def statuses_unified():
|
|
"""
|
|
Test if all statuses are the same
|
|
"""
|
|
reference = None
|
|
for account in app.connections:
|
|
if not app.config.get_per('accounts', account,
|
|
'sync_with_global_status'):
|
|
continue
|
|
if reference is None:
|
|
reference = app.connections[account].connected
|
|
elif reference != app.connections[account].connected:
|
|
return False
|
|
return True
|
|
|
|
def get_icon_name_to_show(contact, account=None):
|
|
"""
|
|
Get the icon name to show in online, away, requested, etc
|
|
"""
|
|
if account and app.events.get_nb_roster_events(account, contact.jid):
|
|
return 'event'
|
|
if account and app.events.get_nb_roster_events(
|
|
account, contact.get_full_jid()):
|
|
return 'event'
|
|
if account and account in app.interface.minimized_controls and \
|
|
contact.jid in app.interface.minimized_controls[account] and app.interface.\
|
|
minimized_controls[account][contact.jid].get_nb_unread_pm() > 0:
|
|
return 'event'
|
|
if account and contact.jid in app.gc_connected[account]:
|
|
if app.gc_connected[account][contact.jid]:
|
|
return 'muc_active'
|
|
return 'muc_inactive'
|
|
if contact.jid.find('@') <= 0: # if not '@' or '@' starts the jid ==> agent
|
|
return contact.show
|
|
if contact.sub in ('both', 'to'):
|
|
return contact.show
|
|
if contact.ask == 'subscribe':
|
|
return 'requested'
|
|
transport = app.get_transport_name_from_jid(contact.jid)
|
|
if transport:
|
|
return contact.show
|
|
if contact.show in app.SHOW_LIST:
|
|
return contact.show
|
|
return 'not in roster'
|
|
|
|
def get_full_jid_from_iq(iq_obj):
|
|
"""
|
|
Return the full jid (with resource) from an iq
|
|
"""
|
|
jid = iq_obj.getFrom()
|
|
if jid is None:
|
|
return None
|
|
return parse_jid(str(iq_obj.getFrom()))
|
|
|
|
def get_jid_from_iq(iq_obj):
|
|
"""
|
|
Return the jid (without resource) from an iq
|
|
"""
|
|
jid = get_full_jid_from_iq(iq_obj)
|
|
return app.get_jid_without_resource(jid)
|
|
|
|
def get_auth_sha(sid, initiator, target):
|
|
"""
|
|
Return sha of sid + initiator + target used for proxy auth
|
|
"""
|
|
return hashlib.sha1(("%s%s%s" % (sid, initiator, target)).encode('utf-8')).\
|
|
hexdigest()
|
|
|
|
def remove_invalid_xml_chars(string):
|
|
if string:
|
|
string = re.sub(app.interface.invalid_XML_chars_re, '', string)
|
|
return string
|
|
|
|
def get_random_string_16():
|
|
"""
|
|
Create random string of length 16
|
|
"""
|
|
rng = list(range(65, 90))
|
|
rng.extend(range(48, 57))
|
|
char_sequence = [chr(e) for e in rng]
|
|
from random import sample
|
|
return ''.join(sample(char_sequence, 16))
|
|
|
|
def get_os_info():
|
|
if app.os_info:
|
|
return app.os_info
|
|
app.os_info = 'N/A'
|
|
if os.name == 'nt' or sys.platform == 'darwin':
|
|
import platform
|
|
app.os_info = platform.system() + " " + platform.release()
|
|
elif os.name == 'posix':
|
|
try:
|
|
import distro
|
|
app.os_info = distro.name(pretty=True)
|
|
except ImportError:
|
|
import platform
|
|
app.os_info = platform.system()
|
|
return app.os_info
|
|
|
|
def allow_showing_notification(account, type_='notify_on_new_message',
|
|
is_first_message=True):
|
|
"""
|
|
Is it allowed to show nofication?
|
|
|
|
Check OUR status and if we allow notifications for that status type is the
|
|
option that need to be True e.g.: notify_on_signing is_first_message: set it
|
|
to false when it's not the first message
|
|
"""
|
|
if type_ and (not app.config.get(type_) or not is_first_message):
|
|
return False
|
|
if app.config.get('autopopupaway'): # always show notification
|
|
return True
|
|
if app.connections[account].connected in (2, 3): # we're online or chat
|
|
return True
|
|
return False
|
|
|
|
def allow_popup_window(account):
|
|
"""
|
|
Is it allowed to popup windows?
|
|
"""
|
|
autopopup = app.config.get('autopopup')
|
|
autopopupaway = app.config.get('autopopupaway')
|
|
if autopopup and (autopopupaway or \
|
|
app.connections[account].connected in (2, 3)): # we're online or chat
|
|
return True
|
|
return False
|
|
|
|
def allow_sound_notification(account, sound_event):
|
|
if app.config.get('sounddnd') or app.connections[account].connected != \
|
|
app.SHOW_LIST.index('dnd') and app.config.get_per('soundevents',
|
|
sound_event, 'enabled'):
|
|
return True
|
|
return False
|
|
|
|
def get_chat_control(account, contact):
|
|
full_jid_with_resource = contact.jid
|
|
if contact.resource:
|
|
full_jid_with_resource += '/' + contact.resource
|
|
highest_contact = app.contacts.get_contact_with_highest_priority(
|
|
account, contact.jid)
|
|
|
|
# Look for a chat control that has the given resource, or default to
|
|
# one without resource
|
|
ctrl = app.interface.msg_win_mgr.get_control(full_jid_with_resource,
|
|
account)
|
|
|
|
if ctrl:
|
|
return ctrl
|
|
|
|
if (highest_contact and
|
|
highest_contact.resource and
|
|
contact.resource != highest_contact.resource):
|
|
return None
|
|
|
|
# unknown contact or offline message
|
|
return app.interface.msg_win_mgr.get_control(contact.jid, account)
|
|
|
|
def get_notification_icon_tooltip_dict():
|
|
"""
|
|
Return a dict of the form {acct: {'show': show, 'message': message,
|
|
'event_lines': [list of text lines to show in tooltip]}
|
|
"""
|
|
# How many events must there be before they're shown summarized, not per-user
|
|
max_ungrouped_events = 10
|
|
|
|
accounts = get_accounts_info()
|
|
|
|
# Gather events. (With accounts, when there are more.)
|
|
for account in accounts:
|
|
account_name = account['name']
|
|
account['event_lines'] = []
|
|
# Gather events per-account
|
|
pending_events = app.events.get_events(account=account_name)
|
|
messages, non_messages, total_messages, total_non_messages = {}, {}, 0, 0
|
|
for jid in pending_events:
|
|
for event in pending_events[jid]:
|
|
if event.type_.count('file') > 0:
|
|
# This is a non-messagee event.
|
|
messages[jid] = non_messages.get(jid, 0) + 1
|
|
total_non_messages = total_non_messages + 1
|
|
else:
|
|
# This is a message.
|
|
messages[jid] = messages.get(jid, 0) + 1
|
|
total_messages = total_messages + 1
|
|
# Display unread messages numbers, if any
|
|
if total_messages > 0:
|
|
if total_messages > max_ungrouped_events:
|
|
text = ngettext(
|
|
'%d message pending',
|
|
'%d messages pending',
|
|
total_messages, total_messages, total_messages)
|
|
account['event_lines'].append(text)
|
|
else:
|
|
for jid in messages:
|
|
text = ngettext(
|
|
'%d message pending',
|
|
'%d messages pending',
|
|
messages[jid], messages[jid], messages[jid])
|
|
contact = app.contacts.get_first_contact_from_jid(
|
|
account['name'], jid)
|
|
text += ' '
|
|
if jid in app.gc_connected[account['name']]:
|
|
text += _('from room %s') % (jid)
|
|
elif contact:
|
|
name = contact.get_shown_name()
|
|
text += _('from user %s') % (name)
|
|
else:
|
|
text += _('from %s') % (jid)
|
|
account['event_lines'].append(text)
|
|
|
|
# Display unseen events numbers, if any
|
|
if total_non_messages > 0:
|
|
if total_non_messages > max_ungrouped_events:
|
|
text = ngettext(
|
|
'%d event pending',
|
|
'%d events pending',
|
|
total_non_messages, total_non_messages, total_non_messages)
|
|
account['event_lines'].append(text)
|
|
else:
|
|
for jid in non_messages:
|
|
text = ngettext('%d event pending', '%d events pending',
|
|
non_messages[jid], non_messages[jid], non_messages[jid])
|
|
text += ' ' + _('from user %s') % (jid)
|
|
account[account]['event_lines'].append(text)
|
|
|
|
return accounts
|
|
|
|
def get_notification_icon_tooltip_text():
|
|
text = None
|
|
# How many events must there be before they're shown summarized, not per-user
|
|
# max_ungrouped_events = 10
|
|
# Character which should be used to indent in the tooltip.
|
|
indent_with = ' '
|
|
|
|
accounts = get_notification_icon_tooltip_dict()
|
|
|
|
if not accounts:
|
|
# No configured account
|
|
return _('Gajim')
|
|
|
|
# at least one account present
|
|
|
|
# Is there more that one account?
|
|
if len(accounts) == 1:
|
|
show_more_accounts = False
|
|
else:
|
|
show_more_accounts = True
|
|
|
|
# If there is only one account, its status is shown on the first line.
|
|
if show_more_accounts:
|
|
text = _('Gajim')
|
|
else:
|
|
text = _('Gajim - %s') % (get_account_status(accounts[0]))
|
|
|
|
# Gather and display events. (With accounts, when there are more.)
|
|
for account in accounts:
|
|
account_name = account['name']
|
|
# Set account status, if not set above
|
|
if show_more_accounts:
|
|
message = '\n' + indent_with + ' %s - %s'
|
|
text += message % (account_name, get_account_status(account))
|
|
# Account list shown, messages need to be indented more
|
|
indent_how = 2
|
|
else:
|
|
# If no account list is shown, messages could have default indenting.
|
|
indent_how = 1
|
|
for line in account['event_lines']:
|
|
text += '\n' + indent_with * indent_how + ' '
|
|
text += line
|
|
return text
|
|
|
|
def get_accounts_info():
|
|
"""
|
|
Helper for notification icon tooltip
|
|
"""
|
|
accounts = []
|
|
accounts_list = sorted(app.contacts.get_accounts())
|
|
for account in accounts_list:
|
|
status_idx = app.connections[account].connected
|
|
# uncomment the following to hide offline accounts
|
|
# if status_idx == 0: continue
|
|
status = app.SHOW_LIST[status_idx]
|
|
message = app.connections[account].status
|
|
single_line = get_uf_show(status)
|
|
if message is None:
|
|
message = ''
|
|
else:
|
|
message = message.strip()
|
|
if message != '':
|
|
single_line += ': ' + message
|
|
account_label = app.get_account_label(account)
|
|
accounts.append({'name': account,
|
|
'account_label': account_label,
|
|
'status_line': single_line,
|
|
'show': status,
|
|
'message': message})
|
|
return accounts
|
|
|
|
def get_current_show(account):
|
|
if account not in app.connections:
|
|
return 'offline'
|
|
status = app.connections[account].connected
|
|
return app.SHOW_LIST[status]
|
|
|
|
def get_iconset_path(iconset):
|
|
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'iconsets', iconset)):
|
|
return os.path.join(configpaths.get('DATA'), 'iconsets', iconset)
|
|
if os.path.isdir(os.path.join(configpaths.get('MY_ICONSETS'), iconset)):
|
|
return os.path.join(configpaths.get('MY_ICONSETS'), iconset)
|
|
|
|
def get_mood_iconset_path(iconset):
|
|
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'moods', iconset)):
|
|
return os.path.join(configpaths.get('DATA'), 'moods', iconset)
|
|
if os.path.isdir(
|
|
os.path.join(configpaths.get('MY_MOOD_ICONSETS'), iconset)):
|
|
return os.path.join(configpaths.get('MY_MOOD_ICONSETS'), iconset)
|
|
|
|
def get_activity_iconset_path(iconset):
|
|
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'activities', iconset)):
|
|
return os.path.join(configpaths.get('DATA'), 'activities', iconset)
|
|
if os.path.isdir(os.path.join(configpaths.get('MY_ACTIVITY_ICONSETS'),
|
|
iconset)):
|
|
return os.path.join(configpaths.get('MY_ACTIVITY_ICONSETS'), iconset)
|
|
|
|
def get_transport_path(transport):
|
|
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'iconsets', 'transports',
|
|
transport)):
|
|
return os.path.join(configpaths.get('DATA'), 'iconsets', 'transports', transport)
|
|
if os.path.isdir(os.path.join(configpaths.get('MY_ICONSETS'), 'transports',
|
|
transport)):
|
|
return os.path.join(configpaths.get('MY_ICONSETS'), 'transports', transport)
|
|
# No transport folder found, use default jabber one
|
|
return get_iconset_path(app.config.get('iconset'))
|
|
|
|
def prepare_and_validate_gpg_keyID(account, jid, keyID):
|
|
"""
|
|
Return an eight char long keyID that can be used with for GPG encryption
|
|
with this contact
|
|
|
|
If the given keyID is None, return UNKNOWN; if the key does not match the
|
|
assigned key XXXXXXXXMISMATCH is returned. If the key is trusted and not yet
|
|
assigned, assign it.
|
|
"""
|
|
if app.connections[account].USE_GPG:
|
|
if keyID and len(keyID) == 16:
|
|
keyID = keyID[8:]
|
|
|
|
attached_keys = app.config.get_per('accounts', account,
|
|
'attached_gpg_keys').split()
|
|
|
|
if jid in attached_keys and keyID:
|
|
attachedkeyID = attached_keys[attached_keys.index(jid) + 1]
|
|
if attachedkeyID != keyID:
|
|
# Get signing subkeys for the attached key
|
|
subkeys = []
|
|
for key in app.connections[account].gpg.list_keys():
|
|
if key['keyid'][8:] == attachedkeyID:
|
|
subkeys = [subkey[0][8:] for subkey in key['subkeys'] \
|
|
if subkey[1] == 's']
|
|
break
|
|
|
|
if keyID not in subkeys:
|
|
# Mismatch! Another gpg key was expected
|
|
keyID += 'MISMATCH'
|
|
elif jid in attached_keys:
|
|
# An unsigned presence, just use the assigned key
|
|
keyID = attached_keys[attached_keys.index(jid) + 1]
|
|
elif keyID:
|
|
full_key = app.connections[account].ask_gpg_keys(keyID=keyID)
|
|
# Assign the corresponding key, if we have it in our keyring
|
|
if full_key:
|
|
for u in app.contacts.get_contacts(account, jid):
|
|
u.keyID = keyID
|
|
keys_str = app.config.get_per('accounts', account,
|
|
'attached_gpg_keys')
|
|
keys_str += jid + ' ' + keyID + ' '
|
|
app.config.set_per('accounts', account, 'attached_gpg_keys',
|
|
keys_str)
|
|
elif keyID is None:
|
|
keyID = 'UNKNOWN'
|
|
return keyID
|
|
|
|
def update_optional_features(account=None):
|
|
if account:
|
|
accounts = [account]
|
|
else:
|
|
accounts = [a for a in app.connections]
|
|
for a in accounts:
|
|
app.gajim_optional_features[a] = []
|
|
if app.config.get_per('accounts', a, 'subscribe_mood'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_MOOD + '+notify')
|
|
if app.config.get_per('accounts', a, 'subscribe_activity'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_ACTIVITY + \
|
|
'+notify')
|
|
if app.config.get_per('accounts', a, 'publish_tune'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_TUNE)
|
|
if app.config.get_per('accounts', a, 'publish_location'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_LOCATION)
|
|
if app.config.get_per('accounts', a, 'subscribe_tune'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_TUNE + '+notify')
|
|
if app.config.get_per('accounts', a, 'subscribe_nick'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_NICK + '+notify')
|
|
if app.config.get_per('accounts', a, 'subscribe_location'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_LOCATION + \
|
|
'+notify')
|
|
if app.config.get('outgoing_chat_state_notifactions') != 'disabled':
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_CHATSTATES)
|
|
if not app.config.get('ignore_incoming_xhtml'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_XHTML_IM)
|
|
if app.config.get_per('accounts', a, 'answer_receipts'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_RECEIPTS)
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE)
|
|
if app.is_installed('FARSTREAM'):
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_RTP)
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_RTP_AUDIO)
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_RTP_VIDEO)
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_ICE_UDP)
|
|
app.gajim_optional_features[a].append(
|
|
nbxmpp.NS_JINGLE_FILE_TRANSFER_5)
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_XTLS)
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_BYTESTREAM)
|
|
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_IBB)
|
|
|
|
# Give plugins the possibility to add their features
|
|
app.plugin_manager.extension_point('update_caps', a)
|
|
|
|
app.caps_hash[a] = caps_cache.compute_caps_hash([app.gajim_identity],
|
|
app.gajim_common_features + app.gajim_optional_features[a])
|
|
# re-send presence with new hash
|
|
connected = app.connections[a].connected
|
|
if connected > 1 and app.SHOW_LIST[connected] != 'invisible':
|
|
app.connections[a].change_status(app.SHOW_LIST[connected],
|
|
app.connections[a].status)
|
|
|
|
def jid_is_blocked(account, jid):
|
|
con = app.connections[account]
|
|
return (jid in con.get_module('Blocking').blocked or
|
|
jid in con.get_module('PrivacyLists').blocked_contacts or
|
|
con.get_module('PrivacyLists').blocked_all)
|
|
|
|
def group_is_blocked(account, group):
|
|
con = app.connections[account]
|
|
return (group in con.get_module('PrivacyLists').blocked_groups or
|
|
con.get_module('PrivacyLists').blocked_all)
|
|
|
|
def get_subscription_request_msg(account=None):
|
|
s = app.config.get_per('accounts', account, 'subscription_request_msg')
|
|
if s:
|
|
return s
|
|
s = _('I would like to add you to my contact list.')
|
|
if account:
|
|
s = _('Hello, I am $name.') + ' ' + s
|
|
name = app.connections[account].get_module('VCardTemp').get_vard_name()
|
|
nick = app.nicks[account]
|
|
if name and nick:
|
|
name += ' (%s)' % nick
|
|
elif nick:
|
|
name = nick
|
|
s = Template(s).safe_substitute({'name': name})
|
|
return s
|
|
|
|
def replace_dataform_media(form, stanza):
|
|
found = False
|
|
for field in form.getTags('field'):
|
|
for media in field.getTags('media'):
|
|
for uri in media.getTags('uri'):
|
|
uri_data = uri.getData()
|
|
if uri_data.startswith('cid:'):
|
|
uri_data = uri_data[4:]
|
|
for data in stanza.getTags('data', namespace=nbxmpp.NS_BOB):
|
|
if data.getAttr('cid') == uri_data:
|
|
uri.setData(data.getData())
|
|
found = True
|
|
return found
|
|
|
|
def get_proxy_info(account):
|
|
p = app.config.get_per('accounts', account, 'proxy')
|
|
if not p:
|
|
if app.config.get_per('accounts', account, 'use_env_http_proxy'):
|
|
try:
|
|
try:
|
|
env_http_proxy = os.environ['HTTP_PROXY']
|
|
except Exception:
|
|
env_http_proxy = os.environ['http_proxy']
|
|
env_http_proxy = env_http_proxy.strip('"')
|
|
# Dispose of the http:// prefix
|
|
env_http_proxy = env_http_proxy.split('://')[-1]
|
|
env_http_proxy = env_http_proxy.split('@')
|
|
|
|
if len(env_http_proxy) == 2:
|
|
login = env_http_proxy[0].split(':')
|
|
addr = env_http_proxy[1].split(':')
|
|
else:
|
|
login = ['', '']
|
|
addr = env_http_proxy[0].split(':')
|
|
|
|
proxy = {'host': addr[0], 'type' : 'http', 'user':login[0]}
|
|
|
|
if len(addr) == 2:
|
|
proxy['port'] = addr[1]
|
|
else:
|
|
proxy['port'] = 3128
|
|
|
|
if len(login) == 2:
|
|
proxy['pass'] = login[1]
|
|
proxy['useauth'] = True
|
|
else:
|
|
proxy['pass'] = ''
|
|
return proxy
|
|
|
|
except Exception:
|
|
proxy = None
|
|
p = app.config.get('global_proxy')
|
|
if p and p in app.config.get_per('proxies'):
|
|
proxy = {}
|
|
proxyptr = app.config.get_per('proxies', p)
|
|
if not proxyptr:
|
|
return proxy
|
|
for key in proxyptr.keys():
|
|
proxy[key] = proxyptr[key]
|
|
return proxy
|
|
|
|
def _get_img_direct(attrs):
|
|
"""
|
|
Download an image. This function should be launched in a separated thread.
|
|
"""
|
|
mem = b''
|
|
alt = ''
|
|
max_size = 2*1024*1024
|
|
if 'max_size' in attrs:
|
|
max_size = attrs['max_size']
|
|
# Wait maximum 10s for connection
|
|
socket.setdefaulttimeout(10)
|
|
try:
|
|
req = urllib.request.Request(attrs['src'])
|
|
req.add_header('User-Agent', 'Gajim ' + app.version)
|
|
f = urllib.request.urlopen(req)
|
|
except Exception as ex:
|
|
log.debug('Error loading image %s ', attrs['src'] + str(ex))
|
|
alt = attrs.get('alt', 'Broken image')
|
|
else:
|
|
# Wait 2s between each byte
|
|
try:
|
|
f.fp._sock.fp._sock.settimeout(2)
|
|
except Exception:
|
|
pass
|
|
# On a slow internet connection with ~1000kbps you need ~10 seconds for 1 MB
|
|
deadline = time.time() + (10 * (max_size / 1048576))
|
|
while True:
|
|
if time.time() > deadline:
|
|
log.debug('Timeout loading image %s ', attrs['src'])
|
|
mem = ''
|
|
alt = attrs.get('alt', '')
|
|
if alt:
|
|
alt += '\n'
|
|
alt += _('Timeout loading image')
|
|
break
|
|
try:
|
|
temp = f.read(100)
|
|
except socket.timeout as ex:
|
|
log.debug('Timeout loading image %s ', attrs['src'] + str(ex))
|
|
alt = attrs.get('alt', '')
|
|
if alt:
|
|
alt += '\n'
|
|
alt += _('Timeout loading image')
|
|
break
|
|
if temp:
|
|
mem += temp
|
|
else:
|
|
break
|
|
if len(mem) > max_size:
|
|
alt = attrs.get('alt', '')
|
|
if alt:
|
|
alt += '\n'
|
|
alt += _('Image is too big')
|
|
break
|
|
f.close()
|
|
return (mem, alt)
|
|
|
|
def _get_img_proxy(attrs, proxy):
|
|
"""
|
|
Download an image through a proxy. This function should be launched in a
|
|
separated thread.
|
|
"""
|
|
if not app.is_installed('PYCURL'):
|
|
return '', _('PyCURL is not installed')
|
|
alt, max_size = '', 2*1024*1024
|
|
if 'max_size' in attrs:
|
|
max_size = attrs['max_size']
|
|
try:
|
|
b = StringIO()
|
|
c = pycurl.Curl()
|
|
c.setopt(pycurl.URL, attrs['src'].encode('utf-8'))
|
|
c.setopt(pycurl.FOLLOWLOCATION, 1)
|
|
# Wait maximum 10s for connection
|
|
c.setopt(pycurl.CONNECTTIMEOUT, 10)
|
|
# On a slow internet connection with ~1000kbps you need ~10 seconds for 1 MB
|
|
c.setopt(pycurl.TIMEOUT, 10 * (max_size / 1048576))
|
|
c.setopt(pycurl.MAXFILESIZE, max_size)
|
|
c.setopt(pycurl.WRITEFUNCTION, b.write)
|
|
c.setopt(pycurl.USERAGENT, 'Gajim ' + app.version)
|
|
# set proxy
|
|
c.setopt(pycurl.PROXY, proxy['host'].encode('utf-8'))
|
|
c.setopt(pycurl.PROXYPORT, proxy['port'])
|
|
if proxy['useauth']:
|
|
c.setopt(pycurl.PROXYUSERPWD, proxy['user'].encode('utf-8')\
|
|
+ ':' + proxy['pass'].encode('utf-8'))
|
|
c.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_ANY)
|
|
if proxy['type'] == 'http':
|
|
c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP)
|
|
elif proxy['type'] == 'socks5':
|
|
c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
|
|
c.close()
|
|
t = b.getvalue()
|
|
return (t, attrs.get('alt', ''))
|
|
except pycurl.error as ex:
|
|
alt = attrs.get('alt', '')
|
|
if alt:
|
|
alt += '\n'
|
|
if ex.errno == pycurl.E_FILESIZE_EXCEEDED:
|
|
alt += _('Image is too big')
|
|
elif ex.errno == pycurl.E_OPERATION_TIMEOUTED:
|
|
alt += _('Timeout loading image')
|
|
else:
|
|
alt += _('Error loading image')
|
|
except Exception as ex:
|
|
log.debug('Error loading image %s ', attrs['src'] + str(ex))
|
|
alt = attrs.get('alt', 'Broken image')
|
|
return ('', alt)
|
|
|
|
def download_image(account, attrs):
|
|
proxy = get_proxy_info(account)
|
|
if proxy and proxy['type'] in ('http', 'socks5'):
|
|
return _get_img_proxy(attrs, proxy)
|
|
return _get_img_direct(attrs)
|
|
|
|
def version_condition(current_version, required_version):
|
|
if V(current_version) < V(required_version):
|
|
return False
|
|
return True
|
|
|
|
def get_available_emoticon_themes():
|
|
emoticons_themes = ['font']
|
|
files = []
|
|
dir_iterator = os.scandir(configpaths.get('EMOTICONS'))
|
|
for folder in dir_iterator:
|
|
if not folder.is_dir():
|
|
continue
|
|
file_iterator = os.scandir(folder.path)
|
|
for theme in file_iterator:
|
|
if theme.is_file():
|
|
files.append(theme.name)
|
|
|
|
if os.path.isdir(configpaths.get('MY_EMOTS')):
|
|
files += os.listdir(configpaths.get('MY_EMOTS'))
|
|
|
|
for file in files:
|
|
if file.endswith('.png'):
|
|
emoticons_themes.append(file[:-4])
|
|
emoticons_themes.sort()
|
|
return emoticons_themes
|
|
|
|
def call_counter(func):
|
|
def helper(self, restart=False):
|
|
if restart:
|
|
self._connect_machine_calls = 0
|
|
self._connect_machine_calls += 1
|
|
return func(self, restart=False)
|
|
return helper
|
|
|
|
def get_sync_threshold(jid, archive_info):
|
|
if archive_info is None or archive_info.sync_threshold is None:
|
|
if muc_caps_cache.supports(jid, 'muc#roomconfig_membersonly'):
|
|
threshold = app.config.get('private_room_sync_threshold')
|
|
else:
|
|
threshold = app.config.get('public_room_sync_threshold')
|
|
app.logger.set_archive_infos(jid, sync_threshold=threshold)
|
|
return threshold
|
|
return archive_info.sync_threshold
|
|
|
|
def load_json(path, key=None, default=None):
|
|
try:
|
|
with open(path, 'r') as file:
|
|
json_dict = json.loads(file.read())
|
|
except Exception:
|
|
log.exception('Parsing error')
|
|
return default
|
|
|
|
if key is None:
|
|
return json_dict
|
|
return json_dict.get(key, default)
|