gajim-plural/gajim/common/helpers.py

1495 lines
51 KiB
Python

# Copyright (C) 2003-2014 Yann Leboulanger <asterix AT lagaule.org>
# Copyright (C) 2005-2006 Dimitur Kirov <dkirov AT gmail.com>
# Nikos Kouremenos <kourem AT gmail.com>
# Copyright (C) 2006 Alex Mauer <hawke AT hawkesnest.net>
# Copyright (C) 2006-2007 Travis Shirk <travis AT pobox.com>
# Copyright (C) 2006-2008 Jean-Marie Traissard <jim AT lapin.org>
# Copyright (C) 2007 Lukas Petrovicky <lukas AT petrovicky.net>
# James Newton <redshodan AT gmail.com>
# Julien Pivotto <roidelapluie AT gmail.com>
# Copyright (C) 2007-2008 Stephan Erb <steve-e AT h3c.de>
# Copyright (C) 2008 Brendan Taylor <whateley AT gmail.com>
# Jonathan Schleifer <js-gajim AT webkeks.org>
#
# This file is part of Gajim.
#
# Gajim is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# Gajim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Gajim. If not, see <http://www.gnu.org/licenses/>.
import sys
import re
import os
import subprocess
import urllib
import webbrowser
import errno
import select
import base64
import hashlib
import shlex
import socket
import time
import logging
from datetime import datetime, timedelta
from distutils.version import LooseVersion as V
from encodings.punycode import punycode_encode
from string import Template
import nbxmpp
from gajim.common import caps_cache
from gajim.common import configpaths
from gajim.common.i18n import Q_
from gajim.common.i18n import _
from gajim.common.i18n import ngettext
from gajim.common.caps_cache import muc_caps_cache
try:
import precis_i18n.codec # pylint: disable=unused-import
HAS_PRECIS_I18N = True
except ImportError:
HAS_PRECIS_I18N = False
HAS_SOUND = True
if sys.platform == 'win32':
try:
import winsound # windows-only built-in module for playing wav
except ImportError:
HAS_SOUND = False
print('Gajim is not able to playback sound because'
'pywin32 is missing', file=sys.stderr)
elif sys.platform == 'darwin':
try:
from AppKit import NSSound
except ImportError:
HAS_SOUND = False
print('Gajim is not able to playback sound because'
'pyobjc is missing', file=sys.stderr)
try:
import wave # posix-only fallback wav playback
import ossaudiodev as oss
except Exception:
pass
log = logging.getLogger('gajim.c.helpers')
special_groups = (_('Transports'), _('Not in Roster'), _('Observers'), _('Groupchats'))
class InvalidFormat(Exception):
pass
def decompose_jid(jidstring):
user = None
server = None
resource = None
# Search for delimiters
user_sep = jidstring.find('@')
res_sep = jidstring.find('/')
if user_sep == -1:
if res_sep == -1:
# host
server = jidstring
else:
# host/resource
server = jidstring[0:res_sep]
resource = jidstring[res_sep + 1:]
else:
if res_sep == -1:
# user@host
user = jidstring[0:user_sep]
server = jidstring[user_sep + 1:]
else:
if user_sep < res_sep:
# user@host/resource
user = jidstring[0:user_sep]
server = jidstring[user_sep + 1:user_sep + (res_sep - user_sep)]
resource = jidstring[res_sep + 1:]
else:
# server/resource (with an @ in resource)
server = jidstring[0:res_sep]
resource = jidstring[res_sep + 1:]
return user, server, resource
def parse_jid(jidstring):
"""
Perform stringprep on all JID fragments from a string and return the full
jid
"""
# This function comes from http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py
return prep(*decompose_jid(jidstring))
def idn_to_ascii(host):
"""
Convert IDN (Internationalized Domain Names) to ACE (ASCII-compatible
encoding)
"""
from encodings import idna
labels = idna.dots.split(host)
converted_labels = []
for label in labels:
if label:
converted_labels.append(idna.ToASCII(label).decode('utf-8'))
else:
converted_labels.append('')
return ".".join(converted_labels)
def ascii_to_idn(host):
"""
Convert ACE (ASCII-compatible encoding) to IDN (Internationalized Domain
Names)
"""
from encodings import idna
labels = idna.dots.split(host)
converted_labels = []
for label in labels:
converted_labels.append(idna.ToUnicode(label))
return ".".join(converted_labels)
def puny_encode_url(url):
_url = url
if '//' not in _url:
_url = '//' + _url
try:
o = urllib.parse.urlparse(_url)
p_loc = idn_to_ascii(o.netloc)
except Exception:
log.debug('urlparse failed: %s', url)
return False
return url.replace(o.netloc, p_loc)
def parse_resource(resource):
"""
Perform stringprep on resource and return it
"""
if resource:
try:
if HAS_PRECIS_I18N:
return resource.encode('Nickname').decode('utf-8')
from nbxmpp.stringprepare import resourceprep
return resourceprep.prepare(resource)
except UnicodeError:
raise InvalidFormat('Invalid character in resource.')
def prep(user, server, resource):
"""
Perform stringprep on all JID fragments and return the full jid
"""
# This function comes from
#http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py
ip_address = False
try:
socket.inet_aton(server)
ip_address = True
except socket.error:
pass
if not ip_address and hasattr(socket, 'inet_pton'):
try:
socket.inet_pton(socket.AF_INET6, server.strip('[]'))
server = '[%s]' % server.strip('[]')
ip_address = True
except (socket.error, ValueError):
pass
if not ip_address:
if server is not None:
if server.endswith('.'): # RFC7622, 3.2
server = server[:-1]
if not server or len(server.encode('utf-8')) > 1023:
raise InvalidFormat(_('Server must be between 1 and 1023 bytes'))
try:
from nbxmpp.stringprepare import nameprep
server = nameprep.prepare(server)
except UnicodeError:
raise InvalidFormat(_('Invalid character in hostname.'))
else:
raise InvalidFormat(_('Server address required.'))
if user is not None:
if not user or len(user.encode('utf-8')) > 1023:
raise InvalidFormat(_('Username must be between 1 and 1023 bytes'))
try:
if HAS_PRECIS_I18N:
user = user.encode('UsernameCaseMapped').decode('utf-8')
else:
from nbxmpp.stringprepare import nodeprep
user = nodeprep.prepare(user)
except UnicodeError:
raise InvalidFormat(_('Invalid character in username.'))
else:
user = None
if resource is not None:
if not resource or len(resource.encode('utf-8')) > 1023:
raise InvalidFormat(_('Resource must be between 1 and 1023 bytes'))
try:
if HAS_PRECIS_I18N:
resource = resource.encode('OpaqueString').decode('utf-8')
else:
from nbxmpp.stringprepare import resourceprep
resource = resourceprep.prepare(resource)
except UnicodeError:
raise InvalidFormat(_('Invalid character in resource.'))
else:
resource = None
if user:
if resource:
return '%s@%s/%s' % (user, server, resource)
return '%s@%s' % (user, server)
if resource:
return '%s/%s' % (server, resource)
return server
def windowsify(s):
if os.name == 'nt':
return s.capitalize()
return s
def temp_failure_retry(func, *args, **kwargs):
while True:
try:
return func(*args, **kwargs)
except (os.error, IOError, select.error) as ex:
if ex.errno == errno.EINTR:
continue
else:
raise
def get_uf_show(show, use_mnemonic=False):
"""
Return a userfriendly string for dnd/xa/chat and make all strings
translatable
If use_mnemonic is True, it adds _ so GUI should call with True for
accessibility issues
"""
if show == 'dnd':
if use_mnemonic:
uf_show = _('_Busy')
else:
uf_show = _('Busy')
elif show == 'xa':
if use_mnemonic:
uf_show = _('_Not Available')
else:
uf_show = _('Not Available')
elif show == 'chat':
if use_mnemonic:
uf_show = _('_Free for Chat')
else:
uf_show = _('Free for Chat')
elif show == 'online':
if use_mnemonic:
uf_show = Q_('?user status:_Available')
else:
uf_show = Q_('?user status:Available')
elif show == 'connecting':
uf_show = _('Connecting')
elif show == 'away':
if use_mnemonic:
uf_show = _('A_way')
else:
uf_show = _('Away')
elif show == 'offline':
if use_mnemonic:
uf_show = _('_Offline')
else:
uf_show = _('Offline')
elif show == 'invisible':
if use_mnemonic:
uf_show = _('_Invisible')
else:
uf_show = _('Invisible')
elif show == 'not in roster':
uf_show = _('Not in Roster')
elif show == 'requested':
uf_show = Q_('?contact has status:Unknown')
else:
uf_show = Q_('?contact has status:Has errors')
return uf_show
def get_css_show_color(show):
if show in ('online', 'chat', 'invisible'):
return 'status-online'
if show in ('offline', 'not in roster', 'requested'):
return None
if show in ('xa', 'dnd'):
return 'status-dnd'
if show == 'away':
return 'status-away'
def get_uf_sub(sub):
if sub == 'none':
uf_sub = Q_('?Subscription we already have:None')
elif sub == 'to':
uf_sub = _('To')
elif sub == 'from':
uf_sub = _('From')
elif sub == 'both':
uf_sub = _('Both')
else:
uf_sub = _('Unknown')
return uf_sub
def get_uf_ask(ask):
if ask is None:
uf_ask = Q_('?Ask (for Subscription):None')
elif ask == 'subscribe':
uf_ask = _('Subscribe')
else:
uf_ask = ask
return uf_ask
def get_uf_role(role, plural=False):
''' plural determines if you get Moderators or Moderator'''
if role == 'none':
role_name = Q_('?Group Chat Contact Role:None')
elif role == 'moderator':
if plural:
role_name = _('Moderators')
else:
role_name = _('Moderator')
elif role == 'participant':
if plural:
role_name = _('Participants')
else:
role_name = _('Participant')
elif role == 'visitor':
if plural:
role_name = _('Visitors')
else:
role_name = _('Visitor')
return role_name
def get_uf_affiliation(affiliation):
'''Get a nice and translated affilition for muc'''
if affiliation == 'none':
affiliation_name = Q_('?Group Chat Contact Affiliation:None')
elif affiliation == 'owner':
affiliation_name = _('Owner')
elif affiliation == 'admin':
affiliation_name = _('Administrator')
elif affiliation == 'member':
affiliation_name = _('Member')
else: # Argl ! An unknown affiliation !
affiliation_name = affiliation.capitalize()
return affiliation_name
def get_sorted_keys(adict):
keys = sorted(adict.keys())
return keys
def to_one_line(msg):
msg = msg.replace('\\', '\\\\')
msg = msg.replace('\n', '\\n')
# s1 = 'test\ntest\\ntest'
# s11 = s1.replace('\\', '\\\\')
# s12 = s11.replace('\n', '\\n')
# s12
# 'test\\ntest\\\\ntest'
return msg
def from_one_line(msg):
# (?<!\\) is a lookbehind assertion which asks anything but '\'
# to match the regexp that follows it
# So here match '\\n' but not if you have a '\' before that
expr = re.compile(r'(?<!\\)\\n')
msg = expr.sub('\n', msg)
msg = msg.replace('\\\\', '\\')
# s12 = 'test\\ntest\\\\ntest'
# s13 = re.sub('\n', s12)
# s14 s13.replace('\\\\', '\\')
# s14
# 'test\ntest\\ntest'
return msg
def get_uf_chatstate(chatstate):
"""
Remove chatstate jargon and returns user friendly messages
"""
if chatstate == 'active':
return _('is paying attention to the conversation')
if chatstate == 'inactive':
return _('is doing something else')
if chatstate == 'composing':
return _('is composing a message…')
if chatstate == 'paused':
#paused means he or she was composing but has stopped for a while
return _('paused composing a message')
if chatstate == 'gone':
return _('has closed the chat window or tab')
return ''
def is_in_path(command, return_abs_path=False):
"""
Return True if 'command' is found in one of the directories in the user's
path. If 'return_abs_path' is True, return the absolute path of the first
found command instead. Return False otherwise and on errors
"""
for directory in os.getenv('PATH').split(os.pathsep):
try:
if command in os.listdir(directory):
if return_abs_path:
return os.path.join(directory, command)
return True
except OSError:
# If the user has non directories in his path
pass
return False
def exec_command(command, use_shell=False, posix=True):
"""
execute a command. if use_shell is True, we run the command as is it was
typed in a console. So it may be dangerous if you are not sure about what
is executed.
"""
if use_shell:
subprocess.Popen('%s &' % command, shell=True).wait()
else:
args = shlex.split(command, posix=posix)
p = subprocess.Popen(args)
app.thread_interface(p.wait)
def build_command(executable, parameter):
# we add to the parameter (can hold path with spaces)
# "" so we have good parsing from shell
parameter = parameter.replace('"', '\\"') # but first escape "
command = '%s "%s"' % (executable, parameter)
return command
def get_file_path_from_dnd_dropped_uri(uri):
path = urllib.parse.unquote(uri) # escape special chars
path = path.strip('\r\n\x00') # remove \r\n and NULL
# get the path to file
if re.match('^file:///[a-zA-Z]:/', path): # windows
path = path[8:] # 8 is len('file:///')
elif path.startswith('file://'): # nautilus, rox
path = path[7:] # 7 is len('file://')
elif path.startswith('file:'): # xffm
path = path[5:] # 5 is len('file:')
return path
def get_xmpp_show(show):
if show in ('online', 'offline'):
return None
return show
def sanitize_filename(filename):
"""
Make sure the filename we will write does contain only acceptable and latin
characters, and is not too long (in that case hash it)
"""
# 48 is the limit
if len(filename) > 48:
hash_ = hashlib.md5(filename.encode('utf-8'))
filename = base64.b64encode(hash_.digest()).decode('utf-8')
# make it latin chars only
filename = punycode_encode(filename).decode('utf-8')
filename = filename.replace('/', '_')
if os.name == 'nt':
filename = filename.replace('?', '_').replace(':', '_')\
.replace('\\', '_').replace('"', "'").replace('|', '_')\
.replace('*', '_').replace('<', '_').replace('>', '_')
return filename
def reduce_chars_newlines(text, max_chars=0, max_lines=0):
"""
Cut the chars after 'max_chars' on each line and show only the first
'max_lines'
If any of the params is not present (None or 0) the action on it is not
performed
"""
def _cut_if_long(string):
if len(string) > max_chars:
string = string[:max_chars - 3] + ''
return string
if max_lines == 0:
lines = text.split('\n')
else:
lines = text.split('\n', max_lines)[:max_lines]
if max_chars > 0:
if lines:
lines = [_cut_if_long(e) for e in lines]
if lines:
reduced_text = '\n'.join(lines)
if reduced_text != text:
reduced_text += ''
else:
reduced_text = ''
return reduced_text
def get_account_status(account):
status = reduce_chars_newlines(account['status_line'], 100, 1)
return status
def datetime_tuple(timestamp):
"""
Convert timestamp using strptime and the format: %Y%m%dT%H:%M:%S
Because of various datetime formats are used the following exceptions
are handled:
- Optional milliseconds appened to the string are removed
- Optional Z (that means UTC) appened to the string are removed
- XEP-082 datetime strings have all '-' cahrs removed to meet
the above format.
"""
date, tim = timestamp.split('T', 1)
date = date.replace('-', '')
tim = tim.replace('z', '')
tim = tim.replace('Z', '')
zone = None
if '+' in tim:
sign = -1
tim, zone = tim.split('+', 1)
if '-' in tim:
sign = 1
tim, zone = tim.split('-', 1)
tim = tim.split('.')[0]
tim = time.strptime(date + 'T' + tim, '%Y%m%dT%H:%M:%S')
if zone:
zone = zone.replace(':', '')
tim = datetime.fromtimestamp(time.mktime(tim))
if len(zone) > 2:
zone = time.strptime(zone, '%H%M')
else:
zone = time.strptime(zone, '%H')
zone = timedelta(hours=zone.tm_hour, minutes=zone.tm_min)
tim += zone * sign
tim = tim.timetuple()
return tim
from gajim.common import app
if app.is_installed('PYCURL'):
import pycurl
from io import StringIO
def convert_bytes(string):
suffix = ''
# IEC standard says KiB = 1024 bytes KB = 1000 bytes
# but do we use the standard?
use_kib_mib = app.config.get('use_kib_mib')
align = 1024.
bytes_ = float(string)
if bytes_ >= align:
bytes_ = round(bytes_/align, 1)
if bytes_ >= align:
bytes_ = round(bytes_/align, 1)
if bytes_ >= align:
bytes_ = round(bytes_/align, 1)
if use_kib_mib:
#GiB means gibibyte
suffix = _('%s GiB')
else:
#GB means gigabyte
suffix = _('%s GB')
else:
if use_kib_mib:
#MiB means mibibyte
suffix = _('%s MiB')
else:
#MB means megabyte
suffix = _('%s MB')
else:
if use_kib_mib:
#KiB means kibibyte
suffix = _('%s KiB')
else:
#KB means kilo bytes
suffix = _('%s KB')
else:
#B means bytes
suffix = _('%s B')
return suffix % str(bytes_)
def get_contact_dict_for_account(account):
"""
Create a dict of jid, nick -> contact with all contacts of account.
Can be used for completion lists
"""
contacts_dict = {}
for jid in app.contacts.get_jid_list(account):
contact = app.contacts.get_contact_with_highest_priority(account,
jid)
contacts_dict[jid] = contact
name = contact.name
if name in contacts_dict:
contact1 = contacts_dict[name]
del contacts_dict[name]
contacts_dict['%s (%s)' % (name, contact1.jid)] = contact1
contacts_dict['%s (%s)' % (name, jid)] = contact
elif contact.name:
if contact.name == app.get_nick_from_jid(jid):
del contacts_dict[jid]
contacts_dict[name] = contact
return contacts_dict
def launch_browser_mailer(kind, uri):
# kind = 'url' or 'mail'
if kind == 'url' and uri.startswith('file://'):
launch_file_manager(uri)
return
if kind in ('mail', 'sth_at_sth') and not uri.startswith('mailto:'):
uri = 'mailto:' + uri
if kind == 'url' and uri.startswith('www.'):
uri = 'http://' + uri
if not app.config.get('autodetect_browser_mailer'):
if kind == 'url':
command = app.config.get('custombrowser')
elif kind in ('mail', 'sth_at_sth'):
command = app.config.get('custommailapp')
if command == '': # if no app is configured
return
command = build_command(command, uri)
try:
exec_command(command)
except Exception:
pass
else:
webbrowser.open(uri)
def launch_file_manager(path_to_open):
if os.name == 'nt':
try:
os.startfile(path_to_open) # if pywin32 is installed we open
except Exception:
pass
else:
if not app.config.get('autodetect_browser_mailer'):
command = app.config.get('custom_file_manager')
if command == '': # if no app is configured
return
else:
command = 'xdg-open'
command = build_command(command, path_to_open)
try:
exec_command(command)
except Exception:
pass
def play_sound(event):
if not app.config.get('sounds_on'):
return
path_to_soundfile = app.config.get_per('soundevents', event, 'path')
play_sound_file(path_to_soundfile)
def check_soundfile_path(file_, dirs=None):
"""
Check if the sound file exists
:param file_: the file to check, absolute or relative to 'dirs' path
:param dirs: list of knows paths to fallback if the file doesn't exists
(eg: ~/.gajim/sounds/, DATADIR/sounds...).
:return the path to file or None if it doesn't exists.
"""
if dirs is None:
dirs = [configpaths.get('MY_DATA'),
configpaths.get('DATA')]
if not file_:
return None
if os.path.exists(file_):
return file_
for d in dirs:
d = os.path.join(d, 'sounds', file_)
if os.path.exists(d):
return d
return None
def strip_soundfile_path(file_, dirs=None, abs_=True):
"""
Remove knowns paths from a sound file
Filechooser returns absolute path. If path is a known fallback path, we remove it.
So config have no hardcoded path to DATA_DIR and text in textfield is shorther.
param: file_: the filename to strip.
param: dirs: list of knowns paths from which the filename should be stripped.
param: abs_: force absolute path on dirs
"""
if not file_:
return None
if dirs is None:
dirs = [configpaths.get('MY_DATA'),
configpaths.get('DATA')]
name = os.path.basename(file_)
for d in dirs:
d = os.path.join(d, 'sounds', name)
if abs_:
d = os.path.abspath(d)
if file_ == d:
return name
return file_
def play_sound_file(path_to_soundfile):
if path_to_soundfile == 'beep':
exec_command('beep')
return
path_to_soundfile = check_soundfile_path(path_to_soundfile)
if path_to_soundfile is None:
return
if sys.platform == 'win32' and HAS_SOUND:
try:
winsound.PlaySound(path_to_soundfile,
winsound.SND_FILENAME|winsound.SND_ASYNC)
except Exception:
log.exception('Sound Playback Error')
elif sys.platform == 'linux':
if app.config.get('soundplayer') == '':
def _oss_play():
sndfile = wave.open(path_to_soundfile, 'rb')
nc, sw, fr, nf, _comptype, _compname = sndfile.getparams()
dev = oss.open('/dev/dsp', 'w')
dev.setparameters(sw * 8, nc, fr)
dev.write(sndfile.readframes(nf))
sndfile.close()
dev.close()
app.thread_interface(_oss_play)
return
player = app.config.get('soundplayer')
command = build_command(player, path_to_soundfile)
exec_command(command)
elif sys.platform == 'darwin' and HAS_SOUND:
sound = NSSound.alloc()
sound.initWithContentsOfFile_byReference_(path_to_soundfile, True)
sound.play()
def get_global_show():
maxi = 0
for account in app.connections:
if not app.config.get_per('accounts', account,
'sync_with_global_status'):
continue
connected = app.connections[account].connected
if connected > maxi:
maxi = connected
return app.SHOW_LIST[maxi]
def get_global_status():
maxi = 0
for account in app.connections:
if not app.config.get_per('accounts', account,
'sync_with_global_status'):
continue
connected = app.connections[account].connected
if connected > maxi:
maxi = connected
status = app.connections[account].status
return status
def statuses_unified():
"""
Test if all statuses are the same
"""
reference = None
for account in app.connections:
if not app.config.get_per('accounts', account,
'sync_with_global_status'):
continue
if reference is None:
reference = app.connections[account].connected
elif reference != app.connections[account].connected:
return False
return True
def get_icon_name_to_show(contact, account=None):
"""
Get the icon name to show in online, away, requested, etc
"""
if account and app.events.get_nb_roster_events(account, contact.jid):
return 'event'
if account and app.events.get_nb_roster_events(
account, contact.get_full_jid()):
return 'event'
if account and account in app.interface.minimized_controls and \
contact.jid in app.interface.minimized_controls[account] and app.interface.\
minimized_controls[account][contact.jid].get_nb_unread_pm() > 0:
return 'event'
if account and contact.jid in app.gc_connected[account]:
if app.gc_connected[account][contact.jid]:
return 'muc_active'
return 'muc_inactive'
if contact.jid.find('@') <= 0: # if not '@' or '@' starts the jid ==> agent
return contact.show
if contact.sub in ('both', 'to'):
return contact.show
if contact.ask == 'subscribe':
return 'requested'
transport = app.get_transport_name_from_jid(contact.jid)
if transport:
return contact.show
if contact.show in app.SHOW_LIST:
return contact.show
return 'not in roster'
def get_full_jid_from_iq(iq_obj):
"""
Return the full jid (with resource) from an iq
"""
jid = iq_obj.getFrom()
if jid is None:
return None
return parse_jid(str(iq_obj.getFrom()))
def get_jid_from_iq(iq_obj):
"""
Return the jid (without resource) from an iq
"""
jid = get_full_jid_from_iq(iq_obj)
return app.get_jid_without_resource(jid)
def get_auth_sha(sid, initiator, target):
"""
Return sha of sid + initiator + target used for proxy auth
"""
return hashlib.sha1(("%s%s%s" % (sid, initiator, target)).encode('utf-8')).\
hexdigest()
def remove_invalid_xml_chars(string):
if string:
string = re.sub(app.interface.invalid_XML_chars_re, '', string)
return string
def get_random_string_16():
"""
Create random string of length 16
"""
rng = list(range(65, 90))
rng.extend(range(48, 57))
char_sequence = [chr(e) for e in rng]
from random import sample
return ''.join(sample(char_sequence, 16))
def get_os_info():
if app.os_info:
return app.os_info
app.os_info = 'N/A'
if os.name == 'nt' or sys.platform == 'darwin':
import platform
app.os_info = platform.system() + " " + platform.release()
elif os.name == 'posix':
try:
import distro
app.os_info = distro.name(pretty=True)
except ImportError:
import platform
app.os_info = platform.system()
return app.os_info
def allow_showing_notification(account, type_='notify_on_new_message',
is_first_message=True):
"""
Is it allowed to show nofication?
Check OUR status and if we allow notifications for that status type is the
option that need to be True e.g.: notify_on_signing is_first_message: set it
to false when it's not the first message
"""
if type_ and (not app.config.get(type_) or not is_first_message):
return False
if app.config.get('autopopupaway'): # always show notification
return True
if app.connections[account].connected in (2, 3): # we're online or chat
return True
return False
def allow_popup_window(account):
"""
Is it allowed to popup windows?
"""
autopopup = app.config.get('autopopup')
autopopupaway = app.config.get('autopopupaway')
if autopopup and (autopopupaway or \
app.connections[account].connected in (2, 3)): # we're online or chat
return True
return False
def allow_sound_notification(account, sound_event):
if app.config.get('sounddnd') or app.connections[account].connected != \
app.SHOW_LIST.index('dnd') and app.config.get_per('soundevents',
sound_event, 'enabled'):
return True
return False
def get_chat_control(account, contact):
full_jid_with_resource = contact.jid
if contact.resource:
full_jid_with_resource += '/' + contact.resource
highest_contact = app.contacts.get_contact_with_highest_priority(
account, contact.jid)
# Look for a chat control that has the given resource, or default to
# one without resource
ctrl = app.interface.msg_win_mgr.get_control(full_jid_with_resource,
account)
if ctrl:
return ctrl
if (highest_contact and
highest_contact.resource and
contact.resource != highest_contact.resource):
return None
# unknown contact or offline message
return app.interface.msg_win_mgr.get_control(contact.jid, account)
def get_notification_icon_tooltip_dict():
"""
Return a dict of the form {acct: {'show': show, 'message': message,
'event_lines': [list of text lines to show in tooltip]}
"""
# How many events must there be before they're shown summarized, not per-user
max_ungrouped_events = 10
accounts = get_accounts_info()
# Gather events. (With accounts, when there are more.)
for account in accounts:
account_name = account['name']
account['event_lines'] = []
# Gather events per-account
pending_events = app.events.get_events(account=account_name)
messages, non_messages, total_messages, total_non_messages = {}, {}, 0, 0
for jid in pending_events:
for event in pending_events[jid]:
if event.type_.count('file') > 0:
# This is a non-messagee event.
messages[jid] = non_messages.get(jid, 0) + 1
total_non_messages = total_non_messages + 1
else:
# This is a message.
messages[jid] = messages.get(jid, 0) + 1
total_messages = total_messages + 1
# Display unread messages numbers, if any
if total_messages > 0:
if total_messages > max_ungrouped_events:
text = ngettext(
'%d message pending',
'%d messages pending',
total_messages, total_messages, total_messages)
account['event_lines'].append(text)
else:
for jid in messages:
text = ngettext(
'%d message pending',
'%d messages pending',
messages[jid], messages[jid], messages[jid])
contact = app.contacts.get_first_contact_from_jid(
account['name'], jid)
text += ' '
if jid in app.gc_connected[account['name']]:
text += _('from room %s') % (jid)
elif contact:
name = contact.get_shown_name()
text += _('from user %s') % (name)
else:
text += _('from %s') % (jid)
account['event_lines'].append(text)
# Display unseen events numbers, if any
if total_non_messages > 0:
if total_non_messages > max_ungrouped_events:
text = ngettext(
'%d event pending',
'%d events pending',
total_non_messages, total_non_messages, total_non_messages)
account['event_lines'].append(text)
else:
for jid in non_messages:
text = ngettext('%d event pending', '%d events pending',
non_messages[jid], non_messages[jid], non_messages[jid])
text += ' ' + _('from user %s') % (jid)
account[account]['event_lines'].append(text)
return accounts
def get_notification_icon_tooltip_text():
text = None
# How many events must there be before they're shown summarized, not per-user
# max_ungrouped_events = 10
# Character which should be used to indent in the tooltip.
indent_with = ' '
accounts = get_notification_icon_tooltip_dict()
if not accounts:
# No configured account
return _('Gajim')
# at least one account present
# Is there more that one account?
if len(accounts) == 1:
show_more_accounts = False
else:
show_more_accounts = True
# If there is only one account, its status is shown on the first line.
if show_more_accounts:
text = _('Gajim')
else:
text = _('Gajim - %s') % (get_account_status(accounts[0]))
# Gather and display events. (With accounts, when there are more.)
for account in accounts:
account_name = account['name']
# Set account status, if not set above
if show_more_accounts:
message = '\n' + indent_with + ' %s - %s'
text += message % (account_name, get_account_status(account))
# Account list shown, messages need to be indented more
indent_how = 2
else:
# If no account list is shown, messages could have default indenting.
indent_how = 1
for line in account['event_lines']:
text += '\n' + indent_with * indent_how + ' '
text += line
return text
def get_accounts_info():
"""
Helper for notification icon tooltip
"""
accounts = []
accounts_list = sorted(app.contacts.get_accounts())
for account in accounts_list:
status_idx = app.connections[account].connected
# uncomment the following to hide offline accounts
# if status_idx == 0: continue
status = app.SHOW_LIST[status_idx]
message = app.connections[account].status
single_line = get_uf_show(status)
if message is None:
message = ''
else:
message = message.strip()
if message != '':
single_line += ': ' + message
account_label = app.get_account_label(account)
accounts.append({'name': account,
'account_label': account_label,
'status_line': single_line,
'show': status,
'message': message})
return accounts
def get_current_show(account):
if account not in app.connections:
return 'offline'
status = app.connections[account].connected
return app.SHOW_LIST[status]
def get_iconset_path(iconset):
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'iconsets', iconset)):
return os.path.join(configpaths.get('DATA'), 'iconsets', iconset)
if os.path.isdir(os.path.join(configpaths.get('MY_ICONSETS'), iconset)):
return os.path.join(configpaths.get('MY_ICONSETS'), iconset)
def get_mood_iconset_path(iconset):
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'moods', iconset)):
return os.path.join(configpaths.get('DATA'), 'moods', iconset)
if os.path.isdir(
os.path.join(configpaths.get('MY_MOOD_ICONSETS'), iconset)):
return os.path.join(configpaths.get('MY_MOOD_ICONSETS'), iconset)
def get_activity_iconset_path(iconset):
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'activities', iconset)):
return os.path.join(configpaths.get('DATA'), 'activities', iconset)
if os.path.isdir(os.path.join(configpaths.get('MY_ACTIVITY_ICONSETS'),
iconset)):
return os.path.join(configpaths.get('MY_ACTIVITY_ICONSETS'), iconset)
def get_transport_path(transport):
if os.path.isdir(os.path.join(configpaths.get('DATA'), 'iconsets', 'transports',
transport)):
return os.path.join(configpaths.get('DATA'), 'iconsets', 'transports', transport)
if os.path.isdir(os.path.join(configpaths.get('MY_ICONSETS'), 'transports',
transport)):
return os.path.join(configpaths.get('MY_ICONSETS'), 'transports', transport)
# No transport folder found, use default jabber one
return get_iconset_path(app.config.get('iconset'))
def prepare_and_validate_gpg_keyID(account, jid, keyID):
"""
Return an eight char long keyID that can be used with for GPG encryption
with this contact
If the given keyID is None, return UNKNOWN; if the key does not match the
assigned key XXXXXXXXMISMATCH is returned. If the key is trusted and not yet
assigned, assign it.
"""
if app.connections[account].USE_GPG:
if keyID and len(keyID) == 16:
keyID = keyID[8:]
attached_keys = app.config.get_per('accounts', account,
'attached_gpg_keys').split()
if jid in attached_keys and keyID:
attachedkeyID = attached_keys[attached_keys.index(jid) + 1]
if attachedkeyID != keyID:
# Get signing subkeys for the attached key
subkeys = []
for key in app.connections[account].gpg.list_keys():
if key['keyid'][8:] == attachedkeyID:
subkeys = [subkey[0][8:] for subkey in key['subkeys'] \
if subkey[1] == 's']
break
if keyID not in subkeys:
# Mismatch! Another gpg key was expected
keyID += 'MISMATCH'
elif jid in attached_keys:
# An unsigned presence, just use the assigned key
keyID = attached_keys[attached_keys.index(jid) + 1]
elif keyID:
full_key = app.connections[account].ask_gpg_keys(keyID=keyID)
# Assign the corresponding key, if we have it in our keyring
if full_key:
for u in app.contacts.get_contacts(account, jid):
u.keyID = keyID
keys_str = app.config.get_per('accounts', account,
'attached_gpg_keys')
keys_str += jid + ' ' + keyID + ' '
app.config.set_per('accounts', account, 'attached_gpg_keys',
keys_str)
elif keyID is None:
keyID = 'UNKNOWN'
return keyID
def update_optional_features(account=None):
if account:
accounts = [account]
else:
accounts = [a for a in app.connections]
for a in accounts:
app.gajim_optional_features[a] = []
if app.config.get_per('accounts', a, 'subscribe_mood'):
app.gajim_optional_features[a].append(nbxmpp.NS_MOOD + '+notify')
if app.config.get_per('accounts', a, 'subscribe_activity'):
app.gajim_optional_features[a].append(nbxmpp.NS_ACTIVITY + \
'+notify')
if app.config.get_per('accounts', a, 'publish_tune'):
app.gajim_optional_features[a].append(nbxmpp.NS_TUNE)
if app.config.get_per('accounts', a, 'publish_location'):
app.gajim_optional_features[a].append(nbxmpp.NS_LOCATION)
if app.config.get_per('accounts', a, 'subscribe_tune'):
app.gajim_optional_features[a].append(nbxmpp.NS_TUNE + '+notify')
if app.config.get_per('accounts', a, 'subscribe_nick'):
app.gajim_optional_features[a].append(nbxmpp.NS_NICK + '+notify')
if app.config.get_per('accounts', a, 'subscribe_location'):
app.gajim_optional_features[a].append(nbxmpp.NS_LOCATION + \
'+notify')
if app.config.get('outgoing_chat_state_notifactions') != 'disabled':
app.gajim_optional_features[a].append(nbxmpp.NS_CHATSTATES)
if not app.config.get('ignore_incoming_xhtml'):
app.gajim_optional_features[a].append(nbxmpp.NS_XHTML_IM)
if app.config.get_per('accounts', a, 'answer_receipts'):
app.gajim_optional_features[a].append(nbxmpp.NS_RECEIPTS)
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE)
if app.is_installed('FARSTREAM'):
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_RTP)
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_RTP_AUDIO)
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_RTP_VIDEO)
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_ICE_UDP)
app.gajim_optional_features[a].append(
nbxmpp.NS_JINGLE_FILE_TRANSFER_5)
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_XTLS)
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_BYTESTREAM)
app.gajim_optional_features[a].append(nbxmpp.NS_JINGLE_IBB)
# Give plugins the possibility to add their features
app.plugin_manager.extension_point('update_caps', a)
app.caps_hash[a] = caps_cache.compute_caps_hash([app.gajim_identity],
app.gajim_common_features + app.gajim_optional_features[a])
# re-send presence with new hash
connected = app.connections[a].connected
if connected > 1 and app.SHOW_LIST[connected] != 'invisible':
app.connections[a].change_status(app.SHOW_LIST[connected],
app.connections[a].status)
def jid_is_blocked(account, jid):
con = app.connections[account]
return (jid in con.get_module('Blocking').blocked or
jid in con.get_module('PrivacyLists').blocked_contacts or
con.get_module('PrivacyLists').blocked_all)
def group_is_blocked(account, group):
con = app.connections[account]
return (group in con.get_module('PrivacyLists').blocked_groups or
con.get_module('PrivacyLists').blocked_all)
def get_subscription_request_msg(account=None):
s = app.config.get_per('accounts', account, 'subscription_request_msg')
if s:
return s
s = _('I would like to add you to my contact list.')
if account:
s = _('Hello, I am $name.') + ' ' + s
name = app.connections[account].get_module('VCardTemp').get_vard_name()
nick = app.nicks[account]
if name and nick:
name += ' (%s)' % nick
elif nick:
name = nick
s = Template(s).safe_substitute({'name': name})
return s
def replace_dataform_media(form, stanza):
found = False
for field in form.getTags('field'):
for media in field.getTags('media'):
for uri in media.getTags('uri'):
uri_data = uri.getData()
if uri_data.startswith('cid:'):
uri_data = uri_data[4:]
for data in stanza.getTags('data', namespace=nbxmpp.NS_BOB):
if data.getAttr('cid') == uri_data:
uri.setData(data.getData())
found = True
return found
def get_proxy_info(account):
p = app.config.get_per('accounts', account, 'proxy')
if not p:
if app.config.get_per('accounts', account, 'use_env_http_proxy'):
try:
try:
env_http_proxy = os.environ['HTTP_PROXY']
except Exception:
env_http_proxy = os.environ['http_proxy']
env_http_proxy = env_http_proxy.strip('"')
# Dispose of the http:// prefix
env_http_proxy = env_http_proxy.split('://')[-1]
env_http_proxy = env_http_proxy.split('@')
if len(env_http_proxy) == 2:
login = env_http_proxy[0].split(':')
addr = env_http_proxy[1].split(':')
else:
login = ['', '']
addr = env_http_proxy[0].split(':')
proxy = {'host': addr[0], 'type' : 'http', 'user':login[0]}
if len(addr) == 2:
proxy['port'] = addr[1]
else:
proxy['port'] = 3128
if len(login) == 2:
proxy['pass'] = login[1]
proxy['useauth'] = True
else:
proxy['pass'] = ''
return proxy
except Exception:
proxy = None
p = app.config.get('global_proxy')
if p and p in app.config.get_per('proxies'):
proxy = {}
proxyptr = app.config.get_per('proxies', p)
if not proxyptr:
return proxy
for key in proxyptr.keys():
proxy[key] = proxyptr[key]
return proxy
def _get_img_direct(attrs):
"""
Download an image. This function should be launched in a separated thread.
"""
mem = b''
alt = ''
max_size = 2*1024*1024
if 'max_size' in attrs:
max_size = attrs['max_size']
# Wait maximum 10s for connection
socket.setdefaulttimeout(10)
try:
req = urllib.request.Request(attrs['src'])
req.add_header('User-Agent', 'Gajim ' + app.version)
f = urllib.request.urlopen(req)
except Exception as ex:
log.debug('Error loading image %s ', attrs['src'] + str(ex))
alt = attrs.get('alt', 'Broken image')
else:
# Wait 2s between each byte
try:
f.fp._sock.fp._sock.settimeout(2)
except Exception:
pass
# On a slow internet connection with ~1000kbps you need ~10 seconds for 1 MB
deadline = time.time() + (10 * (max_size / 1048576))
while True:
if time.time() > deadline:
log.debug('Timeout loading image %s ', attrs['src'])
mem = ''
alt = attrs.get('alt', '')
if alt:
alt += '\n'
alt += _('Timeout loading image')
break
try:
temp = f.read(100)
except socket.timeout as ex:
log.debug('Timeout loading image %s ', attrs['src'] + str(ex))
alt = attrs.get('alt', '')
if alt:
alt += '\n'
alt += _('Timeout loading image')
break
if temp:
mem += temp
else:
break
if len(mem) > max_size:
alt = attrs.get('alt', '')
if alt:
alt += '\n'
alt += _('Image is too big')
break
f.close()
return (mem, alt)
def _get_img_proxy(attrs, proxy):
"""
Download an image through a proxy. This function should be launched in a
separated thread.
"""
if not app.is_installed('PYCURL'):
return '', _('PyCURL is not installed')
alt, max_size = '', 2*1024*1024
if 'max_size' in attrs:
max_size = attrs['max_size']
try:
b = StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, attrs['src'].encode('utf-8'))
c.setopt(pycurl.FOLLOWLOCATION, 1)
# Wait maximum 10s for connection
c.setopt(pycurl.CONNECTTIMEOUT, 10)
# On a slow internet connection with ~1000kbps you need ~10 seconds for 1 MB
c.setopt(pycurl.TIMEOUT, 10 * (max_size / 1048576))
c.setopt(pycurl.MAXFILESIZE, max_size)
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.USERAGENT, 'Gajim ' + app.version)
# set proxy
c.setopt(pycurl.PROXY, proxy['host'].encode('utf-8'))
c.setopt(pycurl.PROXYPORT, proxy['port'])
if proxy['useauth']:
c.setopt(pycurl.PROXYUSERPWD, proxy['user'].encode('utf-8')\
+ ':' + proxy['pass'].encode('utf-8'))
c.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_ANY)
if proxy['type'] == 'http':
c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP)
elif proxy['type'] == 'socks5':
c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5)
c.close()
t = b.getvalue()
return (t, attrs.get('alt', ''))
except pycurl.error as ex:
alt = attrs.get('alt', '')
if alt:
alt += '\n'
if ex.errno == pycurl.E_FILESIZE_EXCEEDED:
alt += _('Image is too big')
elif ex.errno == pycurl.E_OPERATION_TIMEOUTED:
alt += _('Timeout loading image')
else:
alt += _('Error loading image')
except Exception as ex:
log.debug('Error loading image %s ', attrs['src'] + str(ex))
alt = attrs.get('alt', 'Broken image')
return ('', alt)
def download_image(account, attrs):
proxy = get_proxy_info(account)
if proxy and proxy['type'] in ('http', 'socks5'):
return _get_img_proxy(attrs, proxy)
return _get_img_direct(attrs)
def version_condition(current_version, required_version):
if V(current_version) < V(required_version):
return False
return True
def get_available_emoticon_themes():
emoticons_themes = ['font']
files = []
dir_iterator = os.scandir(configpaths.get('EMOTICONS'))
for folder in dir_iterator:
if not folder.is_dir():
continue
file_iterator = os.scandir(folder.path)
for theme in file_iterator:
if theme.is_file():
files.append(theme.name)
if os.path.isdir(configpaths.get('MY_EMOTS')):
files += os.listdir(configpaths.get('MY_EMOTS'))
for file in files:
if file.endswith('.png'):
emoticons_themes.append(file[:-4])
emoticons_themes.sort()
return emoticons_themes
def call_counter(func):
def helper(self, restart=False):
if restart:
self._connect_machine_calls = 0
self._connect_machine_calls += 1
return func(self, restart=False)
return helper
def get_sync_threshold(jid, archive_info):
if archive_info is None or archive_info.sync_threshold is None:
if muc_caps_cache.supports(jid, 'muc#roomconfig_membersonly'):
threshold = app.config.get('private_room_sync_threshold')
else:
threshold = app.config.get('public_room_sync_threshold')
app.logger.set_archive_infos(jid, sync_threshold=threshold)
return threshold
return archive_info.sync_threshold