From 6cd8e07fae50264d7c891411a9fcdfd87b303793 Mon Sep 17 00:00:00 2001 From: Yann Leboulanger Date: Mon, 29 Nov 2010 18:44:22 +0100 Subject: [PATCH] switch from GnuPGInterface to python-gnupg, so gpg is available under windows. Fixes #5096, #3615, #1890, #996 --- debian/rules | 1 - src/common/GnuPG.py | 198 ++------ src/common/GnuPGInterface.py | 673 ------------------------- src/common/connection.py | 19 +- src/common/gajim.py | 2 +- src/common/gnupg.py | 921 +++++++++++++++++++++++++++++++++++ src/features_window.py | 6 +- 7 files changed, 976 insertions(+), 844 deletions(-) delete mode 100644 src/common/GnuPGInterface.py create mode 100644 src/common/gnupg.py diff --git a/debian/rules b/debian/rules index bc351b438..a78957c1e 100755 --- a/debian/rules +++ b/debian/rules @@ -12,6 +12,5 @@ DEB_MAKE_BUILD_TARGET := all DEB_MAKE_INSTALL_TARGET = install DESTDIR=$(DEB_DESTDIR) binary-install/gajim:: - rm $(DEB_DESTDIR)/usr/share/gajim/src/common/GnuPGInterface.py* dh_pysupport -pgajim convert $(DEB_DESTDIR)/usr/share/icons/hicolor/64x64/apps/gajim.png -resize 32x32 $(DEB_DESTDIR)/usr/share/pixmaps/gajim.xpm diff --git a/src/common/GnuPG.py b/src/common/GnuPG.py index 32c1d4452..d029b8a5c 100644 --- a/src/common/GnuPG.py +++ b/src/common/GnuPG.py @@ -23,17 +23,17 @@ ## import gajim +import os from os import tmpfile -from common import helpers if gajim.HAVE_GPG: - import GnuPGInterface + import gnupg - class GnuPG(GnuPGInterface.GnuPG): - def __init__(self, use_agent = False): - GnuPGInterface.GnuPG.__init__(self) + class GnuPG(gnupg.GPG): + def __init__(self, use_agent=False): + gnupg.GPG.__init__(self) + self.passphrase = None self.use_agent = use_agent - self._setup_my_options() self.always_trust = False def _setup_my_options(self): @@ -46,166 +46,56 @@ if gajim.HAVE_GPG: if self.use_agent: self.options.extra_args.append('--use-agent') - def _read_response(self, child_stdout): - # Internal method: reads all the output from GPG, taking notice - # only of lines that begin with the magic [GNUPG:] prefix. - # (See doc/DETAILS in the GPG distribution for info on GPG's - # output when --status-fd is specified.) - # - # Returns a dictionary, mapping GPG's keywords to the arguments - # for that keyword. - - resp = {} - while True: - line = helpers.temp_failure_retry(child_stdout.readline) - if line == "": break - line = line.rstrip() - if line[0:9] == '[GNUPG:] ': - # Chop off the prefix - line = line[9:] - L = line.split(None, 1) - keyword = L[0] - if len(L) > 1: - resp[ keyword ] = L[1] - else: - resp[ keyword ] = "" - return resp - def encrypt(self, str_, recipients, always_trust=False): - self.options.recipients = recipients # a list! + result = super(GnuPG, self).encrypt(str_, recipients, + always_trust=always_trust, passphrase=self.passphrase) - opt = ['--encrypt'] - if always_trust or self.always_trust: - opt.append('--always-trust') - proc = self.run(opt, create_fhs=['stdin', 'stdout', 'status', - 'stderr']) - proc.handles['stdin'].write(str_) - try: - proc.handles['stdin'].close() - except IOError: - pass - - output = proc.handles['stdout'].read() - try: - proc.handles['stdout'].close() - except IOError: - pass - - stat = proc.handles['status'] - resp = self._read_response(stat) - try: - proc.handles['status'].close() - except IOError: - pass - - error = proc.handles['stderr'].read() - proc.handles['stderr'].close() - - try: proc.wait() - except IOError: pass - if 'INV_RECP' in resp and resp['INV_RECP'].split()[0] == '10': - # unusable recipient "Key not trusted" + if result.status == 'invalid recipient': return '', 'NOT_TRUSTED' - if 'BEGIN_ENCRYPTION' in resp and 'END_ENCRYPTION' in resp: - # Encryption succeeded, even if there is output on stderr. Maybe - # verbose is on + + if result.ok: error = '' - return self._stripHeaderFooter(output), helpers.decode_string(error) + else: + error = result.status + + return self._stripHeaderFooter(str(result)), error def decrypt(self, str_, keyID): - proc = self.run(['--decrypt', '-q', '-u %s'%keyID], create_fhs=['stdin', 'stdout']) - enc = self._addHeaderFooter(str_, 'MESSAGE') - proc.handles['stdin'].write(enc) - proc.handles['stdin'].close() + data = self._addHeaderFooter(str_, 'MESSAGE') + result = super(GnuPG, self).decrypt(data, + passphrase=self.passphrase) - output = proc.handles['stdout'].read() - proc.handles['stdout'].close() - - try: proc.wait() - except IOError: pass - return output + return str(result) def sign(self, str_, keyID): - proc = self.run(['-b', '-u %s'%keyID], create_fhs=['stdin', 'stdout', 'status', 'stderr']) - proc.handles['stdin'].write(str_) - try: - proc.handles['stdin'].close() - except IOError: - pass + result = super(GnuPG, self).sign(str_, keyid=keyID, detach=True, + passphrase=self.passphrase) - output = proc.handles['stdout'].read() - try: - proc.handles['stdout'].close() - proc.handles['stderr'].close() - except IOError: - pass - - stat = proc.handles['status'] - resp = self._read_response(stat) - try: - proc.handles['status'].close() - except IOError: - pass - - try: proc.wait() - except IOError: pass - if 'GOOD_PASSPHRASE' in resp or 'SIG_CREATED' in resp: - return self._stripHeaderFooter(output) - if 'KEYEXPIRED' in resp: - return 'KEYEXPIRED' + if result.fingerprint: + return self._stripHeaderFooter(str(result)) +# if 'KEYEXPIRED' in resp: +# return 'KEYEXPIRED' return 'BAD_PASSPHRASE' def verify(self, str_, sign): if str_ is None: return '' - f = tmpfile() - fd = f.fileno() - f.write(str_) - f.seek(0) + data = '-----BEGIN PGP SIGNED MESSAGE-----' + os.linesep + data = data + 'Hash: SHA1' + os.linesep + os.linesep + data = data + str_ + os.linesep + data = data + self._addHeaderFooter(sign, 'SIGNATURE') + result = super(GnuPG, self).verify(data) - proc = self.run(['--verify', '--enable-special-filenames', '-', '-&%s'%fd], create_fhs=['stdin', 'status', 'stderr']) - - f.close() - sign = self._addHeaderFooter(sign, 'SIGNATURE') - proc.handles['stdin'].write(sign) - proc.handles['stdin'].close() - proc.handles['stderr'].close() - - stat = proc.handles['status'] - resp = self._read_response(stat) - proc.handles['status'].close() - - try: proc.wait() - except IOError: pass - - keyid = '' - if 'GOODSIG' in resp: - keyid = resp['GOODSIG'].split()[0] - return keyid - - def get_keys(self, secret = False): - if secret: - opt = '--list-secret-keys' - else: - opt = '--list-keys' - proc = self.run(['--with-colons', opt], - create_fhs=['stdout']) - output = proc.handles['stdout'].read() - proc.handles['stdout'].close() - - try: proc.wait() - except IOError: pass + if result.valid: + return result.key_id + return '' + def get_keys(self, secret=False): keys = {} - lines = output.split('\n') - for line in lines: - sline = line.split(':') - if (sline[0] == 'sec' and secret) or \ - (sline[0] == 'pub' and not secret): - # decode escaped chars - name = eval('"' + sline[9].replace('"', '\\"') + '"') - # make it unicode instance - keys[sline[4][8:]] = helpers.decode_string(name) + result = super(GnuPG, self).list_keys(secret=secret) + for key in result: + # Take first not empty uid + keys[key['keyid'][8:]] = [uid for uid in key['uids'] if uid][0] return keys def get_secret_keys(self): @@ -216,7 +106,7 @@ if gajim.HAVE_GPG: Remove header and footer from data """ if not data: return '' - lines = data.split('\n') + lines = data.splitlines() while lines[0] != '': lines.remove(lines[0]) while lines[0] == '': @@ -233,9 +123,9 @@ if gajim.HAVE_GPG: """ Add header and footer from data """ - out = "-----BEGIN PGP %s-----\n" % type_ - out = out + "Version: PGP\n" - out = out + "\n" - out = out + data + "\n" - out = out + "-----END PGP %s-----\n" % type_ + out = "-----BEGIN PGP %s-----" % type_ + os.linesep + out = out + "Version: PGP" + os.linesep + out = out + os.linesep + out = out + data + os.linesep + out = out + "-----END PGP %s-----" % type_ + os.linesep return out diff --git a/src/common/GnuPGInterface.py b/src/common/GnuPGInterface.py deleted file mode 100644 index 4b41a65da..000000000 --- a/src/common/GnuPGInterface.py +++ /dev/null @@ -1,673 +0,0 @@ -# -*- coding:utf-8 -*- -## src/common/GnuPGInterface.py -## -## Copyright (C) 2001 Frank J. Tobin -## Copyright (C) 2005 Nikos Kouremenos -## Copyright (C) 2006-2010 Yann Leboulanger -## Copyright (C) 2008 Jean-Marie Traissard -## -## This file is part of Gajim. -## -## Gajim is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published -## by the Free Software Foundation; version 3 only. -## -## Gajim is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with Gajim. If not, see . -## - -""" -Interface to GNU Privacy Guard (GnuPG) - -GnuPGInterface is a Python module to interface with GnuPG. -It concentrates on interacting with GnuPG via filehandles, -providing access to control GnuPG via versatile and extensible means. - -This module is based on GnuPG::Interface, a Perl module by the same author. - -Normally, using this module will involve creating a -GnuPG object, setting some options in it's 'options' data member -(which is of type Options), creating some pipes -to talk with GnuPG, and then calling the run() method, which will -connect those pipes to the GnuPG process. run() returns a -Process object, which contains the filehandles to talk to GnuPG with. - -Example code: - ->>> import GnuPGInterface ->>> ->>> plaintext = "Three blind mice" ->>> passphrase = "This is the passphrase" ->>> ->>> gnupg = GnuPGInterface.GnuPG() ->>> gnupg.options.armor = 1 ->>> gnupg.options.meta_interactive = 0 ->>> gnupg.options.extra_args.append('--no-secmem-warning') ->>> ->>> # Normally we might specify something in ->>> # gnupg.options.recipients, like ->>> # gnupg.options.recipients = [ '0xABCD1234', 'bob@foo.bar' ] ->>> # but since we're doing symmetric-only encryption, it's not needed. ->>> # If you are doing standard, public-key encryption, using ->>> # --encrypt, you will need to specify recipients before ->>> # calling gnupg.run() ->>> ->>> # First we'll encrypt the test_text input symmetrically ->>> p1 = gnupg.run(['--symmetric'], -... create_fhs=['stdin', 'stdout', 'passphrase']) ->>> ->>> p1.handles['passphrase'].write(passphrase) ->>> p1.handles['passphrase'].close() ->>> ->>> p1.handles['stdin'].write(plaintext) ->>> p1.handles['stdin'].close() ->>> ->>> ciphertext = p1.handles['stdout'].read() ->>> p1.handles['stdout'].close() ->>> ->>> # process cleanup ->>> p1.wait() ->>> ->>> # Now we'll decrypt what we just encrypted it, ->>> # using the convience method to get the ->>> # passphrase to GnuPG ->>> gnupg.passphrase = passphrase ->>> ->>> p2 = gnupg.run(['--decrypt'], create_fhs=['stdin', 'stdout']) ->>> ->>> p2.handles['stdin'].write(ciphertext) ->>> p2.handles['stdin'].close() ->>> ->>> decrypted_plaintext = p2.handles['stdout'].read() ->>> p2.handles['stdout'].close() ->>> ->>> # process cleanup ->>> p2.wait() ->>> ->>> # Our decrypted plaintext: ->>> decrypted_plaintext -'Three blind mice' ->>> ->>> # ...and see it's the same as what we orignally encrypted ->>> assert decrypted_plaintext == plaintext, \ - "GnuPG decrypted output does not match original input" ->>> ->>> ->>> ################################################## ->>> # Now let's trying using run()'s attach_fhs paramter ->>> ->>> # we're assuming we're running on a unix... ->>> input = open('/etc/motd') ->>> ->>> p1 = gnupg.run(['--symmetric'], create_fhs=['stdout'], -... attach_fhs={'stdin': input}) ->>> ->>> # GnuPG will read the stdin from /etc/motd ->>> ciphertext = p1.handles['stdout'].read() ->>> ->>> # process cleanup ->>> p1.wait() ->>> ->>> # Now let's run the output through GnuPG ->>> # We'll write the output to a temporary file, ->>> import tempfile ->>> temp = tempfile.TemporaryFile() ->>> ->>> p2 = gnupg.run(['--decrypt'], create_fhs=['stdin'], -... attach_fhs={'stdout': temp}) ->>> ->>> # give GnuPG our encrypted stuff from the first run ->>> p2.handles['stdin'].write(ciphertext) ->>> p2.handles['stdin'].close() ->>> ->>> # process cleanup ->>> p2.wait() ->>> ->>> # rewind the tempfile and see what GnuPG gave us ->>> temp.seek(0) ->>> decrypted_plaintext = temp.read() ->>> ->>> # compare what GnuPG decrypted with our original input ->>> input.seek(0) ->>> input_data = input.read() ->>> ->>> assert decrypted_plaintext == input_data, \ - "GnuPG decrypted output does not match original input" - -To do things like public-key encryption, simply pass do something -like: - -gnupg.passphrase = 'My passphrase' -gnupg.options.recipients = [ 'bob@foobar.com' ] -gnupg.run( ['--sign', '--encrypt'], create_fhs=..., attach_fhs=...) - -Here is an example of subclassing GnuPGInterface.GnuPG, -so that it has an encrypt_string() method that returns -ciphertext. - ->>> import GnuPGInterface ->>> ->>> class MyGnuPG(GnuPGInterface.GnuPG): -... -... def __init__(self): -... GnuPGInterface.GnuPG.__init__(self) -... self.setup_my_options() -... -... def setup_my_options(self): -... self.options.armor = 1 -... self.options.meta_interactive = 0 -... self.options.extra_args.append('--no-secmem-warning') -... -... def encrypt_string(self, string, recipients): -... gnupg.options.recipients = recipients # a list! -... -... proc = gnupg.run(['--encrypt'], create_fhs=['stdin', 'stdout']) -... -... proc.handles['stdin'].write(string) -... proc.handles['stdin'].close() -... -... output = proc.handles['stdout'].read() -... proc.handles['stdout'].close() -... -... proc.wait() -... return output -... ->>> gnupg = MyGnuPG() ->>> ciphertext = gnupg.encrypt_string("The secret", ['0x260C4FA3']) ->>> ->>> # just a small sanity test here for doctest ->>> import types ->>> assert isinstance(ciphertext, types.StringType), \ - "What GnuPG gave back is not a string!" - -Here is an example of generating a key: ->>> import GnuPGInterface ->>> gnupg = GnuPGInterface.GnuPG() ->>> gnupg.options.meta_interactive = 0 ->>> ->>> # We will be creative and use the logger filehandle to capture ->>> # what GnuPG says this time, instead stderr; no stdout to listen to, ->>> # but we capture logger to surpress the dry-run command. ->>> # We also have to capture stdout since otherwise doctest complains; ->>> # Normally you can let stdout through when generating a key. ->>> ->>> proc = gnupg.run(['--gen-key'], create_fhs=['stdin', 'stdout', -... 'logger']) ->>> ->>> proc.handles['stdin'].write('''Key-Type: DSA -... Key-Length: 1024 -... # We are only testing syntax this time, so dry-run -... %dry-run -... Subkey-Type: ELG-E -... Subkey-Length: 1024 -... Name-Real: Joe Tester -... Name-Comment: with stupid passphrase -... Name-Email: joe@foo.bar -... Expire-Date: 2y -... Passphrase: abc -... %pubring foo.pub -... %secring foo.sec -... ''') ->>> ->>> proc.handles['stdin'].close() ->>> ->>> report = proc.handles['logger'].read() ->>> proc.handles['logger'].close() ->>> ->>> proc.wait() -""" - -import os -import sys -import fcntl - -__author__ = "Frank J. Tobin, ftobin@neverending.org" -__version__ = "0.3.2" -__revision__ = "$Id: GnuPGInterface.py,v 1.22 2002/01/11 20:22:04 ftobin Exp $" - -# "standard" filehandles attached to processes -_stds = [ 'stdin', 'stdout', 'stderr' ] - -# the permissions each type of fh needs to be opened with -_fd_modes = { 'stdin': 'w', - 'stdout': 'r', - 'stderr': 'r', - 'passphrase': 'w', - 'command': 'w', - 'logger': 'r', - 'status': 'r' - } - -# correlation between handle names and the arguments we'll pass -_fd_options = { 'passphrase': '--passphrase-fd', - 'logger': '--logger-fd', - 'status': '--status-fd', - 'command': '--command-fd' } - -class GnuPG: - """ - Class instances represent GnuPG - - Instance attributes of a GnuPG object are: - - * call -- string to call GnuPG with. Defaults to "gpg" - - * passphrase -- Since it is a common operation - to pass in a passphrase to GnuPG, - and working with the passphrase filehandle mechanism directly - can be mundane, if set, the passphrase attribute - works in a special manner. If the passphrase attribute is set, - and no passphrase file object is sent in to run(), - then GnuPG instnace will take care of sending the passphrase to - GnuPG, the executable instead of having the user sent it in manually. - - * options -- Object of type GnuPGInterface.Options. - Attribute-setting in options determines - the command-line options used when calling GnuPG. - """ - - def __init__(self): - self.call = 'gpg' - self.passphrase = None - self.options = Options() - - def run(self, gnupg_commands, args=None, create_fhs=None, attach_fhs=None): - """ - Calls GnuPG with the list of string commands gnupg_commands, complete - with prefixing dashes - - For example, gnupg_commands could be - '["--sign", "--encrypt"]' - Returns a GnuPGInterface.Process object. - - args is an optional list of GnuPG command arguments (not options), - such as keyID's to export, filenames to process, etc. - - create_fhs is an optional list of GnuPG filehandle - names that will be set as keys of the returned Process object's - 'handles' attribute. The generated filehandles can be used - to communicate with GnuPG via standard input, standard output, - the status-fd, passphrase-fd, etc. - - Valid GnuPG filehandle names are: - * stdin - * stdout - * stderr - * status - * passphase - * command - * logger - - The purpose of each filehandle is described in the GnuPG - documentation. - - attach_fhs is an optional dictionary with GnuPG filehandle - names mapping to opened files. GnuPG will read or write - to the file accordingly. For example, if 'my_file' is an - opened file and 'attach_fhs[stdin] is my_file', then GnuPG - will read its standard input from my_file. This is useful - if you want GnuPG to read/write to/from an existing file. - For instance: - - f = open("encrypted.gpg") - gnupg.run(["--decrypt"], attach_fhs={'stdin': f}) - - Using attach_fhs also helps avoid system buffering - issues that can arise when using create_fhs, which - can cause the process to deadlock. - - If not mentioned in create_fhs or attach_fhs, - GnuPG filehandles which are a std* (stdin, stdout, stderr) - are defaulted to the running process' version of handle. - Otherwise, that type of handle is simply not used when calling GnuPG. - For example, if you do not care about getting data from GnuPG's - status filehandle, simply do not specify it. - - run() returns a Process() object which has a 'handles' - which is a dictionary mapping from the handle name - (such as 'stdin' or 'stdout') to the respective - newly-created FileObject connected to the running GnuPG process. - For instance, if the call was - - process = gnupg.run(["--decrypt"], stdin=1) - - after run returns 'process.handles["stdin"]' - is a FileObject connected to GnuPG's standard input, - and can be written to. - """ - if args is None: args = [] - if create_fhs is None: create_fhs = [] - if attach_fhs is None: attach_fhs = {} - - for std in _stds: - if std not in attach_fhs \ - and std not in create_fhs: - attach_fhs.setdefault(std, getattr(sys, std)) - - handle_passphrase = 0 - - if self.passphrase is not None \ - and 'passphrase' not in attach_fhs \ - and 'passphrase' not in create_fhs: - handle_passphrase = 1 - create_fhs.append('passphrase') - - process = self._attach_fork_exec(gnupg_commands, args, - create_fhs, attach_fhs) - - if handle_passphrase: - passphrase_fh = process.handles['passphrase'] - passphrase_fh.write( self.passphrase ) - passphrase_fh.close() - del process.handles['passphrase'] - - return process - - - def _attach_fork_exec(self, gnupg_commands, args, create_fhs, attach_fhs): - """ - This is like run(), but without the passphrase-helping (note that run() - calls this) - """ - process = Process() - - for fh_name in create_fhs + attach_fhs.keys(): - if fh_name not in _fd_modes: - raise KeyError, \ - "unrecognized filehandle name '%s'; must be one of %s" \ - % (fh_name, _fd_modes.keys()) - - for fh_name in create_fhs: - # make sure the user doesn't specify a filehandle - # to be created *and* attached - if fh_name in attach_fhs: - raise ValueError, \ - "cannot have filehandle '%s' in both create_fhs and attach_fhs" \ - % fh_name - - pipe = os.pipe() - # fix by drt@un.bewaff.net noting - # that since pipes are unidirectional on some systems, - # so we have to 'turn the pipe around' - # if we are writing - if _fd_modes[fh_name] == 'w': pipe = (pipe[1], pipe[0]) - process._pipes[fh_name] = Pipe(pipe[0], pipe[1], 0) - - for fh_name, fh in attach_fhs.items(): - process._pipes[fh_name] = Pipe(fh.fileno(), fh.fileno(), 1) - - process.pid = os.fork() - - if process.pid == 0: self._as_child(process, gnupg_commands, args) - return self._as_parent(process) - - - def _as_parent(self, process): - """ - Stuff run after forking in parent - """ - for k, p in process._pipes.items(): - if not p.direct: - os.close(p.child) - process.handles[k] = os.fdopen(p.parent, _fd_modes[k]) - - # user doesn't need these - del process._pipes - - return process - - - def _as_child(self, process, gnupg_commands, args): - """ - Stuff run after forking in child - """ - # child - for std in _stds: - p = process._pipes[std] - os.dup2( p.child, getattr(sys, "__%s__" % std).fileno() ) - - for k, p in process._pipes.items(): - if p.direct and k not in _stds: - # we want the fh to stay open after execing - fcntl.fcntl( p.child, fcntl.F_SETFD, 0 ) - - fd_args = [] - - for k, p in process._pipes.items(): - # set command-line options for non-standard fds - if k not in _stds: - fd_args.extend([ _fd_options[k], "%d" % p.child ]) - - if not p.direct: os.close(p.parent) - - command = [ self.call ] + fd_args + self.options.get_args() \ - + gnupg_commands + args - - os.execvp( command[0], command ) - - -class Pipe: - """ - Simple struct holding stuff about pipes we use - """ - - def __init__(self, parent, child, direct): - self.parent = parent - self.child = child - self.direct = direct - - -class Options: - """ - Objects of this class encompass options passed to GnuPG. - This class is responsible for determining command-line arguments - which are based on options. It can be said that a GnuPG - object has-a Options object in its options attribute. - - Attributes which correlate directly to GnuPG options: - - Each option here defaults to false or None, and is described in - GnuPG documentation. - - Booleans (set these attributes to booleans) - - * armor - * no_greeting - * no_verbose - * quiet - * batch - * always_trust - * rfc1991 - * openpgp - * force_v3_sigs - * no_options - * textmode - - Strings (set these attributes to strings) - - * homedir - * default_key - * comment - * compress_algo - * options - - Lists (set these attributes to lists) - - * recipients (***NOTE*** plural of 'recipient') - * encrypt_to - - Meta options - - Meta options are options provided by this module that do - not correlate directly to any GnuPG option by name, - but are rather bundle of options used to accomplish - a specific goal, such as obtaining compatibility with PGP 5. - The actual arguments each of these reflects may change with time. Each - defaults to false unless otherwise specified. - - meta_pgp_5_compatible -- If true, arguments are generated to try - to be compatible with PGP 5.x. - - meta_pgp_2_compatible -- If true, arguments are generated to try - to be compatible with PGP 2.x. - - meta_interactive -- If false, arguments are generated to try to - help the using program use GnuPG in a non-interactive - environment, such as CGI scripts. Default is true. - - extra_args -- Extra option arguments may be passed in - via the attribute extra_args, a list. - - >>> import GnuPGInterface - >>> - >>> gnupg = GnuPGInterface.GnuPG() - >>> gnupg.options.armor = 1 - >>> gnupg.options.recipients = ['Alice', 'Bob'] - >>> gnupg.options.extra_args = ['--no-secmem-warning'] - >>> - >>> # no need for users to call this normally; just for show here - >>> gnupg.options.get_args() - ['--armor', '--recipient', 'Alice', '--recipient', 'Bob', '--no-secmem-warning'] - """ - def __init__(self): - # booleans - self.armor = 0 - self.no_greeting = 0 - self.verbose = 0 - self.no_verbose = 0 - self.quiet = 0 - self.batch = 0 - self.always_trust = 0 - self.rfc1991 = 0 - self.openpgp = 0 - self.force_v3_sigs = 0 - self.no_options = 0 - self.textmode = 0 - - # meta-option booleans - self.meta_pgp_5_compatible = 0 - self.meta_pgp_2_compatible = 0 - self.meta_interactive = 1 - - # strings - self.homedir = None - self.default_key = None - self.comment = None - self.compress_algo = None - self.options = None - - # lists - self.encrypt_to = [] - self.recipients = [] - - # miscellaneous arguments - self.extra_args = [] - - def get_args( self ): - """ - Generate a list of GnuPG arguments based upon attributes - """ - return self.get_meta_args() + self.get_standard_args() + self.extra_args - - def get_standard_args( self ): - """ - Generate a list of standard, non-meta or extra arguments - """ - args = [] - if self.homedir is not None: - args.extend( [ '--homedir', self.homedir ] ) - if self.options is not None: - args.extend( [ '--options', self.options ] ) - if self.comment is not None: - args.extend( [ '--comment', self.comment ] ) - if self.compress_algo is not None: - args.extend( [ '--compress-algo', self.compress_algo ] ) - if self.default_key is not None: - args.extend( [ '--default-key', self.default_key ] ) - - if self.no_options: args.append( '--no-options' ) - if self.armor: args.append( '--armor' ) - if self.textmode: args.append( '--textmode' ) - if self.no_greeting: args.append( '--no-greeting' ) - if self.verbose: args.append( '--verbose' ) - if self.no_verbose: args.append( '--no-verbose' ) - if self.quiet: args.append( '--quiet' ) - if self.batch: args.append( '--batch' ) - if self.always_trust: args.append( '--always-trust' ) - if self.force_v3_sigs: args.append( '--force-v3-sigs' ) - if self.rfc1991: args.append( '--rfc1991' ) - if self.openpgp: args.append( '--openpgp' ) - - for r in self.recipients: args.extend( [ '--recipient', r ] ) - for r in self.encrypt_to: args.extend( [ '--encrypt-to', r ] ) - - return args - - def get_meta_args( self ): - """ - Get a list of generated meta-arguments - """ - args = [] - - if self.meta_pgp_5_compatible: args.extend( [ '--compress-algo', '1', - '--force-v3-sigs' - ] ) - if self.meta_pgp_2_compatible: args.append( '--rfc1991' ) - if not self.meta_interactive: args.extend( [ '--batch', '--no-tty' ] ) - - return args - - -class Process: - """ - Objects of this class encompass properties of a GnuPG process spawned by - GnuPG.run() - - # gnupg is a GnuPG object - process = gnupg.run( [ '--decrypt' ], stdout = 1 ) - out = process.handles['stdout'].read() - ... - os.waitpid( process.pid, 0 ) - - Data Attributes - - handles -- This is a map of filehandle-names to - the file handles, if any, that were requested via run() and hence - are connected to the running GnuPG process. Valid names - of this map are only those handles that were requested. - - pid -- The PID of the spawned GnuPG process. - Useful to know, since once should call - os.waitpid() to clean up the process, especially - if multiple calls are made to run(). - """ - - def __init__(self): - self._pipes = {} - self.handles = {} - self.pid = None - self._waited = None - - def wait(self): - """ - Wait on the process to exit, allowing for child cleanup. Will raise an - IOError if the process exits non-zero - """ - e = os.waitpid(self.pid, 0)[1] - if e != 0: - raise IOError, "GnuPG exited non-zero, with code %d" % (e << 8) - -def _run_doctests(): - import doctest, GnuPGInterface - return doctest.testmod(GnuPGInterface) - -# deprecated -GnuPGInterface = GnuPG - -if __name__ == '__main__': - _run_doctests() diff --git a/src/common/connection.py b/src/common/connection.py index 317cf4388..83acc9fc6 100644 --- a/src/common/connection.py +++ b/src/common/connection.py @@ -218,13 +218,12 @@ class CommonConnection: if self.gpg.passphrase is None and not use_gpg_agent: # We didn't set a passphrase return None - if self.gpg.passphrase is not None or use_gpg_agent: - signed = self.gpg.sign(msg, keyID) - if signed == 'BAD_PASSPHRASE': - self.USE_GPG = False - signed = '' - gajim.nec.push_incoming_event(BadGPGPassphraseEvent(None, - conn=self)) + signed = self.gpg.sign(msg, keyID) + if signed == 'BAD_PASSPHRASE': + self.USE_GPG = False + signed = '' + gajim.nec.push_incoming_event(BadGPGPassphraseEvent(None, + conn=self)) return signed def _on_disconnected(self): @@ -596,14 +595,12 @@ class CommonConnection: def ask_gpg_keys(self): if self.gpg: - keys = self.gpg.get_keys() - return keys + return self.gpg.get_keys() return None def ask_gpg_secrete_keys(self): if self.gpg: - keys = self.gpg.get_secret_keys() - return keys + return self.gpg.get_secret_keys() return None def load_roster_from_db(self): diff --git a/src/common/gajim.py b/src/common/gajim.py index 7b6ba9049..48762829a 100644 --- a/src/common/gajim.py +++ b/src/common/gajim.py @@ -150,7 +150,7 @@ except ImportError: HAVE_GPG = True try: - import GnuPGInterface + import gnupg except ImportError: HAVE_GPG = False else: diff --git a/src/common/gnupg.py b/src/common/gnupg.py new file mode 100644 index 000000000..19f12ad17 --- /dev/null +++ b/src/common/gnupg.py @@ -0,0 +1,921 @@ +""" A wrapper for the 'gpg' command:: + +Portions of this module are derived from A.M. Kuchling's well-designed +GPG.py, using Richard Jones' updated version 1.3, which can be found +in the pycrypto CVS repository on Sourceforge: + +http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py + +This module is *not* forward-compatible with amk's; some of the +old interface has changed. For instance, since I've added decrypt +functionality, I elected to initialize with a 'gnupghome' argument +instead of 'keyring', so that gpg can find both the public and secret +keyrings. I've also altered some of the returned objects in order for +the caller to not have to know as much about the internals of the +result classes. + +While the rest of ISconf is released under the GPL, I am releasing +this single file under the same terms that A.M. Kuchling used for +pycrypto. + +Steve Traugott, stevegt@terraluna.org +Thu Jun 23 21:27:20 PDT 2005 + +This version of the module has been modified from Steve Traugott's version +(see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by +Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() +and so does not work on Windows). Renamed to gnupg.py to avoid confusion with +the previous versions. + +Modifications Copyright (C) 2008-2010 Vinay Sajip. All rights reserved. + +A unittest harness (test_gnupg.py) has also been added. +""" +import locale + +__author__ = "Vinay Sajip" +__date__ = "$08-Oct-2010 23:01:07$" + +try: + from io import StringIO + from io import TextIOWrapper + from io import BufferedReader + from io import BufferedWriter +except ImportError: + from cStringIO import StringIO + class BufferedReader: pass + class BufferedWriter: pass + +import locale +import logging +import os +import socket +from subprocess import Popen +from subprocess import PIPE +import sys +import threading + +try: + import logging.NullHandler as NullHandler +except ImportError: + class NullHandler(logging.Handler): + def handle(self, record): + pass +try: + unicode + _py3k = False +except NameError: + _py3k = True + +logger = logging.getLogger(__name__) +if not logger.handlers: + logger.addHandler(NullHandler()) + +def _copy_data(instream, outstream): + # Copy one stream to another + sent = 0 + if hasattr(sys.stdin, 'encoding'): + enc = sys.stdin.encoding + else: + enc = 'ascii' + while True: + data = instream.read(1024) + if len(data) == 0: + break + sent += len(data) + logger.debug("sending chunk (%d): %r", sent, data[:256]) + try: + outstream.write(data) + except UnicodeError: + outstream.write(data.encode(enc)) + except: + # Can sometimes get 'broken pipe' errors even when the data has all + # been sent + logger.exception('Error sending data') + break + try: + outstream.close() + except IOError: + logger.warning('Exception occurred while closing: ignored', exc_info=1) + logger.debug("closed output, %d bytes sent", sent) + +def _threaded_copy_data(instream, outstream): + wr = threading.Thread(target=_copy_data, args=(instream, outstream)) + wr.setDaemon(True) + logger.debug('data copier: %r, %r, %r', wr, instream, outstream) + wr.start() + return wr + +def _write_passphrase(stream, passphrase, encoding): + passphrase = '%s\n' % passphrase + passphrase = passphrase.encode(encoding) + stream.write(passphrase) + logger.debug("Wrote passphrase: %r", passphrase) + +def _is_sequence(instance): + return isinstance(instance,list) or isinstance(instance,tuple) + +def _wrap_input(inp): + if isinstance(inp, BufferedWriter): + oldinp = inp + inp = TextIOWrapper(inp) + logger.debug('wrapped input: %r -> %r', oldinp, inp) + return inp + +def _wrap_output(outp): + if isinstance(outp, BufferedReader): + oldoutp = outp + outp = TextIOWrapper(outp) + logger.debug('wrapped output: %r -> %r', oldoutp, outp) + return outp + +#The following is needed for Python2.7 :-( +def _make_file(s): + try: + rv = StringIO(s) + except (TypeError, UnicodeError): + from io import BytesIO + rv = BytesIO(s) + return rv + +def _make_binary_stream(s, encoding): + try: + if _py3k: + if isinstance(s, str): + s = s.encode(encoding) + else: + if type(s) is not str: + s = s.encode(encoding) + from io import BytesIO + rv = BytesIO(s) + except ImportError: + rv = StringIO(s) + return rv + +class GPG(object): + "Encapsulate access to the gpg executable" + def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False): + """Initialize a GPG process wrapper. Options are: + + gpgbinary -- full pathname for GPG binary. + + gnupghome -- full pathname to where we can find the public and + private keyrings. Default is whatever gpg defaults to. + """ + self.gpgbinary = gpgbinary + self.gnupghome = gnupghome + self.verbose = verbose + self.encoding = locale.getpreferredencoding() + if self.encoding is None: # This happens on Jython! + self.encoding = sys.stdin.encoding + if gnupghome and not os.path.isdir(self.gnupghome): + os.makedirs(self.gnupghome,0x1C0) + p = self._open_subprocess(["--version"]) + result = Verify() # any result will do for this + self._collect_output(p, result) + if p.returncode != 0: + raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, + result.stderr)) + + def _open_subprocess(self, args, passphrase=False): + # Internal method: open a pipe to a GPG subprocess and return + # the file objects for communicating with it. + cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] + if self.gnupghome: + cmd.append('--homedir "%s" ' % self.gnupghome) + if passphrase: + cmd.append('--batch --passphrase-fd 0') + + cmd.extend(args) + cmd = ' '.join(cmd) + if self.verbose: + print(cmd) + logger.debug("%s", cmd) + return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) + + def _read_response(self, stream, result): + # Internal method: reads all the output from GPG, taking notice + # only of lines that begin with the magic [GNUPG:] prefix. + # + # Calls methods on the response object for each valid token found, + # with the arg being the remainder of the status line. + lines = [] + while True: + line = stream.readline() + lines.append(line) + if self.verbose: + print(line) + logger.debug("%s", line.rstrip()) + if line == "": break + line = line.rstrip() + if line[0:9] == '[GNUPG:] ': + # Chop off the prefix + line = line[9:] + L = line.split(None, 1) + keyword = L[0] + if len(L) > 1: + value = L[1] + else: + value = "" + result.handle_status(keyword, value) + result.stderr = ''.join(lines) + + def _read_data(self, stream, result): + # Read the contents of the file from GPG's stdout + chunks = [] + while True: + data = stream.read(1024) + if len(data) == 0: + break + logger.debug("chunk: %r" % data[:256]) + chunks.append(data) + if _py3k: + # Join using b'' or '', as appropriate + result.data = type(data)().join(chunks) + else: + result.data = ''.join(chunks) + + def _collect_output(self, process, result, writer=None): + """ + Drain the subprocesses output streams, writing the collected output + to the result. If a writer thread (writing to the subprocess) is given, + make sure it's joined before returning. + """ + stderr = _wrap_output(process.stderr) + rr = threading.Thread(target=self._read_response, args=(stderr, result)) + rr.setDaemon(True) + logger.debug('stderr reader: %r', rr) + rr.start() + + stdout = process.stdout # _wrap_output(process.stdout) + dr = threading.Thread(target=self._read_data, args=(stdout, result)) + dr.setDaemon(True) + logger.debug('stdout reader: %r', dr) + dr.start() + + dr.join() + rr.join() + if writer is not None: + writer.join() + process.wait() + + def _handle_io(self, args, file, result, passphrase=None, binary=False): + "Handle a call to GPG - pass input data, collect output data" + # Handle a basic data call - pass data to GPG, handle the output + # including status information. Garbage In, Garbage Out :) + p = self._open_subprocess(args, passphrase is not None) + if not binary and not isinstance(file, BufferedReader): + stdin = _wrap_input(p.stdin) + else: + stdin = p.stdin + if passphrase: + _write_passphrase(stdin, passphrase, self.encoding) + writer = _threaded_copy_data(file, stdin) + self._collect_output(p, result, writer) + return result + + # + # SIGNATURE METHODS + # + def sign(self, message, **kwargs): + """sign message""" + file = _make_binary_stream(message, self.encoding) + return self.sign_file(file, **kwargs) + + def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, + detach=False): + """sign file""" + logger.debug("sign_file: %s", file) + args = ["-sa"] + # You can't specify detach-sign and clearsign together: gpg ignores + # the detach-sign in that case. + if detach: + args.append("--detach-sign") + elif clearsign: + args.append("--clearsign") + if keyid: + args.append("--default-key %s" % keyid) + result = Sign(self.encoding) + #We could use _handle_io here except for the fact that if the + #passphrase is bad, gpg bails and you can't write the message. + #self._handle_io(args, _make_file(message), result, passphrase=passphrase) + p = self._open_subprocess(args, passphrase is not None) + try: + stdin = p.stdin + if passphrase: + _write_passphrase(stdin, passphrase, self.encoding) + writer = _threaded_copy_data(file, stdin) + except IOError: + logging.exception("error writing message") + writer = None + self._collect_output(p, result, writer) + return result + + def verify(self, data): + """Verify the signature on the contents of the string 'data' + + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input(Passphrase='foo') + >>> key = gpg.gen_key(input) + >>> assert key + >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar') + >>> assert not sig + >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo') + >>> assert sig + >>> verify = gpg.verify(sig.data) + >>> assert verify + + """ + return self.verify_file(_make_binary_stream(data, self.encoding)) + + def verify_file(self, file, data_filename=None): + "Verify the signature on the contents of the file-like object 'file'" + logger.debug('verify_file: %r, %r', file, data_filename) + result = Verify() + args = ['--verify'] + if data_filename is None: + self._handle_io(args, file, result, binary=True) + else: + logger.debug('Handling detached verification') + import tempfile + fd, fn = tempfile.mkstemp(prefix='pygpg') + s = file.read() + logger.debug('Wrote to temp file: %r', s) + os.write(fd, s) + os.close(fd) + args.append(fn) + args.append(data_filename) + try: + p = self._open_subprocess(args) + self._collect_output(p, result) + finally: + os.unlink(fn) + return result + + # + # KEY MANAGEMENT + # + + def import_keys(self, key_data): + """ import the key_data into our keyring + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> pubkey1 = gpg.export_keys(print1) + >>> seckey1 = gpg.export_keys(print1,secret=True) + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> str(gpg.delete_keys(print1)) + 'Must delete secret key first' + >>> str(gpg.delete_keys(print1,secret=True)) + 'ok' + >>> str(gpg.delete_keys(print1)) + 'ok' + >>> str(gpg.delete_keys("nosuchkey")) + 'No such key' + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert not print1 in seckeys.fingerprints + >>> assert not print1 in pubkeys.fingerprints + >>> result = gpg.import_keys('foo') + >>> assert not result + >>> result = gpg.import_keys(pubkey1) + >>> pubkeys = gpg.list_keys() + >>> seckeys = gpg.list_keys(secret=True) + >>> assert not print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> result = gpg.import_keys(seckey1) + >>> assert result + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> assert print2 in pubkeys.fingerprints + + """ + result = ImportResult() + logger.debug('import_keys: %r', key_data[:256]) + data = _make_binary_stream(key_data, self.encoding) + self._handle_io(['--import'], data, result, binary=True) + logger.debug('import_keys result: %r', result.__dict__) + return result + + def delete_keys(self, fingerprints, secret=False): + which='key' + if secret: + which='secret-key' + if _is_sequence(fingerprints): + fingerprints = ' '.join(fingerprints) + args = ["--batch --delete-%s %s" % (which, fingerprints)] + result = DeleteResult() + p = self._open_subprocess(args) + self._collect_output(p, result) + return result + + def export_keys(self, keyids, secret=False): + "export the indicated keys. 'keyid' is anything gpg accepts" + which='' + if secret: + which='-secret-key' + if _is_sequence(keyids): + keyids = ' '.join(keyids) + args = ["--armor --export%s %s" % (which, keyids)] + p = self._open_subprocess(args) + # gpg --export produces no status-fd output; stdout will be + # empty in case of failure + #stdout, stderr = p.communicate() + result = DeleteResult() # any result will do + self._collect_output(p, result) + logger.debug('export_keys result: %r', result.data) + return result.data.decode(self.encoding) + + def list_keys(self, secret=False): + """ list the keys currently in the keyring + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> pubkeys = gpg.list_keys() + >>> assert print1 in pubkeys.fingerprints + >>> assert print2 in pubkeys.fingerprints + + """ + + which='keys' + if secret: + which='secret-keys' + args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which) + args = [args] + p = self._open_subprocess(args) + + # there might be some status thingumy here I should handle... (amk) + # ...nope, unless you care about expired sigs or keys (stevegt) + + # Get the response information + result = ListKeys() + self._collect_output(p, result) + lines = result.data.decode(self.encoding).splitlines() + valid_keywords = 'pub uid sec fpr'.split() + for line in lines: + if self.verbose: + print(line) + logger.debug("line: %r", line.rstrip()) + if not line: + break + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + return result + + def gen_key(self, input): + """Generate a key; you might use gen_key_input() to create the + control input. + + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> assert result + >>> result = gpg.gen_key('foo') + >>> assert not result + + """ + args = ["--gen-key --batch"] + result = GenKey() + file = _make_file(input) + self._handle_io(args, file, result) + return result + + def gen_key_input(self, **kwargs): + """ + Generate --gen-key input per gpg doc/DETAILS + """ + parms = {} + for key, val in list(kwargs.items()): + key = key.replace('_','-').title() + parms[key] = val + parms.setdefault('Key-Type','RSA') + parms.setdefault('Key-Length',1024) + parms.setdefault('Name-Real', "Autogenerated Key") + parms.setdefault('Name-Comment', "Generated by gnupg.py") + try: + logname = os.environ['LOGNAME'] + except KeyError: + logname = os.environ['USERNAME'] + hostname = socket.gethostname() + parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'), + hostname)) + out = "Key-Type: %s\n" % parms.pop('Key-Type') + for key, val in list(parms.items()): + out += "%s: %s\n" % (key, val) + out += "%commit\n" + return out + + # Key-Type: RSA + # Key-Length: 1024 + # Name-Real: ISdlink Server on %s + # Name-Comment: Created by %s + # Name-Email: isdlink@%s + # Expire-Date: 0 + # %commit + # + # + # Key-Type: DSA + # Key-Length: 1024 + # Subkey-Type: ELG-E + # Subkey-Length: 1024 + # Name-Real: Joe Tester + # Name-Comment: with stupid passphrase + # Name-Email: joe@foo.bar + # Expire-Date: 0 + # Passphrase: abc + # %pubring foo.pub + # %secring foo.sec + # %commit + + # + # ENCRYPTION + # + def encrypt_file(self, file, recipients, sign=None, + always_trust=False, passphrase=None, + armor=True, output=None): + "Encrypt the message read from the file-like object 'file'" + args = ['--encrypt'] + if armor: # create ascii-armored output - set to False for binary output + args.append('--armor') + if output: # write the output to a file with the specified name + if os.path.exists(output): + os.remove(output) # to avoid overwrite confirmation message + args.append('--output %s' % output) + if not _is_sequence(recipients): + recipients = (recipients,) + for recipient in recipients: + args.append('--recipient %s' % recipient) + if sign: + args.append("--sign --default-key %s" % sign) + if always_trust: + args.append("--always-trust") + result = Crypt(self.encoding) + self._handle_io(args, file, result, passphrase=passphrase, binary=True) + logger.debug('encrypt result: %r', result.data) + return result + + def encrypt(self, data, recipients, **kwargs): + """Encrypt the message contained in the string 'data' + + >>> import shutil + >>> if os.path.exists("keys"): + ... shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input(passphrase='foo') + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> result = gpg.encrypt("hello",print2) + >>> message = str(result) + >>> assert message != 'hello' + >>> result = gpg.decrypt(message) + >>> assert result + >>> str(result) + 'hello' + >>> result = gpg.encrypt("hello again",print1) + >>> message = str(result) + >>> result = gpg.decrypt(message) + >>> result.status + 'need passphrase' + >>> result = gpg.decrypt(message,passphrase='bar') + >>> result.status + 'decryption failed' + >>> assert not result + >>> result = gpg.decrypt(message,passphrase='foo') + >>> result.status + 'decryption ok' + >>> str(result) + 'hello again' + >>> result = gpg.encrypt("signed hello",print2,sign=print1) + >>> result.status + 'need passphrase' + >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') + >>> result.status + 'encryption ok' + >>> message = str(result) + >>> result = gpg.decrypt(message) + >>> result.status + 'decryption ok' + >>> assert result.fingerprint == print1 + + """ + data = _make_binary_stream(data, self.encoding) + return self.encrypt_file(data, recipients, **kwargs) + + def decrypt(self, message, **kwargs): + data = _make_binary_stream(message, self.encoding) + return self.decrypt_file(data, **kwargs) + + def decrypt_file(self, file, always_trust=False, passphrase=None, + output=None): + args = ["--decrypt"] + if output: # write the output to a file with the specified name + if os.path.exists(output): + os.remove(output) # to avoid overwrite confirmation message + args.append('--output %s' % output) + if always_trust: + args.append("--always-trust") + result = Crypt(self.encoding) + self._handle_io(args, file, result, passphrase, binary=True) + logger.debug('decrypt result: %r', result.data) + return result + +class Verify(object): + "Handle status messages for --verify" + + def __init__(self): + self.valid = False + self.fingerprint = self.creation_date = self.timestamp = None + self.signature_id = self.key_id = None + self.username = None + + def __nonzero__(self): + return self.valid + + __bool__ = __nonzero__ + + def handle_status(self, key, value): + if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL", + "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA"): + pass + elif key in ("PLAINTEXT", "PLAINTEXT_LENGTH"): + pass + elif key == "BADSIG": + self.valid = False + self.key_id, self.username = value.split(None, 1) + elif key == "GOODSIG": + self.valid = True + self.key_id, self.username = value.split(None, 1) + elif key == "VALIDSIG": + (self.fingerprint, + self.creation_date, + self.sig_timestamp, + self.expire_timestamp) = value.split()[:4] + elif key == "SIG_ID": + (self.signature_id, + self.creation_date, self.timestamp) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + +class ImportResult(object): + "Handle status messages for --import" + + counts = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported + sec_dups not_imported'''.split() + def __init__(self): + self.imported = [] + self.results = [] + self.fingerprints = [] + for result in self.counts: + setattr(self, result, None) + + def __nonzero__(self): + if self.not_imported: return False + if not self.fingerprints: return False + return True + + __bool__ = __nonzero__ + + ok_reason = { + '0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + } + + problem_reason = { + '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', + } + + def handle_status(self, key, value): + if key == "IMPORTED": + # this duplicates info we already see in import_ok & import_problem + pass + elif key == "NODATA": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'No valid data found'}) + elif key == "IMPORT_OK": + reason, fingerprint = value.split() + reasons = [] + for code, text in list(self.ok_reason.items()): + if int(reason) | int(code) == int(reason): + reasons.append(text) + reasontext = '\n'.join(reasons) + "\n" + self.results.append({'fingerprint': fingerprint, + 'ok': reason, 'text': reasontext}) + self.fingerprints.append(fingerprint) + elif key == "IMPORT_PROBLEM": + try: + reason, fingerprint = value.split() + except: + reason = value + fingerprint = '' + self.results.append({'fingerprint': fingerprint, + 'problem': reason, 'text': self.problem_reason[reason]}) + elif key == "IMPORT_RES": + import_res = value.split() + for i in range(len(self.counts)): + setattr(self, self.counts[i], int(import_res[i])) + else: + raise ValueError("Unknown status message: %r" % key) + + def summary(self): + l = [] + l.append('%d imported'%self.imported) + if self.not_imported: + l.append('%d not imported'%self.not_imported) + return ', '.join(l) + +class ListKeys(list): + ''' Handle status messages for --list-keys. + + Handle pub and uid (relating the latter to the former). + + Don't care about (info from src/DETAILS): + + crt = X.509 certificate + crs = X.509 certificate and private key available + sub = subkey (secondary key) + ssb = secret subkey (secondary key) + uat = user attribute (same as user id except for field 10). + sig = signature + rev = revocation signature + pkd = public key data (special field format, see below) + grp = reserved for gpgsm + rvk = revocation key + ''' + def __init__(self): + self.curkey = None + self.fingerprints = [] + + def key(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + self.curkey = {} + for i in range(len(vars)): + self.curkey[vars[i]] = args[i] + self.curkey['uids'] = [self.curkey['uid']] + del self.curkey['uid'] + self.append(self.curkey) + + pub = sec = key + + def fpr(self, args): + self.curkey['fingerprint'] = args[9] + self.fingerprints.append(args[9]) + + def uid(self, args): + self.curkey['uids'].append(args[9]) + + def handle_status(self, key, value): + pass + +class Crypt(Verify): + "Handle status messages for --encrypt and --decrypt" + def __init__(self, encoding): + Verify.__init__(self) + self.data = '' + self.ok = False + self.status = '' + self.encoding = encoding + + def __nonzero__(self): + if self.ok: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.encoding) + + def handle_status(self, key, value): + if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", + "BEGIN_SIGNING", "NO_SECKEY"): + pass + elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", + "DECRYPTION_FAILED"): + self.status = key.replace("_", " ").lower() + elif key == "NEED_PASSPHRASE_SYM": + self.status = 'need symmetric passphrase' + elif key == "BEGIN_DECRYPTION": + self.status = 'decryption incomplete' + elif key == "BEGIN_ENCRYPTION": + self.status = 'encryption incomplete' + elif key == "DECRYPTION_OKAY": + self.status = 'decryption ok' + self.ok = True + elif key == "END_ENCRYPTION": + self.status = 'encryption ok' + self.ok = True + elif key == "INV_RECP": + self.status = 'invalid recipient' + elif key == "KEYEXPIRED": + self.status = 'key expired' + elif key == "SIG_CREATED": + self.status = 'sig created' + elif key == "SIGEXPIRED": + self.status = 'sig expired' + else: + Verify.handle_status(self, key, value) + +class GenKey(object): + "Handle status messages for --gen-key" + def __init__(self): + self.type = None + self.fingerprint = None + + def __nonzero__(self): + if self.fingerprint: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.fingerprint or '' + + def handle_status(self, key, value): + if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"): + pass + elif key == "KEY_CREATED": + (self.type,self.fingerprint) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + +class DeleteResult(object): + "Handle status messages for --delete-key and --delete-secret-key" + def __init__(self): + self.status = 'ok' + + def __str__(self): + return self.status + + problem_reason = { + '1': 'No such key', + '2': 'Must delete secret key first', + '3': 'Ambigious specification', + } + + def handle_status(self, key, value): + if key == "DELETE_PROBLEM": + self.status = self.problem_reason.get(value, + "Unknown error: %r" % value) + else: + raise ValueError("Unknown status message: %r" % key) + +class Sign(object): + "Handle status messages for --sign" + def __init__(self, encoding): + self.type = None + self.fingerprint = None + self.encoding = encoding + + def __nonzero__(self): + return self.fingerprint is not None + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.encoding) + + def handle_status(self, key, value): + if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", + "GOOD_PASSPHRASE", "BEGIN_SIGNING"): + pass + elif key == "SIG_CREATED": + (self.type, + algo, hashalgo, cls, + self.timestamp, self.fingerprint + ) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) diff --git a/src/features_window.py b/src/features_window.py index 28e70ed51..33de37e2f 100644 --- a/src/features_window.py +++ b/src/features_window.py @@ -60,8 +60,8 @@ class FeaturesWindow: _('Feature not available under Windows.')), _('OpenGPG message encryption'): (self.gpg_available, _('Encrypting chat messages with gpg keys.'), - _('Requires gpg and python-GnuPGInterface.'), - _('Feature not available under Windows.')), + _('Requires gpg and python-gnupg (http://code.google.com/p/python-gnupg/).'), + _('Requires gpg.exe in PATH.')), _('Network-manager'): (self.network_manager_available, _('Autodetection of network status.'), _('Requires gnome-network-manager and python-dbus.'), @@ -181,8 +181,6 @@ class FeaturesWindow: return dbus_support.supported def gpg_available(self): - if os.name == 'nt': - return False return gajim.HAVE_GPG def network_manager_available(self):