## common/GnuPG.py ## ## Contributors for this file: ## - Yann Leboulanger ## - Nikos Kouremenos ## ## Copyright (C) 2003-2004 Yann Leboulanger ## Vincent Hanquez ## Copyright (C) 2005 Yann Leboulanger ## Vincent Hanquez ## Nikos Kouremenos ## Dimitur Kirov ## Travis Shirk ## Norman Rasmussen ## ## This file is part of Gajim. ## ## Gajim is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published ## by the Free Software Foundation; version 3 only. ## ## Gajim is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with Gajim. If not, see . ## import gajim from os import tmpfile from common import helpers if gajim.HAVE_GPG: import GnuPGInterface class GnuPG(GnuPGInterface.GnuPG): def __init__(self, use_agent = False): GnuPGInterface.GnuPG.__init__(self) self.use_agent = use_agent self._setup_my_options() def _setup_my_options(self): self.options.armor = 1 self.options.meta_interactive = 0 self.options.extra_args.append('--no-secmem-warning') if self.use_agent: self.options.extra_args.append('--use-agent') def _read_response(self, child_stdout): # Internal method: reads all the output from GPG, taking notice # only of lines that begin with the magic [GNUPG:] prefix. # (See doc/DETAILS in the GPG distribution for info on GPG's # output when --status-fd is specified.) # # Returns a dictionary, mapping GPG's keywords to the arguments # for that keyword. resp = {} while 1: line = helpers.temp_failure_retry(child_stdout.readline) if line == "": break line = line.rstrip() if line[0:9] == '[GNUPG:] ': # Chop off the prefix line = line[9:] L = line.split(None, 1) keyword = L[0] if len(L) > 1: resp[ keyword ] = L[1] else: resp[ keyword ] = "" return resp def encrypt(self, str, recipients): self.options.recipients = recipients # a list! proc = self.run(['--encrypt'], create_fhs=['stdin', 'stdout', 'status', 'stderr']) proc.handles['stdin'].write(str) try: proc.handles['stdin'].close() except IOError: pass output = proc.handles['stdout'].read() try: proc.handles['stdout'].close() except IOError: pass stat = proc.handles['status'] resp = self._read_response(stat) try: proc.handles['status'].close() except IOError: pass error = proc.handles['stderr'].read() proc.handles['stderr'].close() try: proc.wait() except IOError: pass if 'BEGIN_ENCRYPTION' in resp and 'END_ENCRYPTION' in resp: # Encryption succeeded, even if there is output on stderr. Maybe # verbose is on error = '' return self._stripHeaderFooter(output), error def decrypt(self, str, keyID): proc = self.run(['--decrypt', '-q', '-u %s'%keyID], create_fhs=['stdin', 'stdout']) enc = self._addHeaderFooter(str, 'MESSAGE') proc.handles['stdin'].write(enc) proc.handles['stdin'].close() output = proc.handles['stdout'].read() proc.handles['stdout'].close() try: proc.wait() except IOError: pass return output def sign(self, str, keyID): proc = self.run(['-b', '-u %s'%keyID], create_fhs=['stdin', 'stdout', 'status', 'stderr']) proc.handles['stdin'].write(str) try: proc.handles['stdin'].close() except IOError: pass output = proc.handles['stdout'].read() try: proc.handles['stdout'].close() proc.handles['stderr'].close() except IOError: pass stat = proc.handles['status'] resp = self._read_response(stat) try: proc.handles['status'].close() except IOError: pass try: proc.wait() except IOError: pass if resp.has_key('GOOD_PASSPHRASE') or resp.has_key('SIG_CREATED'): return self._stripHeaderFooter(output) return 'BAD_PASSPHRASE' def verify(self, str, sign): if str is None: return '' f = tmpfile() fd = f.fileno() f.write(str) f.seek(0) proc = self.run(['--verify', '--enable-special-filenames', '-', '-&%s'%fd], create_fhs=['stdin', 'status', 'stderr']) f.close() sign = self._addHeaderFooter(sign, 'SIGNATURE') proc.handles['stdin'].write(sign) proc.handles['stdin'].close() proc.handles['stderr'].close() stat = proc.handles['status'] resp = self._read_response(stat) proc.handles['status'].close() try: proc.wait() except IOError: pass keyid = '' if resp.has_key('GOODSIG'): keyid = resp['GOODSIG'].split()[0] return keyid def get_keys(self, secret = False): if secret: opt = '--list-secret-keys' else: opt = '--list-keys' proc = self.run(['--with-colons', opt], create_fhs=['stdout']) output = proc.handles['stdout'].read() proc.handles['stdout'].close() keys = {} lines = output.split('\n') for line in lines: sline = line.split(':') if (sline[0] == 'sec' and secret) or \ (sline[0] == 'pub' and not secret): # decode escaped chars name = eval('"' + sline[9].replace('"', '\\"') + '"') # make it unicode instance keys[sline[4][8:]] = helpers.decode_string(name) return keys try: proc.wait() except IOError: pass def get_secret_keys(self): return self.get_keys(True) def _stripHeaderFooter(self, data): """Remove header and footer from data""" if not data: return '' lines = data.split('\n') while lines[0] != '': lines.remove(lines[0]) while lines[0] == '': lines.remove(lines[0]) i = 0 for line in lines: if line: if line[0] == '-': break i = i+1 line = '\n'.join(lines[0:i]) return line def _addHeaderFooter(self, data, type): """Add header and footer from data""" out = "-----BEGIN PGP %s-----\n" % type out = out + "Version: PGP\n" out = out + "\n" out = out + data + "\n" out = out + "-----END PGP %s-----\n" % type return out