# Copyright (C) 2003-2014 Yann Leboulanger # Copyright (C) 2005-2006 Dimitur Kirov # Nikos Kouremenos # Copyright (C) 2006 Alex Mauer # Copyright (C) 2006-2007 Travis Shirk # Copyright (C) 2006-2008 Jean-Marie Traissard # Copyright (C) 2007 Lukas Petrovicky # James Newton # Julien Pivotto # Copyright (C) 2007-2008 Stephan Erb # Copyright (C) 2008 Brendan Taylor # Jonathan Schleifer # # This file is part of Gajim. # # Gajim is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published # by the Free Software Foundation; version 3 only. # # Gajim is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Gajim. If not, see . import sys import re import os import subprocess import urllib import webbrowser import errno import select import base64 import hashlib import shlex import socket import time import logging import json import shutil import collections from datetime import datetime, timedelta from distutils.version import LooseVersion as V from encodings.punycode import punycode_encode from string import Template import nbxmpp from nbxmpp.stringprepare import nameprep import precis_i18n.codec # pylint: disable=unused-import from gajim.common import caps_cache from gajim.common import configpaths from gajim.common.i18n import Q_ from gajim.common.i18n import _ from gajim.common.i18n import ngettext log = logging.getLogger('gajim.c.helpers') special_groups = (_('Transports'), _('Not in Roster'), _('Observers'), _('Groupchats')) class InvalidFormat(Exception): pass def decompose_jid(jidstring): user = None server = None resource = None # Search for delimiters user_sep = jidstring.find('@') res_sep = jidstring.find('/') if user_sep == -1: if res_sep == -1: # host server = jidstring else: # host/resource server = jidstring[0:res_sep] resource = jidstring[res_sep + 1:] else: if res_sep == -1: # user@host user = jidstring[0:user_sep] server = jidstring[user_sep + 1:] else: if user_sep < res_sep: # user@host/resource user = jidstring[0:user_sep] server = jidstring[user_sep + 1:user_sep + (res_sep - user_sep)] resource = jidstring[res_sep + 1:] else: # server/resource (with an @ in resource) server = jidstring[0:res_sep] resource = jidstring[res_sep + 1:] return user, server, resource def parse_jid(jidstring): """ Perform stringprep on all JID fragments from a string and return the full jid """ # This function comes from http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py return prep(*decompose_jid(jidstring)) def idn_to_ascii(host): """ Convert IDN (Internationalized Domain Names) to ACE (ASCII-compatible encoding) """ from encodings import idna labels = idna.dots.split(host) converted_labels = [] for label in labels: if label: converted_labels.append(idna.ToASCII(label).decode('utf-8')) else: converted_labels.append('') return ".".join(converted_labels) def ascii_to_idn(host): """ Convert ACE (ASCII-compatible encoding) to IDN (Internationalized Domain Names) """ from encodings import idna labels = idna.dots.split(host) converted_labels = [] for label in labels: converted_labels.append(idna.ToUnicode(label)) return ".".join(converted_labels) def puny_encode_url(url): _url = url if '//' not in _url: _url = '//' + _url try: o = urllib.parse.urlparse(_url) p_loc = idn_to_ascii(o.netloc) except Exception: log.debug('urlparse failed: %s', url) return False return url.replace(o.netloc, p_loc) def parse_resource(resource): """ Perform stringprep on resource and return it """ if resource: try: return resource.encode('OpaqueString').decode('utf-8') except UnicodeError: raise InvalidFormat('Invalid character in resource.') def prep(user, server, resource): """ Perform stringprep on all JID fragments and return the full jid """ # This function comes from #http://svn.twistedmatrix.com/cvs/trunk/twisted/words/protocols/jabber/jid.py ip_address = False try: socket.inet_aton(server) ip_address = True except socket.error: pass if not ip_address and hasattr(socket, 'inet_pton'): try: socket.inet_pton(socket.AF_INET6, server.strip('[]')) server = '[%s]' % server.strip('[]') ip_address = True except (socket.error, ValueError): pass if not ip_address: if server is not None: if server.endswith('.'): # RFC7622, 3.2 server = server[:-1] if not server or len(server.encode('utf-8')) > 1023: raise InvalidFormat(_('Server must be between 1 and 1023 bytes')) try: server = nameprep.prepare(server) except UnicodeError: raise InvalidFormat(_('Invalid character in hostname.')) else: raise InvalidFormat(_('Server address required.')) if user is not None: if not user or len(user.encode('utf-8')) > 1023: raise InvalidFormat(_('Username must be between 1 and 1023 bytes')) try: user = user.encode('UsernameCaseMapped').decode('utf-8') except UnicodeError: raise InvalidFormat(_('Invalid character in username.')) else: user = None if resource is not None: if not resource or len(resource.encode('utf-8')) > 1023: raise InvalidFormat(_('Resource must be between 1 and 1023 bytes')) try: resource = resource.encode('OpaqueString').decode('utf-8') except UnicodeError: raise InvalidFormat(_('Invalid character in resource.')) else: resource = None if user: if resource: return '%s@%s/%s' % (user, server, resource) return '%s@%s' % (user, server) if resource: return '%s/%s' % (server, resource) return server def windowsify(s): if os.name == 'nt': return s.capitalize() return s def temp_failure_retry(func, *args, **kwargs): while True: try: return func(*args, **kwargs) except (os.error, IOError, select.error) as ex: if ex.errno == errno.EINTR: continue else: raise def get_uf_show(show, use_mnemonic=False): """ Return a userfriendly string for dnd/xa/chat and make all strings translatable If use_mnemonic is True, it adds _ so GUI should call with True for accessibility issues """ if show == 'dnd': if use_mnemonic: uf_show = _('_Busy') else: uf_show = _('Busy') elif show == 'xa': if use_mnemonic: uf_show = _('_Not Available') else: uf_show = _('Not Available') elif show == 'chat': if use_mnemonic: uf_show = _('_Free for Chat') else: uf_show = _('Free for Chat') elif show == 'online': if use_mnemonic: uf_show = Q_('?user status:_Available') else: uf_show = Q_('?user status:Available') elif show == 'connecting': uf_show = _('Connecting') elif show == 'away': if use_mnemonic: uf_show = _('A_way') else: uf_show = _('Away') elif show == 'offline': if use_mnemonic: uf_show = _('_Offline') else: uf_show = _('Offline') elif show == 'invisible': if use_mnemonic: uf_show = _('_Invisible') else: uf_show = _('Invisible') elif show == 'not in roster': uf_show = _('Not in Roster') elif show == 'requested': uf_show = Q_('?contact has status:Unknown') else: uf_show = Q_('?contact has status:Has errors') return uf_show def get_css_show_color(show): if show in ('online', 'chat', 'invisible'): return 'status-online' if show in ('offline', 'not in roster', 'requested'): return None if show in ('xa', 'dnd'): return 'status-dnd' if show == 'away': return 'status-away' def get_uf_sub(sub): if sub == 'none': uf_sub = Q_('?Subscription we already have:None') elif sub == 'to': uf_sub = _('To') elif sub == 'from': uf_sub = _('From') elif sub == 'both': uf_sub = _('Both') else: uf_sub = _('Unknown') return uf_sub def get_uf_ask(ask): if ask is None: uf_ask = Q_('?Ask (for Subscription):None') elif ask == 'subscribe': uf_ask = _('Subscribe') else: uf_ask = ask return uf_ask def get_uf_role(role, plural=False): ''' plural determines if you get Moderators or Moderator''' if role.value == 'none': role_name = Q_('?Group Chat Contact Role:None') elif role.value == 'moderator': if plural: role_name = _('Moderators') else: role_name = _('Moderator') elif role.value == 'participant': if plural: role_name = _('Participants') else: role_name = _('Participant') elif role.value == 'visitor': if plural: role_name = _('Visitors') else: role_name = _('Visitor') return role_name def get_uf_affiliation(affiliation): '''Get a nice and translated affilition for muc''' if affiliation.value == 'none': affiliation_name = Q_('?Group Chat Contact Affiliation:None') elif affiliation.value == 'owner': affiliation_name = _('Owner') elif affiliation.value == 'admin': affiliation_name = _('Administrator') elif affiliation.value == 'member': affiliation_name = _('Member') else: # Argl ! An unknown affiliation ! affiliation_name = affiliation.capitalize() return affiliation_name def get_sorted_keys(adict): keys = sorted(adict.keys()) return keys def to_one_line(msg): msg = msg.replace('\\', '\\\\') msg = msg.replace('\n', '\\n') # s1 = 'test\ntest\\ntest' # s11 = s1.replace('\\', '\\\\') # s12 = s11.replace('\n', '\\n') # s12 # 'test\\ntest\\\\ntest' return msg def from_one_line(msg): # (? 48: hash_ = hashlib.md5(filename.encode('utf-8')) filename = base64.b64encode(hash_.digest()).decode('utf-8') # make it latin chars only filename = punycode_encode(filename).decode('utf-8') filename = filename.replace('/', '_') if os.name == 'nt': filename = filename.replace('?', '_').replace(':', '_')\ .replace('\\', '_').replace('"', "'").replace('|', '_')\ .replace('*', '_').replace('<', '_').replace('>', '_') return filename def reduce_chars_newlines(text, max_chars=0, max_lines=0): """ Cut the chars after 'max_chars' on each line and show only the first 'max_lines' If any of the params is not present (None or 0) the action on it is not performed """ def _cut_if_long(string): if len(string) > max_chars: string = string[:max_chars - 3] + '…' return string if max_lines == 0: lines = text.split('\n') else: lines = text.split('\n', max_lines)[:max_lines] if max_chars > 0: if lines: lines = [_cut_if_long(e) for e in lines] if lines: reduced_text = '\n'.join(lines) if reduced_text != text: reduced_text += '…' else: reduced_text = '' return reduced_text def get_account_status(account): status = reduce_chars_newlines(account['status_line'], 100, 1) return status def datetime_tuple(timestamp): """ Convert timestamp using strptime and the format: %Y%m%dT%H:%M:%S Because of various datetime formats are used the following exceptions are handled: - Optional milliseconds appened to the string are removed - Optional Z (that means UTC) appened to the string are removed - XEP-082 datetime strings have all '-' cahrs removed to meet the above format. """ date, tim = timestamp.split('T', 1) date = date.replace('-', '') tim = tim.replace('z', '') tim = tim.replace('Z', '') zone = None if '+' in tim: sign = -1 tim, zone = tim.split('+', 1) if '-' in tim: sign = 1 tim, zone = tim.split('-', 1) tim = tim.split('.')[0] tim = time.strptime(date + 'T' + tim, '%Y%m%dT%H:%M:%S') if zone: zone = zone.replace(':', '') tim = datetime.fromtimestamp(time.mktime(tim)) if len(zone) > 2: zone = time.strptime(zone, '%H%M') else: zone = time.strptime(zone, '%H') zone = timedelta(hours=zone.tm_hour, minutes=zone.tm_min) tim += zone * sign tim = tim.timetuple() return tim from gajim.common import app if app.is_installed('PYCURL'): import pycurl from io import StringIO def convert_bytes(string): suffix = '' # IEC standard says KiB = 1024 bytes KB = 1000 bytes # but do we use the standard? use_kib_mib = app.config.get('use_kib_mib') align = 1024. bytes_ = float(string) if bytes_ >= align: bytes_ = round(bytes_/align, 1) if bytes_ >= align: bytes_ = round(bytes_/align, 1) if bytes_ >= align: bytes_ = round(bytes_/align, 1) if use_kib_mib: #GiB means gibibyte suffix = _('%s GiB') else: #GB means gigabyte suffix = _('%s GB') else: if use_kib_mib: #MiB means mibibyte suffix = _('%s MiB') else: #MB means megabyte suffix = _('%s MB') else: if use_kib_mib: #KiB means kibibyte suffix = _('%s KiB') else: #KB means kilo bytes suffix = _('%s KB') else: #B means bytes suffix = _('%s B') return suffix % str(bytes_) def get_contact_dict_for_account(account): """ Create a dict of jid, nick -> contact with all contacts of account. Can be used for completion lists """ contacts_dict = {} for jid in app.contacts.get_jid_list(account): contact = app.contacts.get_contact_with_highest_priority(account, jid) contacts_dict[jid] = contact name = contact.name if name in contacts_dict: contact1 = contacts_dict[name] del contacts_dict[name] contacts_dict['%s (%s)' % (name, contact1.jid)] = contact1 contacts_dict['%s (%s)' % (name, jid)] = contact elif contact.name: if contact.name == app.get_nick_from_jid(jid): del contacts_dict[jid] contacts_dict[name] = contact return contacts_dict def launch_browser_mailer(kind, uri): # kind = 'url' or 'mail' if kind == 'url' and uri.startswith('file://'): launch_file_manager(uri) return if kind in ('mail', 'sth_at_sth') and not uri.startswith('mailto:'): uri = 'mailto:' + uri if kind == 'url' and uri.startswith('www.'): uri = 'http://' + uri if not app.config.get('autodetect_browser_mailer'): if kind == 'url': command = app.config.get('custombrowser') elif kind in ('mail', 'sth_at_sth'): command = app.config.get('custommailapp') if command == '': # if no app is configured return command = build_command(command, uri) try: exec_command(command) except Exception: pass else: webbrowser.open(uri) def launch_file_manager(path_to_open): if os.name == 'nt': try: os.startfile(path_to_open) # if pywin32 is installed we open except Exception: pass else: if not app.config.get('autodetect_browser_mailer'): command = app.config.get('custom_file_manager') if command == '': # if no app is configured return else: command = 'xdg-open' command = build_command(command, path_to_open) try: exec_command(command) except Exception: pass def play_sound(event): if not app.config.get('sounds_on'): return path_to_soundfile = app.config.get_per('soundevents', event, 'path') play_sound_file(path_to_soundfile) def check_soundfile_path(file_, dirs=None): """ Check if the sound file exists :param file_: the file to check, absolute or relative to 'dirs' path :param dirs: list of knows paths to fallback if the file doesn't exists (eg: ~/.gajim/sounds/, DATADIR/sounds...). :return the path to file or None if it doesn't exists. """ if dirs is None: dirs = [configpaths.get('MY_DATA'), configpaths.get('DATA')] if not file_: return None if os.path.exists(file_): return file_ for d in dirs: d = os.path.join(d, 'sounds', file_) if os.path.exists(d): return d return None def strip_soundfile_path(file_, dirs=None, abs_=True): """ Remove knowns paths from a sound file Filechooser returns absolute path. If path is a known fallback path, we remove it. So config have no hardcoded path to DATA_DIR and text in textfield is shorther. param: file_: the filename to strip. param: dirs: list of knowns paths from which the filename should be stripped. param: abs_: force absolute path on dirs """ if not file_: return None if dirs is None: dirs = [configpaths.get('MY_DATA'), configpaths.get('DATA')] name = os.path.basename(file_) for d in dirs: d = os.path.join(d, 'sounds', name) if abs_: d = os.path.abspath(d) if file_ == d: return name return file_ def play_sound_file(path_to_soundfile): path_to_soundfile = check_soundfile_path(path_to_soundfile) if path_to_soundfile is None: return if sys.platform == 'win32': import winsound try: winsound.PlaySound(path_to_soundfile, winsound.SND_FILENAME|winsound.SND_ASYNC) except Exception: log.exception('Sound Playback Error') elif sys.platform == 'darwin': try: from AppKit import NSSound except ImportError: log.exception('Sound Playback Error') return sound = NSSound.alloc() sound.initWithContentsOfFile_byReference_(path_to_soundfile, True) sound.play() elif app.config.get('soundplayer') == '': try: import wave import ossaudiodev except Exception: log.exception('Sound Playback Error') return def _oss_play(): sndfile = wave.open(path_to_soundfile, 'rb') nc, sw, fr, nf, _comptype, _compname = sndfile.getparams() dev = ossaudiodev.open('/dev/dsp', 'w') dev.setparameters(sw * 8, nc, fr) dev.write(sndfile.readframes(nf)) sndfile.close() dev.close() app.thread_interface(_oss_play) else: player = app.config.get('soundplayer') command = build_command(player, path_to_soundfile) exec_command(command) def get_global_show(): maxi = 0 for account in app.connections: if not app.config.get_per('accounts', account, 'sync_with_global_status'): continue connected = app.connections[account].connected if connected > maxi: maxi = connected return app.SHOW_LIST[maxi] def get_global_status(): maxi = 0 for account in app.connections: if not app.config.get_per('accounts', account, 'sync_with_global_status'): continue connected = app.connections[account].connected if connected > maxi: maxi = connected status = app.connections[account].status return status def statuses_unified(): """ Test if all statuses are the same """ reference = None for account in app.connections: if not app.config.get_per('accounts', account, 'sync_with_global_status'): continue if reference is None: reference = app.connections[account].connected elif reference != app.connections[account].connected: return False return True def get_icon_name_to_show(contact, account=None): """ Get the icon name to show in online, away, requested, etc """ if account and app.events.get_nb_roster_events(account, contact.jid): return 'event' if account and app.events.get_nb_roster_events( account, contact.get_full_jid()): return 'event' if account and account in app.interface.minimized_controls and \ contact.jid in app.interface.minimized_controls[account] and app.interface.\ minimized_controls[account][contact.jid].get_nb_unread_pm() > 0: return 'event' if account and contact.jid in app.gc_connected[account]: if app.gc_connected[account][contact.jid]: return 'muc-active' return 'muc-inactive' if contact.jid.find('@') <= 0: # if not '@' or '@' starts the jid ==> agent return contact.show if contact.sub in ('both', 'to'): return contact.show if contact.ask == 'subscribe': return 'requested' transport = app.get_transport_name_from_jid(contact.jid) if transport: return contact.show if contact.show in app.SHOW_LIST: return contact.show return 'notinroster' def get_full_jid_from_iq(iq_obj): """ Return the full jid (with resource) from an iq """ jid = iq_obj.getFrom() if jid is None: return None return parse_jid(str(iq_obj.getFrom())) def get_jid_from_iq(iq_obj): """ Return the jid (without resource) from an iq """ jid = get_full_jid_from_iq(iq_obj) return app.get_jid_without_resource(jid) def get_auth_sha(sid, initiator, target): """ Return sha of sid + initiator + target used for proxy auth """ return hashlib.sha1(("%s%s%s" % (sid, initiator, target)).encode('utf-8')).\ hexdigest() def remove_invalid_xml_chars(string): if string: string = re.sub(app.interface.invalid_XML_chars_re, '', string) return string def get_random_string_16(): """ Create random string of length 16 """ rng = list(range(65, 90)) rng.extend(range(48, 57)) char_sequence = [chr(e) for e in rng] from random import sample return ''.join(sample(char_sequence, 16)) def get_os_info(): if app.os_info: return app.os_info app.os_info = 'N/A' if os.name == 'nt' or sys.platform == 'darwin': import platform app.os_info = platform.system() + " " + platform.release() elif os.name == 'posix': try: import distro app.os_info = distro.name(pretty=True) except ImportError: import platform app.os_info = platform.system() return app.os_info def allow_showing_notification(account, type_='notify_on_new_message', is_first_message=True): """ Is it allowed to show nofication? Check OUR status and if we allow notifications for that status type is the option that need to be True e.g.: notify_on_signing is_first_message: set it to false when it's not the first message """ if type_ and (not app.config.get(type_) or not is_first_message): return False if app.config.get('autopopupaway'): # always show notification return True if app.connections[account].connected in (2, 3): # we're online or chat return True return False def allow_popup_window(account): """ Is it allowed to popup windows? """ autopopup = app.config.get('autopopup') autopopupaway = app.config.get('autopopupaway') if autopopup and (autopopupaway or \ app.connections[account].connected in (2, 3)): # we're online or chat return True return False def allow_sound_notification(account, sound_event): if app.config.get('sounddnd') or app.connections[account].connected != \ app.SHOW_LIST.index('dnd') and app.config.get_per('soundevents', sound_event, 'enabled'): return True return False def get_chat_control(account, contact): full_jid_with_resource = contact.jid if contact.resource: full_jid_with_resource += '/' + contact.resource highest_contact = app.contacts.get_contact_with_highest_priority( account, contact.jid) # Look for a chat control that has the given resource, or default to # one without resource ctrl = app.interface.msg_win_mgr.get_control(full_jid_with_resource, account) if ctrl: return ctrl if (highest_contact and highest_contact.resource and contact.resource != highest_contact.resource): return None # unknown contact or offline message return app.interface.msg_win_mgr.get_control(contact.jid, account) def get_notification_icon_tooltip_dict(): """ Return a dict of the form {acct: {'show': show, 'message': message, 'event_lines': [list of text lines to show in tooltip]} """ # How many events must there be before they're shown summarized, not per-user max_ungrouped_events = 10 accounts = get_accounts_info() # Gather events. (With accounts, when there are more.) for account in accounts: account_name = account['name'] account['event_lines'] = [] # Gather events per-account pending_events = app.events.get_events(account=account_name) messages, non_messages, total_messages, total_non_messages = {}, {}, 0, 0 for jid in pending_events: for event in pending_events[jid]: if event.type_.count('file') > 0: # This is a non-messagee event. messages[jid] = non_messages.get(jid, 0) + 1 total_non_messages = total_non_messages + 1 else: # This is a message. messages[jid] = messages.get(jid, 0) + 1 total_messages = total_messages + 1 # Display unread messages numbers, if any if total_messages > 0: if total_messages > max_ungrouped_events: text = ngettext( '%d message pending', '%d messages pending', total_messages, total_messages, total_messages) account['event_lines'].append(text) else: for jid in messages: text = ngettext( '%d message pending', '%d messages pending', messages[jid], messages[jid], messages[jid]) contact = app.contacts.get_first_contact_from_jid( account['name'], jid) text += ' ' if jid in app.gc_connected[account['name']]: text += _('from room %s') % (jid) elif contact: name = contact.get_shown_name() text += _('from user %s') % (name) else: text += _('from %s') % (jid) account['event_lines'].append(text) # Display unseen events numbers, if any if total_non_messages > 0: if total_non_messages > max_ungrouped_events: text = ngettext( '%d event pending', '%d events pending', total_non_messages, total_non_messages, total_non_messages) account['event_lines'].append(text) else: for jid in non_messages: text = ngettext('%d event pending', '%d events pending', non_messages[jid], non_messages[jid], non_messages[jid]) text += ' ' + _('from user %s') % (jid) account[account]['event_lines'].append(text) return accounts def get_notification_icon_tooltip_text(): text = None # How many events must there be before they're shown summarized, not per-user # max_ungrouped_events = 10 # Character which should be used to indent in the tooltip. indent_with = ' ' accounts = get_notification_icon_tooltip_dict() if not accounts: # No configured account return _('Gajim') # at least one account present # Is there more that one account? if len(accounts) == 1: show_more_accounts = False else: show_more_accounts = True # If there is only one account, its status is shown on the first line. if show_more_accounts: text = _('Gajim') else: text = _('Gajim - %s') % (get_account_status(accounts[0])) # Gather and display events. (With accounts, when there are more.) for account in accounts: account_name = account['name'] # Set account status, if not set above if show_more_accounts: message = '\n' + indent_with + ' %s - %s' text += message % (account_name, get_account_status(account)) # Account list shown, messages need to be indented more indent_how = 2 else: # If no account list is shown, messages could have default indenting. indent_how = 1 for line in account['event_lines']: text += '\n' + indent_with * indent_how + ' ' text += line return text def get_accounts_info(): """ Helper for notification icon tooltip """ accounts = [] accounts_list = sorted(app.contacts.get_accounts()) for account in accounts_list: status_idx = app.connections[account].connected # uncomment the following to hide offline accounts # if status_idx == 0: continue status = app.SHOW_LIST[status_idx] message = app.connections[account].status single_line = get_uf_show(status) if message is None: message = '' else: message = message.strip() if message != '': single_line += ': ' + message account_label = app.get_account_label(account) accounts.append({'name': account, 'account_label': account_label, 'status_line': single_line, 'show': status, 'message': message}) return accounts def get_current_show(account): if account not in app.connections: return 'offline' status = app.connections[account].connected return app.SHOW_LIST[status] def prepare_and_validate_gpg_keyID(account, jid, keyID): """ Return an eight char long keyID that can be used with for GPG encryption with this contact If the given keyID is None, return UNKNOWN; if the key does not match the assigned key XXXXXXXXMISMATCH is returned. If the key is trusted and not yet assigned, assign it. """ if app.connections[account].USE_GPG: if keyID and len(keyID) == 16: keyID = keyID[8:] attached_keys = app.config.get_per('accounts', account, 'attached_gpg_keys').split() if jid in attached_keys and keyID: attachedkeyID = attached_keys[attached_keys.index(jid) + 1] if attachedkeyID != keyID: # Get signing subkeys for the attached key subkeys = [] for key in app.connections[account].gpg.list_keys(): if key['keyid'][8:] == attachedkeyID: subkeys = [subkey[0][8:] for subkey in key['subkeys'] \ if subkey[1] == 's'] break if keyID not in subkeys: # Mismatch! Another gpg key was expected keyID += 'MISMATCH' elif jid in attached_keys: # An unsigned presence, just use the assigned key keyID = attached_keys[attached_keys.index(jid) + 1] elif keyID: full_key = app.connections[account].ask_gpg_keys(keyID=keyID) # Assign the corresponding key, if we have it in our keyring if full_key: for u in app.contacts.get_contacts(account, jid): u.keyID = keyID keys_str = app.config.get_per('accounts', account, 'attached_gpg_keys') keys_str += jid + ' ' + keyID + ' ' app.config.set_per('accounts', account, 'attached_gpg_keys', keys_str) elif keyID is None: keyID = 'UNKNOWN' return keyID def update_optional_features(account=None): if account is not None: accounts = [account] else: accounts = app.connections.keys() for account_ in accounts: features = [] app.gajim_optional_features[account_] = features if app.config.get_per('accounts', account_, 'subscribe_mood'): features.append(nbxmpp.NS_MOOD + '+notify') if app.config.get_per('accounts', account_, 'subscribe_activity'): features.append(nbxmpp.NS_ACTIVITY + '+notify') if app.config.get_per('accounts', account_, 'subscribe_tune'): features.append(nbxmpp.NS_TUNE + '+notify') if app.config.get_per('accounts', account_, 'subscribe_nick'): features.append(nbxmpp.NS_NICK + '+notify') if app.config.get_per('accounts', account_, 'subscribe_location'): features.append(nbxmpp.NS_LOCATION + '+notify') if app.config.get('outgoing_chat_state_notifactions') != 'disabled': features.append(nbxmpp.NS_CHATSTATES) if not app.config.get('ignore_incoming_xhtml'): features.append(nbxmpp.NS_XHTML_IM) if app.config.get_per('accounts', account_, 'answer_receipts'): features.append(nbxmpp.NS_RECEIPTS) if app.is_installed('FARSTREAM'): features.append(nbxmpp.NS_JINGLE_RTP) features.append(nbxmpp.NS_JINGLE_RTP_AUDIO) features.append(nbxmpp.NS_JINGLE_RTP_VIDEO) features.append(nbxmpp.NS_JINGLE_ICE_UDP) # Give plugins the possibility to add their features app.plugin_manager.extension_point('update_caps', account_) app.caps_hash[account_] = caps_cache.compute_caps_hash( [app.gajim_identity], app.gajim_common_features + features) # re-send presence with new hash connected = app.connections[account_].connected if connected > 1 and app.SHOW_LIST[connected] != 'invisible': app.connections[account_].change_status( app.SHOW_LIST[connected], app.connections[account_].status) def jid_is_blocked(account, jid): con = app.connections[account] return (jid in con.get_module('Blocking').blocked or jid in con.get_module('PrivacyLists').blocked_contacts or con.get_module('PrivacyLists').blocked_all) def group_is_blocked(account, group): con = app.connections[account] return (group in con.get_module('PrivacyLists').blocked_groups or con.get_module('PrivacyLists').blocked_all) def get_subscription_request_msg(account=None): s = app.config.get_per('accounts', account, 'subscription_request_msg') if s: return s s = _('I would like to add you to my contact list.') if account: s = _('Hello, I am $name.') + ' ' + s name = app.connections[account].get_module('VCardTemp').get_vard_name() nick = app.nicks[account] if name and nick: name += ' (%s)' % nick elif nick: name = nick s = Template(s).safe_substitute({'name': name}) return s def replace_dataform_media(form, stanza): found = False for field in form.getTags('field'): for media in field.getTags('media'): for uri in media.getTags('uri'): uri_data = uri.getData() if uri_data.startswith('cid:'): uri_data = uri_data[4:] for data in stanza.getTags('data', namespace=nbxmpp.NS_BOB): if data.getAttr('cid') == uri_data: uri.setData(data.getData()) found = True return found def get_proxy_info(account): p = app.config.get_per('accounts', account, 'proxy') if not p: if app.config.get_per('accounts', account, 'use_env_http_proxy'): try: try: env_http_proxy = os.environ['HTTP_PROXY'] except Exception: env_http_proxy = os.environ['http_proxy'] env_http_proxy = env_http_proxy.strip('"') # Dispose of the http:// prefix env_http_proxy = env_http_proxy.split('://')[-1] env_http_proxy = env_http_proxy.split('@') if len(env_http_proxy) == 2: login = env_http_proxy[0].split(':') addr = env_http_proxy[1].split(':') else: login = ['', ''] addr = env_http_proxy[0].split(':') proxy = {'host': addr[0], 'type' : 'http', 'user':login[0]} if len(addr) == 2: proxy['port'] = addr[1] else: proxy['port'] = 3128 if len(login) == 2: proxy['pass'] = login[1] proxy['useauth'] = True else: proxy['pass'] = '' return proxy except Exception: proxy = None p = app.config.get('global_proxy') if p and p in app.config.get_per('proxies'): proxy = {} proxyptr = app.config.get_per('proxies', p) if not proxyptr: return proxy for key in proxyptr.keys(): proxy[key] = proxyptr[key] return proxy def _get_img_direct(attrs): """ Download an image. This function should be launched in a separated thread. """ mem = b'' alt = '' max_size = 2*1024*1024 if 'max_size' in attrs: max_size = attrs['max_size'] # Wait maximum 10s for connection socket.setdefaulttimeout(10) try: req = urllib.request.Request(attrs['src']) req.add_header('User-Agent', 'Gajim ' + app.version) f = urllib.request.urlopen(req) except Exception as ex: log.debug('Error loading image %s ', attrs['src'] + str(ex)) alt = attrs.get('alt', 'Broken image') else: # Wait 2s between each byte try: f.fp._sock.fp._sock.settimeout(2) except Exception: pass # On a slow internet connection with ~1000kbps you need ~10 seconds for 1 MB deadline = time.time() + (10 * (max_size / 1048576)) while True: if time.time() > deadline: log.debug('Timeout loading image %s ', attrs['src']) mem = '' alt = attrs.get('alt', '') if alt: alt += '\n' alt += _('Timeout loading image') break try: temp = f.read(100) except socket.timeout as ex: log.debug('Timeout loading image %s ', attrs['src'] + str(ex)) alt = attrs.get('alt', '') if alt: alt += '\n' alt += _('Timeout loading image') break if temp: mem += temp else: break if len(mem) > max_size: alt = attrs.get('alt', '') if alt: alt += '\n' alt += _('Image is too big') break f.close() return (mem, alt) def _get_img_proxy(attrs, proxy): """ Download an image through a proxy. This function should be launched in a separated thread. """ if not app.is_installed('PYCURL'): return '', _('PyCURL is not installed') alt, max_size = '', 2*1024*1024 if 'max_size' in attrs: max_size = attrs['max_size'] try: b = StringIO() c = pycurl.Curl() c.setopt(pycurl.URL, attrs['src'].encode('utf-8')) c.setopt(pycurl.FOLLOWLOCATION, 1) # Wait maximum 10s for connection c.setopt(pycurl.CONNECTTIMEOUT, 10) # On a slow internet connection with ~1000kbps you need ~10 seconds for 1 MB c.setopt(pycurl.TIMEOUT, 10 * (max_size / 1048576)) c.setopt(pycurl.MAXFILESIZE, max_size) c.setopt(pycurl.WRITEFUNCTION, b.write) c.setopt(pycurl.USERAGENT, 'Gajim ' + app.version) # set proxy c.setopt(pycurl.PROXY, proxy['host'].encode('utf-8')) c.setopt(pycurl.PROXYPORT, proxy['port']) if proxy['useauth']: c.setopt(pycurl.PROXYUSERPWD, proxy['user'].encode('utf-8')\ + ':' + proxy['pass'].encode('utf-8')) c.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_ANY) if proxy['type'] == 'http': c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_HTTP) elif proxy['type'] == 'socks5': c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5) c.close() t = b.getvalue() return (t, attrs.get('alt', '')) except pycurl.error as ex: alt = attrs.get('alt', '') if alt: alt += '\n' if ex.errno == pycurl.E_FILESIZE_EXCEEDED: alt += _('Image is too big') elif ex.errno == pycurl.E_OPERATION_TIMEOUTED: alt += _('Timeout loading image') else: alt += _('Error loading image') except Exception as ex: log.debug('Error loading image %s ', attrs['src'] + str(ex)) alt = attrs.get('alt', 'Broken image') return ('', alt) def download_image(account, attrs): proxy = get_proxy_info(account) if proxy and proxy['type'] in ('http', 'socks5'): return _get_img_proxy(attrs, proxy) return _get_img_direct(attrs) def version_condition(current_version, required_version): if V(current_version) < V(required_version): return False return True def get_available_emoticon_themes(): emoticons_themes = ['font'] files = [] dir_iterator = os.scandir(configpaths.get('EMOTICONS')) for folder in dir_iterator: if not folder.is_dir(): continue file_iterator = os.scandir(folder.path) for theme in file_iterator: if theme.is_file(): files.append(theme.name) if os.path.isdir(configpaths.get('MY_EMOTS')): files += os.listdir(configpaths.get('MY_EMOTS')) for file in files: if file.endswith('.png'): emoticons_themes.append(file[:-4]) emoticons_themes.sort() return emoticons_themes def call_counter(func): def helper(self, restart=False): if restart: self._connect_machine_calls = 0 self._connect_machine_calls += 1 return func(self, restart=False) return helper def get_sync_threshold(jid, archive_info): cache = caps_cache.muc_caps_cache if archive_info is None or archive_info.sync_threshold is None: if cache.supports(jid, 'muc#roomconfig_membersonly'): threshold = app.config.get('private_room_sync_threshold') else: threshold = app.config.get('public_room_sync_threshold') app.logger.set_archive_infos(jid, sync_threshold=threshold) return threshold return archive_info.sync_threshold def load_json(path, key=None, default=None): try: with open(path, 'r') as file: json_dict = json.loads(file.read()) except Exception: log.exception('Parsing error') return default if key is None: return json_dict return json_dict.get(key, default) def ignore_contact(account, jid): jid = str(jid) known_contact = app.contacts.get_contacts(account, jid) ignore = app.config.get_per('accounts', account, 'ignore_unknown_contacts') if ignore and not known_contact: log.info('Ignore unknown contact %s', jid) return True return False class AdditionalDataDict(collections.UserDict): def __init__(self, initialdata=None): collections.UserDict.__init__(self, initialdata) @staticmethod def _get_path_childs(full_path): path_childs = [full_path] if ':' in full_path: path_childs = full_path.split(':') return path_childs def set_value(self, full_path, key, value): path_childs = self._get_path_childs(full_path) _dict = self.data for path in path_childs: try: _dict = _dict[path] except KeyError: _dict[path] = {} _dict = _dict[path] _dict[key] = value def get_value(self, full_path, key, default=None): path_childs = self._get_path_childs(full_path) _dict = self.data for path in path_childs: try: _dict = _dict[path] except KeyError: return default try: return _dict[key] except KeyError: return default def remove_value(self, full_path, key): path_childs = self._get_path_childs(full_path) _dict = self.data for path in path_childs: try: _dict = _dict[path] except KeyError: return try: del _dict[key] except KeyError: return