switch from GnuPGInterface to python-gnupg, so gpg is available under windows. Fixes #5096, #3615, #1890, #996

This commit is contained in:
Yann Leboulanger 2010-11-29 18:44:22 +01:00
parent 1136ae5b16
commit 6cd8e07fae
7 changed files with 976 additions and 844 deletions

1
debian/rules vendored
View File

@ -12,6 +12,5 @@ DEB_MAKE_BUILD_TARGET := all
DEB_MAKE_INSTALL_TARGET = install DESTDIR=$(DEB_DESTDIR)
binary-install/gajim::
rm $(DEB_DESTDIR)/usr/share/gajim/src/common/GnuPGInterface.py*
dh_pysupport -pgajim
convert $(DEB_DESTDIR)/usr/share/icons/hicolor/64x64/apps/gajim.png -resize 32x32 $(DEB_DESTDIR)/usr/share/pixmaps/gajim.xpm

View File

@ -23,17 +23,17 @@
##
import gajim
import os
from os import tmpfile
from common import helpers
if gajim.HAVE_GPG:
import GnuPGInterface
import gnupg
class GnuPG(GnuPGInterface.GnuPG):
def __init__(self, use_agent = False):
GnuPGInterface.GnuPG.__init__(self)
class GnuPG(gnupg.GPG):
def __init__(self, use_agent=False):
gnupg.GPG.__init__(self)
self.passphrase = None
self.use_agent = use_agent
self._setup_my_options()
self.always_trust = False
def _setup_my_options(self):
@ -46,166 +46,56 @@ if gajim.HAVE_GPG:
if self.use_agent:
self.options.extra_args.append('--use-agent')
def _read_response(self, child_stdout):
# Internal method: reads all the output from GPG, taking notice
# only of lines that begin with the magic [GNUPG:] prefix.
# (See doc/DETAILS in the GPG distribution for info on GPG's
# output when --status-fd is specified.)
#
# Returns a dictionary, mapping GPG's keywords to the arguments
# for that keyword.
resp = {}
while True:
line = helpers.temp_failure_retry(child_stdout.readline)
if line == "": break
line = line.rstrip()
if line[0:9] == '[GNUPG:] ':
# Chop off the prefix
line = line[9:]
L = line.split(None, 1)
keyword = L[0]
if len(L) > 1:
resp[ keyword ] = L[1]
else:
resp[ keyword ] = ""
return resp
def encrypt(self, str_, recipients, always_trust=False):
self.options.recipients = recipients # a list!
result = super(GnuPG, self).encrypt(str_, recipients,
always_trust=always_trust, passphrase=self.passphrase)
opt = ['--encrypt']
if always_trust or self.always_trust:
opt.append('--always-trust')
proc = self.run(opt, create_fhs=['stdin', 'stdout', 'status',
'stderr'])
proc.handles['stdin'].write(str_)
try:
proc.handles['stdin'].close()
except IOError:
pass
output = proc.handles['stdout'].read()
try:
proc.handles['stdout'].close()
except IOError:
pass
stat = proc.handles['status']
resp = self._read_response(stat)
try:
proc.handles['status'].close()
except IOError:
pass
error = proc.handles['stderr'].read()
proc.handles['stderr'].close()
try: proc.wait()
except IOError: pass
if 'INV_RECP' in resp and resp['INV_RECP'].split()[0] == '10':
# unusable recipient "Key not trusted"
if result.status == 'invalid recipient':
return '', 'NOT_TRUSTED'
if 'BEGIN_ENCRYPTION' in resp and 'END_ENCRYPTION' in resp:
# Encryption succeeded, even if there is output on stderr. Maybe
# verbose is on
if result.ok:
error = ''
return self._stripHeaderFooter(output), helpers.decode_string(error)
else:
error = result.status
return self._stripHeaderFooter(str(result)), error
def decrypt(self, str_, keyID):
proc = self.run(['--decrypt', '-q', '-u %s'%keyID], create_fhs=['stdin', 'stdout'])
enc = self._addHeaderFooter(str_, 'MESSAGE')
proc.handles['stdin'].write(enc)
proc.handles['stdin'].close()
data = self._addHeaderFooter(str_, 'MESSAGE')
result = super(GnuPG, self).decrypt(data,
passphrase=self.passphrase)
output = proc.handles['stdout'].read()
proc.handles['stdout'].close()
try: proc.wait()
except IOError: pass
return output
return str(result)
def sign(self, str_, keyID):
proc = self.run(['-b', '-u %s'%keyID], create_fhs=['stdin', 'stdout', 'status', 'stderr'])
proc.handles['stdin'].write(str_)
try:
proc.handles['stdin'].close()
except IOError:
pass
result = super(GnuPG, self).sign(str_, keyid=keyID, detach=True,
passphrase=self.passphrase)
output = proc.handles['stdout'].read()
try:
proc.handles['stdout'].close()
proc.handles['stderr'].close()
except IOError:
pass
stat = proc.handles['status']
resp = self._read_response(stat)
try:
proc.handles['status'].close()
except IOError:
pass
try: proc.wait()
except IOError: pass
if 'GOOD_PASSPHRASE' in resp or 'SIG_CREATED' in resp:
return self._stripHeaderFooter(output)
if 'KEYEXPIRED' in resp:
return 'KEYEXPIRED'
if result.fingerprint:
return self._stripHeaderFooter(str(result))
# if 'KEYEXPIRED' in resp:
# return 'KEYEXPIRED'
return 'BAD_PASSPHRASE'
def verify(self, str_, sign):
if str_ is None:
return ''
f = tmpfile()
fd = f.fileno()
f.write(str_)
f.seek(0)
data = '-----BEGIN PGP SIGNED MESSAGE-----' + os.linesep
data = data + 'Hash: SHA1' + os.linesep + os.linesep
data = data + str_ + os.linesep
data = data + self._addHeaderFooter(sign, 'SIGNATURE')
result = super(GnuPG, self).verify(data)
proc = self.run(['--verify', '--enable-special-filenames', '-', '-&%s'%fd], create_fhs=['stdin', 'status', 'stderr'])
f.close()
sign = self._addHeaderFooter(sign, 'SIGNATURE')
proc.handles['stdin'].write(sign)
proc.handles['stdin'].close()
proc.handles['stderr'].close()
stat = proc.handles['status']
resp = self._read_response(stat)
proc.handles['status'].close()
try: proc.wait()
except IOError: pass
keyid = ''
if 'GOODSIG' in resp:
keyid = resp['GOODSIG'].split()[0]
return keyid
def get_keys(self, secret = False):
if secret:
opt = '--list-secret-keys'
else:
opt = '--list-keys'
proc = self.run(['--with-colons', opt],
create_fhs=['stdout'])
output = proc.handles['stdout'].read()
proc.handles['stdout'].close()
try: proc.wait()
except IOError: pass
if result.valid:
return result.key_id
return ''
def get_keys(self, secret=False):
keys = {}
lines = output.split('\n')
for line in lines:
sline = line.split(':')
if (sline[0] == 'sec' and secret) or \
(sline[0] == 'pub' and not secret):
# decode escaped chars
name = eval('"' + sline[9].replace('"', '\\"') + '"')
# make it unicode instance
keys[sline[4][8:]] = helpers.decode_string(name)
result = super(GnuPG, self).list_keys(secret=secret)
for key in result:
# Take first not empty uid
keys[key['keyid'][8:]] = [uid for uid in key['uids'] if uid][0]
return keys
def get_secret_keys(self):
@ -216,7 +106,7 @@ if gajim.HAVE_GPG:
Remove header and footer from data
"""
if not data: return ''
lines = data.split('\n')
lines = data.splitlines()
while lines[0] != '':
lines.remove(lines[0])
while lines[0] == '':
@ -233,9 +123,9 @@ if gajim.HAVE_GPG:
"""
Add header and footer from data
"""
out = "-----BEGIN PGP %s-----\n" % type_
out = out + "Version: PGP\n"
out = out + "\n"
out = out + data + "\n"
out = out + "-----END PGP %s-----\n" % type_
out = "-----BEGIN PGP %s-----" % type_ + os.linesep
out = out + "Version: PGP" + os.linesep
out = out + os.linesep
out = out + data + os.linesep
out = out + "-----END PGP %s-----" % type_ + os.linesep
return out

View File

@ -1,673 +0,0 @@
# -*- coding:utf-8 -*-
## src/common/GnuPGInterface.py
##
## Copyright (C) 2001 Frank J. Tobin <ftobin AT neverending.org>
## Copyright (C) 2005 Nikos Kouremenos <kourem AT gmail.com>
## Copyright (C) 2006-2010 Yann Leboulanger <asterix AT lagaule.org>
## Copyright (C) 2008 Jean-Marie Traissard <jim AT lapin.org>
##
## This file is part of Gajim.
##
## Gajim is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 3 only.
##
## Gajim is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Gajim. If not, see <http://www.gnu.org/licenses/>.
##
"""
Interface to GNU Privacy Guard (GnuPG)
GnuPGInterface is a Python module to interface with GnuPG.
It concentrates on interacting with GnuPG via filehandles,
providing access to control GnuPG via versatile and extensible means.
This module is based on GnuPG::Interface, a Perl module by the same author.
Normally, using this module will involve creating a
GnuPG object, setting some options in it's 'options' data member
(which is of type Options), creating some pipes
to talk with GnuPG, and then calling the run() method, which will
connect those pipes to the GnuPG process. run() returns a
Process object, which contains the filehandles to talk to GnuPG with.
Example code:
>>> import GnuPGInterface
>>>
>>> plaintext = "Three blind mice"
>>> passphrase = "This is the passphrase"
>>>
>>> gnupg = GnuPGInterface.GnuPG()
>>> gnupg.options.armor = 1
>>> gnupg.options.meta_interactive = 0
>>> gnupg.options.extra_args.append('--no-secmem-warning')
>>>
>>> # Normally we might specify something in
>>> # gnupg.options.recipients, like
>>> # gnupg.options.recipients = [ '0xABCD1234', 'bob@foo.bar' ]
>>> # but since we're doing symmetric-only encryption, it's not needed.
>>> # If you are doing standard, public-key encryption, using
>>> # --encrypt, you will need to specify recipients before
>>> # calling gnupg.run()
>>>
>>> # First we'll encrypt the test_text input symmetrically
>>> p1 = gnupg.run(['--symmetric'],
... create_fhs=['stdin', 'stdout', 'passphrase'])
>>>
>>> p1.handles['passphrase'].write(passphrase)
>>> p1.handles['passphrase'].close()
>>>
>>> p1.handles['stdin'].write(plaintext)
>>> p1.handles['stdin'].close()
>>>
>>> ciphertext = p1.handles['stdout'].read()
>>> p1.handles['stdout'].close()
>>>
>>> # process cleanup
>>> p1.wait()
>>>
>>> # Now we'll decrypt what we just encrypted it,
>>> # using the convience method to get the
>>> # passphrase to GnuPG
>>> gnupg.passphrase = passphrase
>>>
>>> p2 = gnupg.run(['--decrypt'], create_fhs=['stdin', 'stdout'])
>>>
>>> p2.handles['stdin'].write(ciphertext)
>>> p2.handles['stdin'].close()
>>>
>>> decrypted_plaintext = p2.handles['stdout'].read()
>>> p2.handles['stdout'].close()
>>>
>>> # process cleanup
>>> p2.wait()
>>>
>>> # Our decrypted plaintext:
>>> decrypted_plaintext
'Three blind mice'
>>>
>>> # ...and see it's the same as what we orignally encrypted
>>> assert decrypted_plaintext == plaintext, \
"GnuPG decrypted output does not match original input"
>>>
>>>
>>> ##################################################
>>> # Now let's trying using run()'s attach_fhs paramter
>>>
>>> # we're assuming we're running on a unix...
>>> input = open('/etc/motd')
>>>
>>> p1 = gnupg.run(['--symmetric'], create_fhs=['stdout'],
... attach_fhs={'stdin': input})
>>>
>>> # GnuPG will read the stdin from /etc/motd
>>> ciphertext = p1.handles['stdout'].read()
>>>
>>> # process cleanup
>>> p1.wait()
>>>
>>> # Now let's run the output through GnuPG
>>> # We'll write the output to a temporary file,
>>> import tempfile
>>> temp = tempfile.TemporaryFile()
>>>
>>> p2 = gnupg.run(['--decrypt'], create_fhs=['stdin'],
... attach_fhs={'stdout': temp})
>>>
>>> # give GnuPG our encrypted stuff from the first run
>>> p2.handles['stdin'].write(ciphertext)
>>> p2.handles['stdin'].close()
>>>
>>> # process cleanup
>>> p2.wait()
>>>
>>> # rewind the tempfile and see what GnuPG gave us
>>> temp.seek(0)
>>> decrypted_plaintext = temp.read()
>>>
>>> # compare what GnuPG decrypted with our original input
>>> input.seek(0)
>>> input_data = input.read()
>>>
>>> assert decrypted_plaintext == input_data, \
"GnuPG decrypted output does not match original input"
To do things like public-key encryption, simply pass do something
like:
gnupg.passphrase = 'My passphrase'
gnupg.options.recipients = [ 'bob@foobar.com' ]
gnupg.run( ['--sign', '--encrypt'], create_fhs=..., attach_fhs=...)
Here is an example of subclassing GnuPGInterface.GnuPG,
so that it has an encrypt_string() method that returns
ciphertext.
>>> import GnuPGInterface
>>>
>>> class MyGnuPG(GnuPGInterface.GnuPG):
...
... def __init__(self):
... GnuPGInterface.GnuPG.__init__(self)
... self.setup_my_options()
...
... def setup_my_options(self):
... self.options.armor = 1
... self.options.meta_interactive = 0
... self.options.extra_args.append('--no-secmem-warning')
...
... def encrypt_string(self, string, recipients):
... gnupg.options.recipients = recipients # a list!
...
... proc = gnupg.run(['--encrypt'], create_fhs=['stdin', 'stdout'])
...
... proc.handles['stdin'].write(string)
... proc.handles['stdin'].close()
...
... output = proc.handles['stdout'].read()
... proc.handles['stdout'].close()
...
... proc.wait()
... return output
...
>>> gnupg = MyGnuPG()
>>> ciphertext = gnupg.encrypt_string("The secret", ['0x260C4FA3'])
>>>
>>> # just a small sanity test here for doctest
>>> import types
>>> assert isinstance(ciphertext, types.StringType), \
"What GnuPG gave back is not a string!"
Here is an example of generating a key:
>>> import GnuPGInterface
>>> gnupg = GnuPGInterface.GnuPG()
>>> gnupg.options.meta_interactive = 0
>>>
>>> # We will be creative and use the logger filehandle to capture
>>> # what GnuPG says this time, instead stderr; no stdout to listen to,
>>> # but we capture logger to surpress the dry-run command.
>>> # We also have to capture stdout since otherwise doctest complains;
>>> # Normally you can let stdout through when generating a key.
>>>
>>> proc = gnupg.run(['--gen-key'], create_fhs=['stdin', 'stdout',
... 'logger'])
>>>
>>> proc.handles['stdin'].write('''Key-Type: DSA
... Key-Length: 1024
... # We are only testing syntax this time, so dry-run
... %dry-run
... Subkey-Type: ELG-E
... Subkey-Length: 1024
... Name-Real: Joe Tester
... Name-Comment: with stupid passphrase
... Name-Email: joe@foo.bar
... Expire-Date: 2y
... Passphrase: abc
... %pubring foo.pub
... %secring foo.sec
... ''')
>>>
>>> proc.handles['stdin'].close()
>>>
>>> report = proc.handles['logger'].read()
>>> proc.handles['logger'].close()
>>>
>>> proc.wait()
"""
import os
import sys
import fcntl
__author__ = "Frank J. Tobin, ftobin@neverending.org"
__version__ = "0.3.2"
__revision__ = "$Id: GnuPGInterface.py,v 1.22 2002/01/11 20:22:04 ftobin Exp $"
# "standard" filehandles attached to processes
_stds = [ 'stdin', 'stdout', 'stderr' ]
# the permissions each type of fh needs to be opened with
_fd_modes = { 'stdin': 'w',
'stdout': 'r',
'stderr': 'r',
'passphrase': 'w',
'command': 'w',
'logger': 'r',
'status': 'r'
}
# correlation between handle names and the arguments we'll pass
_fd_options = { 'passphrase': '--passphrase-fd',
'logger': '--logger-fd',
'status': '--status-fd',
'command': '--command-fd' }
class GnuPG:
"""
Class instances represent GnuPG
Instance attributes of a GnuPG object are:
* call -- string to call GnuPG with. Defaults to "gpg"
* passphrase -- Since it is a common operation
to pass in a passphrase to GnuPG,
and working with the passphrase filehandle mechanism directly
can be mundane, if set, the passphrase attribute
works in a special manner. If the passphrase attribute is set,
and no passphrase file object is sent in to run(),
then GnuPG instnace will take care of sending the passphrase to
GnuPG, the executable instead of having the user sent it in manually.
* options -- Object of type GnuPGInterface.Options.
Attribute-setting in options determines
the command-line options used when calling GnuPG.
"""
def __init__(self):
self.call = 'gpg'
self.passphrase = None
self.options = Options()
def run(self, gnupg_commands, args=None, create_fhs=None, attach_fhs=None):
"""
Calls GnuPG with the list of string commands gnupg_commands, complete
with prefixing dashes
For example, gnupg_commands could be
'["--sign", "--encrypt"]'
Returns a GnuPGInterface.Process object.
args is an optional list of GnuPG command arguments (not options),
such as keyID's to export, filenames to process, etc.
create_fhs is an optional list of GnuPG filehandle
names that will be set as keys of the returned Process object's
'handles' attribute. The generated filehandles can be used
to communicate with GnuPG via standard input, standard output,
the status-fd, passphrase-fd, etc.
Valid GnuPG filehandle names are:
* stdin
* stdout
* stderr
* status
* passphase
* command
* logger
The purpose of each filehandle is described in the GnuPG
documentation.
attach_fhs is an optional dictionary with GnuPG filehandle
names mapping to opened files. GnuPG will read or write
to the file accordingly. For example, if 'my_file' is an
opened file and 'attach_fhs[stdin] is my_file', then GnuPG
will read its standard input from my_file. This is useful
if you want GnuPG to read/write to/from an existing file.
For instance:
f = open("encrypted.gpg")
gnupg.run(["--decrypt"], attach_fhs={'stdin': f})
Using attach_fhs also helps avoid system buffering
issues that can arise when using create_fhs, which
can cause the process to deadlock.
If not mentioned in create_fhs or attach_fhs,
GnuPG filehandles which are a std* (stdin, stdout, stderr)
are defaulted to the running process' version of handle.
Otherwise, that type of handle is simply not used when calling GnuPG.
For example, if you do not care about getting data from GnuPG's
status filehandle, simply do not specify it.
run() returns a Process() object which has a 'handles'
which is a dictionary mapping from the handle name
(such as 'stdin' or 'stdout') to the respective
newly-created FileObject connected to the running GnuPG process.
For instance, if the call was
process = gnupg.run(["--decrypt"], stdin=1)
after run returns 'process.handles["stdin"]'
is a FileObject connected to GnuPG's standard input,
and can be written to.
"""
if args is None: args = []
if create_fhs is None: create_fhs = []
if attach_fhs is None: attach_fhs = {}
for std in _stds:
if std not in attach_fhs \
and std not in create_fhs:
attach_fhs.setdefault(std, getattr(sys, std))
handle_passphrase = 0
if self.passphrase is not None \
and 'passphrase' not in attach_fhs \
and 'passphrase' not in create_fhs:
handle_passphrase = 1
create_fhs.append('passphrase')
process = self._attach_fork_exec(gnupg_commands, args,
create_fhs, attach_fhs)
if handle_passphrase:
passphrase_fh = process.handles['passphrase']
passphrase_fh.write( self.passphrase )
passphrase_fh.close()
del process.handles['passphrase']
return process
def _attach_fork_exec(self, gnupg_commands, args, create_fhs, attach_fhs):
"""
This is like run(), but without the passphrase-helping (note that run()
calls this)
"""
process = Process()
for fh_name in create_fhs + attach_fhs.keys():
if fh_name not in _fd_modes:
raise KeyError, \
"unrecognized filehandle name '%s'; must be one of %s" \
% (fh_name, _fd_modes.keys())
for fh_name in create_fhs:
# make sure the user doesn't specify a filehandle
# to be created *and* attached
if fh_name in attach_fhs:
raise ValueError, \
"cannot have filehandle '%s' in both create_fhs and attach_fhs" \
% fh_name
pipe = os.pipe()
# fix by drt@un.bewaff.net noting
# that since pipes are unidirectional on some systems,
# so we have to 'turn the pipe around'
# if we are writing
if _fd_modes[fh_name] == 'w': pipe = (pipe[1], pipe[0])
process._pipes[fh_name] = Pipe(pipe[0], pipe[1], 0)
for fh_name, fh in attach_fhs.items():
process._pipes[fh_name] = Pipe(fh.fileno(), fh.fileno(), 1)
process.pid = os.fork()
if process.pid == 0: self._as_child(process, gnupg_commands, args)
return self._as_parent(process)
def _as_parent(self, process):
"""
Stuff run after forking in parent
"""
for k, p in process._pipes.items():
if not p.direct:
os.close(p.child)
process.handles[k] = os.fdopen(p.parent, _fd_modes[k])
# user doesn't need these
del process._pipes
return process
def _as_child(self, process, gnupg_commands, args):
"""
Stuff run after forking in child
"""
# child
for std in _stds:
p = process._pipes[std]
os.dup2( p.child, getattr(sys, "__%s__" % std).fileno() )
for k, p in process._pipes.items():
if p.direct and k not in _stds:
# we want the fh to stay open after execing
fcntl.fcntl( p.child, fcntl.F_SETFD, 0 )
fd_args = []
for k, p in process._pipes.items():
# set command-line options for non-standard fds
if k not in _stds:
fd_args.extend([ _fd_options[k], "%d" % p.child ])
if not p.direct: os.close(p.parent)
command = [ self.call ] + fd_args + self.options.get_args() \
+ gnupg_commands + args
os.execvp( command[0], command )
class Pipe:
"""
Simple struct holding stuff about pipes we use
"""
def __init__(self, parent, child, direct):
self.parent = parent
self.child = child
self.direct = direct
class Options:
"""
Objects of this class encompass options passed to GnuPG.
This class is responsible for determining command-line arguments
which are based on options. It can be said that a GnuPG
object has-a Options object in its options attribute.
Attributes which correlate directly to GnuPG options:
Each option here defaults to false or None, and is described in
GnuPG documentation.
Booleans (set these attributes to booleans)
* armor
* no_greeting
* no_verbose
* quiet
* batch
* always_trust
* rfc1991
* openpgp
* force_v3_sigs
* no_options
* textmode
Strings (set these attributes to strings)
* homedir
* default_key
* comment
* compress_algo
* options
Lists (set these attributes to lists)
* recipients (***NOTE*** plural of 'recipient')
* encrypt_to
Meta options
Meta options are options provided by this module that do
not correlate directly to any GnuPG option by name,
but are rather bundle of options used to accomplish
a specific goal, such as obtaining compatibility with PGP 5.
The actual arguments each of these reflects may change with time. Each
defaults to false unless otherwise specified.
meta_pgp_5_compatible -- If true, arguments are generated to try
to be compatible with PGP 5.x.
meta_pgp_2_compatible -- If true, arguments are generated to try
to be compatible with PGP 2.x.
meta_interactive -- If false, arguments are generated to try to
help the using program use GnuPG in a non-interactive
environment, such as CGI scripts. Default is true.
extra_args -- Extra option arguments may be passed in
via the attribute extra_args, a list.
>>> import GnuPGInterface
>>>
>>> gnupg = GnuPGInterface.GnuPG()
>>> gnupg.options.armor = 1
>>> gnupg.options.recipients = ['Alice', 'Bob']
>>> gnupg.options.extra_args = ['--no-secmem-warning']
>>>
>>> # no need for users to call this normally; just for show here
>>> gnupg.options.get_args()
['--armor', '--recipient', 'Alice', '--recipient', 'Bob', '--no-secmem-warning']
"""
def __init__(self):
# booleans
self.armor = 0
self.no_greeting = 0
self.verbose = 0
self.no_verbose = 0
self.quiet = 0
self.batch = 0
self.always_trust = 0
self.rfc1991 = 0
self.openpgp = 0
self.force_v3_sigs = 0
self.no_options = 0
self.textmode = 0
# meta-option booleans
self.meta_pgp_5_compatible = 0
self.meta_pgp_2_compatible = 0
self.meta_interactive = 1
# strings
self.homedir = None
self.default_key = None
self.comment = None
self.compress_algo = None
self.options = None
# lists
self.encrypt_to = []
self.recipients = []
# miscellaneous arguments
self.extra_args = []
def get_args( self ):
"""
Generate a list of GnuPG arguments based upon attributes
"""
return self.get_meta_args() + self.get_standard_args() + self.extra_args
def get_standard_args( self ):
"""
Generate a list of standard, non-meta or extra arguments
"""
args = []
if self.homedir is not None:
args.extend( [ '--homedir', self.homedir ] )
if self.options is not None:
args.extend( [ '--options', self.options ] )
if self.comment is not None:
args.extend( [ '--comment', self.comment ] )
if self.compress_algo is not None:
args.extend( [ '--compress-algo', self.compress_algo ] )
if self.default_key is not None:
args.extend( [ '--default-key', self.default_key ] )
if self.no_options: args.append( '--no-options' )
if self.armor: args.append( '--armor' )
if self.textmode: args.append( '--textmode' )
if self.no_greeting: args.append( '--no-greeting' )
if self.verbose: args.append( '--verbose' )
if self.no_verbose: args.append( '--no-verbose' )
if self.quiet: args.append( '--quiet' )
if self.batch: args.append( '--batch' )
if self.always_trust: args.append( '--always-trust' )
if self.force_v3_sigs: args.append( '--force-v3-sigs' )
if self.rfc1991: args.append( '--rfc1991' )
if self.openpgp: args.append( '--openpgp' )
for r in self.recipients: args.extend( [ '--recipient', r ] )
for r in self.encrypt_to: args.extend( [ '--encrypt-to', r ] )
return args
def get_meta_args( self ):
"""
Get a list of generated meta-arguments
"""
args = []
if self.meta_pgp_5_compatible: args.extend( [ '--compress-algo', '1',
'--force-v3-sigs'
] )
if self.meta_pgp_2_compatible: args.append( '--rfc1991' )
if not self.meta_interactive: args.extend( [ '--batch', '--no-tty' ] )
return args
class Process:
"""
Objects of this class encompass properties of a GnuPG process spawned by
GnuPG.run()
# gnupg is a GnuPG object
process = gnupg.run( [ '--decrypt' ], stdout = 1 )
out = process.handles['stdout'].read()
...
os.waitpid( process.pid, 0 )
Data Attributes
handles -- This is a map of filehandle-names to
the file handles, if any, that were requested via run() and hence
are connected to the running GnuPG process. Valid names
of this map are only those handles that were requested.
pid -- The PID of the spawned GnuPG process.
Useful to know, since once should call
os.waitpid() to clean up the process, especially
if multiple calls are made to run().
"""
def __init__(self):
self._pipes = {}
self.handles = {}
self.pid = None
self._waited = None
def wait(self):
"""
Wait on the process to exit, allowing for child cleanup. Will raise an
IOError if the process exits non-zero
"""
e = os.waitpid(self.pid, 0)[1]
if e != 0:
raise IOError, "GnuPG exited non-zero, with code %d" % (e << 8)
def _run_doctests():
import doctest, GnuPGInterface
return doctest.testmod(GnuPGInterface)
# deprecated
GnuPGInterface = GnuPG
if __name__ == '__main__':
_run_doctests()

View File

@ -218,13 +218,12 @@ class CommonConnection:
if self.gpg.passphrase is None and not use_gpg_agent:
# We didn't set a passphrase
return None
if self.gpg.passphrase is not None or use_gpg_agent:
signed = self.gpg.sign(msg, keyID)
if signed == 'BAD_PASSPHRASE':
self.USE_GPG = False
signed = ''
gajim.nec.push_incoming_event(BadGPGPassphraseEvent(None,
conn=self))
signed = self.gpg.sign(msg, keyID)
if signed == 'BAD_PASSPHRASE':
self.USE_GPG = False
signed = ''
gajim.nec.push_incoming_event(BadGPGPassphraseEvent(None,
conn=self))
return signed
def _on_disconnected(self):
@ -596,14 +595,12 @@ class CommonConnection:
def ask_gpg_keys(self):
if self.gpg:
keys = self.gpg.get_keys()
return keys
return self.gpg.get_keys()
return None
def ask_gpg_secrete_keys(self):
if self.gpg:
keys = self.gpg.get_secret_keys()
return keys
return self.gpg.get_secret_keys()
return None
def load_roster_from_db(self):

View File

@ -150,7 +150,7 @@ except ImportError:
HAVE_GPG = True
try:
import GnuPGInterface
import gnupg
except ImportError:
HAVE_GPG = False
else:

921
src/common/gnupg.py Normal file
View File

@ -0,0 +1,921 @@
""" A wrapper for the 'gpg' command::
Portions of this module are derived from A.M. Kuchling's well-designed
GPG.py, using Richard Jones' updated version 1.3, which can be found
in the pycrypto CVS repository on Sourceforge:
http://pycrypto.cvs.sourceforge.net/viewvc/pycrypto/gpg/GPG.py
This module is *not* forward-compatible with amk's; some of the
old interface has changed. For instance, since I've added decrypt
functionality, I elected to initialize with a 'gnupghome' argument
instead of 'keyring', so that gpg can find both the public and secret
keyrings. I've also altered some of the returned objects in order for
the caller to not have to know as much about the internals of the
result classes.
While the rest of ISconf is released under the GPL, I am releasing
this single file under the same terms that A.M. Kuchling used for
pycrypto.
Steve Traugott, stevegt@terraluna.org
Thu Jun 23 21:27:20 PDT 2005
This version of the module has been modified from Steve Traugott's version
(see http://trac.t7a.org/isconf/browser/trunk/lib/python/isconf/GPG.py) by
Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
Modifications Copyright (C) 2008-2010 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import locale
__author__ = "Vinay Sajip"
__date__ = "$08-Oct-2010 23:01:07$"
try:
from io import StringIO
from io import TextIOWrapper
from io import BufferedReader
from io import BufferedWriter
except ImportError:
from cStringIO import StringIO
class BufferedReader: pass
class BufferedWriter: pass
import locale
import logging
import os
import socket
from subprocess import Popen
from subprocess import PIPE
import sys
import threading
try:
import logging.NullHandler as NullHandler
except ImportError:
class NullHandler(logging.Handler):
def handle(self, record):
pass
try:
unicode
_py3k = False
except NameError:
_py3k = True
logger = logging.getLogger(__name__)
if not logger.handlers:
logger.addHandler(NullHandler())
def _copy_data(instream, outstream):
# Copy one stream to another
sent = 0
if hasattr(sys.stdin, 'encoding'):
enc = sys.stdin.encoding
else:
enc = 'ascii'
while True:
data = instream.read(1024)
if len(data) == 0:
break
sent += len(data)
logger.debug("sending chunk (%d): %r", sent, data[:256])
try:
outstream.write(data)
except UnicodeError:
outstream.write(data.encode(enc))
except:
# Can sometimes get 'broken pipe' errors even when the data has all
# been sent
logger.exception('Error sending data')
break
try:
outstream.close()
except IOError:
logger.warning('Exception occurred while closing: ignored', exc_info=1)
logger.debug("closed output, %d bytes sent", sent)
def _threaded_copy_data(instream, outstream):
wr = threading.Thread(target=_copy_data, args=(instream, outstream))
wr.setDaemon(True)
logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
wr.start()
return wr
def _write_passphrase(stream, passphrase, encoding):
passphrase = '%s\n' % passphrase
passphrase = passphrase.encode(encoding)
stream.write(passphrase)
logger.debug("Wrote passphrase: %r", passphrase)
def _is_sequence(instance):
return isinstance(instance,list) or isinstance(instance,tuple)
def _wrap_input(inp):
if isinstance(inp, BufferedWriter):
oldinp = inp
inp = TextIOWrapper(inp)
logger.debug('wrapped input: %r -> %r', oldinp, inp)
return inp
def _wrap_output(outp):
if isinstance(outp, BufferedReader):
oldoutp = outp
outp = TextIOWrapper(outp)
logger.debug('wrapped output: %r -> %r', oldoutp, outp)
return outp
#The following is needed for Python2.7 :-(
def _make_file(s):
try:
rv = StringIO(s)
except (TypeError, UnicodeError):
from io import BytesIO
rv = BytesIO(s)
return rv
def _make_binary_stream(s, encoding):
try:
if _py3k:
if isinstance(s, str):
s = s.encode(encoding)
else:
if type(s) is not str:
s = s.encode(encoding)
from io import BytesIO
rv = BytesIO(s)
except ImportError:
rv = StringIO(s)
return rv
class GPG(object):
"Encapsulate access to the gpg executable"
def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False):
"""Initialize a GPG process wrapper. Options are:
gpgbinary -- full pathname for GPG binary.
gnupghome -- full pathname to where we can find the public and
private keyrings. Default is whatever gpg defaults to.
"""
self.gpgbinary = gpgbinary
self.gnupghome = gnupghome
self.verbose = verbose
self.encoding = locale.getpreferredencoding()
if self.encoding is None: # This happens on Jython!
self.encoding = sys.stdin.encoding
if gnupghome and not os.path.isdir(self.gnupghome):
os.makedirs(self.gnupghome,0x1C0)
p = self._open_subprocess(["--version"])
result = Verify() # any result will do for this
self._collect_output(p, result)
if p.returncode != 0:
raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
result.stderr))
def _open_subprocess(self, args, passphrase=False):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
if self.gnupghome:
cmd.append('--homedir "%s" ' % self.gnupghome)
if passphrase:
cmd.append('--batch --passphrase-fd 0')
cmd.extend(args)
cmd = ' '.join(cmd)
if self.verbose:
print(cmd)
logger.debug("%s", cmd)
return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
def _read_response(self, stream, result):
# Internal method: reads all the output from GPG, taking notice
# only of lines that begin with the magic [GNUPG:] prefix.
#
# Calls methods on the response object for each valid token found,
# with the arg being the remainder of the status line.
lines = []
while True:
line = stream.readline()
lines.append(line)
if self.verbose:
print(line)
logger.debug("%s", line.rstrip())
if line == "": break
line = line.rstrip()
if line[0:9] == '[GNUPG:] ':
# Chop off the prefix
line = line[9:]
L = line.split(None, 1)
keyword = L[0]
if len(L) > 1:
value = L[1]
else:
value = ""
result.handle_status(keyword, value)
result.stderr = ''.join(lines)
def _read_data(self, stream, result):
# Read the contents of the file from GPG's stdout
chunks = []
while True:
data = stream.read(1024)
if len(data) == 0:
break
logger.debug("chunk: %r" % data[:256])
chunks.append(data)
if _py3k:
# Join using b'' or '', as appropriate
result.data = type(data)().join(chunks)
else:
result.data = ''.join(chunks)
def _collect_output(self, process, result, writer=None):
"""
Drain the subprocesses output streams, writing the collected output
to the result. If a writer thread (writing to the subprocess) is given,
make sure it's joined before returning.
"""
stderr = _wrap_output(process.stderr)
rr = threading.Thread(target=self._read_response, args=(stderr, result))
rr.setDaemon(True)
logger.debug('stderr reader: %r', rr)
rr.start()
stdout = process.stdout # _wrap_output(process.stdout)
dr = threading.Thread(target=self._read_data, args=(stdout, result))
dr.setDaemon(True)
logger.debug('stdout reader: %r', dr)
dr.start()
dr.join()
rr.join()
if writer is not None:
writer.join()
process.wait()
def _handle_io(self, args, file, result, passphrase=None, binary=False):
"Handle a call to GPG - pass input data, collect output data"
# Handle a basic data call - pass data to GPG, handle the output
# including status information. Garbage In, Garbage Out :)
p = self._open_subprocess(args, passphrase is not None)
if not binary and not isinstance(file, BufferedReader):
stdin = _wrap_input(p.stdin)
else:
stdin = p.stdin
if passphrase:
_write_passphrase(stdin, passphrase, self.encoding)
writer = _threaded_copy_data(file, stdin)
self._collect_output(p, result, writer)
return result
#
# SIGNATURE METHODS
#
def sign(self, message, **kwargs):
"""sign message"""
file = _make_binary_stream(message, self.encoding)
return self.sign_file(file, **kwargs)
def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
detach=False):
"""sign file"""
logger.debug("sign_file: %s", file)
args = ["-sa"]
# You can't specify detach-sign and clearsign together: gpg ignores
# the detach-sign in that case.
if detach:
args.append("--detach-sign")
elif clearsign:
args.append("--clearsign")
if keyid:
args.append("--default-key %s" % keyid)
result = Sign(self.encoding)
#We could use _handle_io here except for the fact that if the
#passphrase is bad, gpg bails and you can't write the message.
#self._handle_io(args, _make_file(message), result, passphrase=passphrase)
p = self._open_subprocess(args, passphrase is not None)
try:
stdin = p.stdin
if passphrase:
_write_passphrase(stdin, passphrase, self.encoding)
writer = _threaded_copy_data(file, stdin)
except IOError:
logging.exception("error writing message")
writer = None
self._collect_output(p, result, writer)
return result
def verify(self, data):
"""Verify the signature on the contents of the string 'data'
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input(Passphrase='foo')
>>> key = gpg.gen_key(input)
>>> assert key
>>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar')
>>> assert not sig
>>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo')
>>> assert sig
>>> verify = gpg.verify(sig.data)
>>> assert verify
"""
return self.verify_file(_make_binary_stream(data, self.encoding))
def verify_file(self, file, data_filename=None):
"Verify the signature on the contents of the file-like object 'file'"
logger.debug('verify_file: %r, %r', file, data_filename)
result = Verify()
args = ['--verify']
if data_filename is None:
self._handle_io(args, file, result, binary=True)
else:
logger.debug('Handling detached verification')
import tempfile
fd, fn = tempfile.mkstemp(prefix='pygpg')
s = file.read()
logger.debug('Wrote to temp file: %r', s)
os.write(fd, s)
os.close(fd)
args.append(fn)
args.append(data_filename)
try:
p = self._open_subprocess(args)
self._collect_output(p, result)
finally:
os.unlink(fn)
return result
#
# KEY MANAGEMENT
#
def import_keys(self, key_data):
""" import the key_data into our keyring
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input()
>>> result = gpg.gen_key(input)
>>> print1 = result.fingerprint
>>> result = gpg.gen_key(input)
>>> print2 = result.fingerprint
>>> pubkey1 = gpg.export_keys(print1)
>>> seckey1 = gpg.export_keys(print1,secret=True)
>>> seckeys = gpg.list_keys(secret=True)
>>> pubkeys = gpg.list_keys()
>>> assert print1 in seckeys.fingerprints
>>> assert print1 in pubkeys.fingerprints
>>> str(gpg.delete_keys(print1))
'Must delete secret key first'
>>> str(gpg.delete_keys(print1,secret=True))
'ok'
>>> str(gpg.delete_keys(print1))
'ok'
>>> str(gpg.delete_keys("nosuchkey"))
'No such key'
>>> seckeys = gpg.list_keys(secret=True)
>>> pubkeys = gpg.list_keys()
>>> assert not print1 in seckeys.fingerprints
>>> assert not print1 in pubkeys.fingerprints
>>> result = gpg.import_keys('foo')
>>> assert not result
>>> result = gpg.import_keys(pubkey1)
>>> pubkeys = gpg.list_keys()
>>> seckeys = gpg.list_keys(secret=True)
>>> assert not print1 in seckeys.fingerprints
>>> assert print1 in pubkeys.fingerprints
>>> result = gpg.import_keys(seckey1)
>>> assert result
>>> seckeys = gpg.list_keys(secret=True)
>>> pubkeys = gpg.list_keys()
>>> assert print1 in seckeys.fingerprints
>>> assert print1 in pubkeys.fingerprints
>>> assert print2 in pubkeys.fingerprints
"""
result = ImportResult()
logger.debug('import_keys: %r', key_data[:256])
data = _make_binary_stream(key_data, self.encoding)
self._handle_io(['--import'], data, result, binary=True)
logger.debug('import_keys result: %r', result.__dict__)
return result
def delete_keys(self, fingerprints, secret=False):
which='key'
if secret:
which='secret-key'
if _is_sequence(fingerprints):
fingerprints = ' '.join(fingerprints)
args = ["--batch --delete-%s %s" % (which, fingerprints)]
result = DeleteResult()
p = self._open_subprocess(args)
self._collect_output(p, result)
return result
def export_keys(self, keyids, secret=False):
"export the indicated keys. 'keyid' is anything gpg accepts"
which=''
if secret:
which='-secret-key'
if _is_sequence(keyids):
keyids = ' '.join(keyids)
args = ["--armor --export%s %s" % (which, keyids)]
p = self._open_subprocess(args)
# gpg --export produces no status-fd output; stdout will be
# empty in case of failure
#stdout, stderr = p.communicate()
result = DeleteResult() # any result will do
self._collect_output(p, result)
logger.debug('export_keys result: %r', result.data)
return result.data.decode(self.encoding)
def list_keys(self, secret=False):
""" list the keys currently in the keyring
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input()
>>> result = gpg.gen_key(input)
>>> print1 = result.fingerprint
>>> result = gpg.gen_key(input)
>>> print2 = result.fingerprint
>>> pubkeys = gpg.list_keys()
>>> assert print1 in pubkeys.fingerprints
>>> assert print2 in pubkeys.fingerprints
"""
which='keys'
if secret:
which='secret-keys'
args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which)
args = [args]
p = self._open_subprocess(args)
# there might be some status thingumy here I should handle... (amk)
# ...nope, unless you care about expired sigs or keys (stevegt)
# Get the response information
result = ListKeys()
self._collect_output(p, result)
lines = result.data.decode(self.encoding).splitlines()
valid_keywords = 'pub uid sec fpr'.split()
for line in lines:
if self.verbose:
print(line)
logger.debug("line: %r", line.rstrip())
if not line:
break
L = line.strip().split(':')
if not L:
continue
keyword = L[0]
if keyword in valid_keywords:
getattr(result, keyword)(L)
return result
def gen_key(self, input):
"""Generate a key; you might use gen_key_input() to create the
control input.
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input()
>>> result = gpg.gen_key(input)
>>> assert result
>>> result = gpg.gen_key('foo')
>>> assert not result
"""
args = ["--gen-key --batch"]
result = GenKey()
file = _make_file(input)
self._handle_io(args, file, result)
return result
def gen_key_input(self, **kwargs):
"""
Generate --gen-key input per gpg doc/DETAILS
"""
parms = {}
for key, val in list(kwargs.items()):
key = key.replace('_','-').title()
parms[key] = val
parms.setdefault('Key-Type','RSA')
parms.setdefault('Key-Length',1024)
parms.setdefault('Name-Real', "Autogenerated Key")
parms.setdefault('Name-Comment', "Generated by gnupg.py")
try:
logname = os.environ['LOGNAME']
except KeyError:
logname = os.environ['USERNAME']
hostname = socket.gethostname()
parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'),
hostname))
out = "Key-Type: %s\n" % parms.pop('Key-Type')
for key, val in list(parms.items()):
out += "%s: %s\n" % (key, val)
out += "%commit\n"
return out
# Key-Type: RSA
# Key-Length: 1024
# Name-Real: ISdlink Server on %s
# Name-Comment: Created by %s
# Name-Email: isdlink@%s
# Expire-Date: 0
# %commit
#
#
# Key-Type: DSA
# Key-Length: 1024
# Subkey-Type: ELG-E
# Subkey-Length: 1024
# Name-Real: Joe Tester
# Name-Comment: with stupid passphrase
# Name-Email: joe@foo.bar
# Expire-Date: 0
# Passphrase: abc
# %pubring foo.pub
# %secring foo.sec
# %commit
#
# ENCRYPTION
#
def encrypt_file(self, file, recipients, sign=None,
always_trust=False, passphrase=None,
armor=True, output=None):
"Encrypt the message read from the file-like object 'file'"
args = ['--encrypt']
if armor: # create ascii-armored output - set to False for binary output
args.append('--armor')
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
args.append('--output %s' % output)
if not _is_sequence(recipients):
recipients = (recipients,)
for recipient in recipients:
args.append('--recipient %s' % recipient)
if sign:
args.append("--sign --default-key %s" % sign)
if always_trust:
args.append("--always-trust")
result = Crypt(self.encoding)
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
logger.debug('encrypt result: %r', result.data)
return result
def encrypt(self, data, recipients, **kwargs):
"""Encrypt the message contained in the string 'data'
>>> import shutil
>>> if os.path.exists("keys"):
... shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> input = gpg.gen_key_input(passphrase='foo')
>>> result = gpg.gen_key(input)
>>> print1 = result.fingerprint
>>> input = gpg.gen_key_input()
>>> result = gpg.gen_key(input)
>>> print2 = result.fingerprint
>>> result = gpg.encrypt("hello",print2)
>>> message = str(result)
>>> assert message != 'hello'
>>> result = gpg.decrypt(message)
>>> assert result
>>> str(result)
'hello'
>>> result = gpg.encrypt("hello again",print1)
>>> message = str(result)
>>> result = gpg.decrypt(message)
>>> result.status
'need passphrase'
>>> result = gpg.decrypt(message,passphrase='bar')
>>> result.status
'decryption failed'
>>> assert not result
>>> result = gpg.decrypt(message,passphrase='foo')
>>> result.status
'decryption ok'
>>> str(result)
'hello again'
>>> result = gpg.encrypt("signed hello",print2,sign=print1)
>>> result.status
'need passphrase'
>>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
>>> result.status
'encryption ok'
>>> message = str(result)
>>> result = gpg.decrypt(message)
>>> result.status
'decryption ok'
>>> assert result.fingerprint == print1
"""
data = _make_binary_stream(data, self.encoding)
return self.encrypt_file(data, recipients, **kwargs)
def decrypt(self, message, **kwargs):
data = _make_binary_stream(message, self.encoding)
return self.decrypt_file(data, **kwargs)
def decrypt_file(self, file, always_trust=False, passphrase=None,
output=None):
args = ["--decrypt"]
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
args.append('--output %s' % output)
if always_trust:
args.append("--always-trust")
result = Crypt(self.encoding)
self._handle_io(args, file, result, passphrase, binary=True)
logger.debug('decrypt result: %r', result.data)
return result
class Verify(object):
"Handle status messages for --verify"
def __init__(self):
self.valid = False
self.fingerprint = self.creation_date = self.timestamp = None
self.signature_id = self.key_id = None
self.username = None
def __nonzero__(self):
return self.valid
__bool__ = __nonzero__
def handle_status(self, key, value):
if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL",
"TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA"):
pass
elif key in ("PLAINTEXT", "PLAINTEXT_LENGTH"):
pass
elif key == "BADSIG":
self.valid = False
self.key_id, self.username = value.split(None, 1)
elif key == "GOODSIG":
self.valid = True
self.key_id, self.username = value.split(None, 1)
elif key == "VALIDSIG":
(self.fingerprint,
self.creation_date,
self.sig_timestamp,
self.expire_timestamp) = value.split()[:4]
elif key == "SIG_ID":
(self.signature_id,
self.creation_date, self.timestamp) = value.split()
else:
raise ValueError("Unknown status message: %r" % key)
class ImportResult(object):
"Handle status messages for --import"
counts = '''count no_user_id imported imported_rsa unchanged
n_uids n_subk n_sigs n_revoc sec_read sec_imported
sec_dups not_imported'''.split()
def __init__(self):
self.imported = []
self.results = []
self.fingerprints = []
for result in self.counts:
setattr(self, result, None)
def __nonzero__(self):
if self.not_imported: return False
if not self.fingerprints: return False
return True
__bool__ = __nonzero__
ok_reason = {
'0': 'Not actually changed',
'1': 'Entirely new key',
'2': 'New user IDs',
'4': 'New signatures',
'8': 'New subkeys',
'16': 'Contains private key',
}
problem_reason = {
'0': 'No specific reason given',
'1': 'Invalid Certificate',
'2': 'Issuer Certificate missing',
'3': 'Certificate Chain too long',
'4': 'Error storing certificate',
}
def handle_status(self, key, value):
if key == "IMPORTED":
# this duplicates info we already see in import_ok & import_problem
pass
elif key == "NODATA":
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'No valid data found'})
elif key == "IMPORT_OK":
reason, fingerprint = value.split()
reasons = []
for code, text in list(self.ok_reason.items()):
if int(reason) | int(code) == int(reason):
reasons.append(text)
reasontext = '\n'.join(reasons) + "\n"
self.results.append({'fingerprint': fingerprint,
'ok': reason, 'text': reasontext})
self.fingerprints.append(fingerprint)
elif key == "IMPORT_PROBLEM":
try:
reason, fingerprint = value.split()
except:
reason = value
fingerprint = '<unknown>'
self.results.append({'fingerprint': fingerprint,
'problem': reason, 'text': self.problem_reason[reason]})
elif key == "IMPORT_RES":
import_res = value.split()
for i in range(len(self.counts)):
setattr(self, self.counts[i], int(import_res[i]))
else:
raise ValueError("Unknown status message: %r" % key)
def summary(self):
l = []
l.append('%d imported'%self.imported)
if self.not_imported:
l.append('%d not imported'%self.not_imported)
return ', '.join(l)
class ListKeys(list):
''' Handle status messages for --list-keys.
Handle pub and uid (relating the latter to the former).
Don't care about (info from src/DETAILS):
crt = X.509 certificate
crs = X.509 certificate and private key available
sub = subkey (secondary key)
ssb = secret subkey (secondary key)
uat = user attribute (same as user id except for field 10).
sig = signature
rev = revocation signature
pkd = public key data (special field format, see below)
grp = reserved for gpgsm
rvk = revocation key
'''
def __init__(self):
self.curkey = None
self.fingerprints = []
def key(self, args):
vars = ("""
type trust length algo keyid date expires dummy ownertrust uid
""").split()
self.curkey = {}
for i in range(len(vars)):
self.curkey[vars[i]] = args[i]
self.curkey['uids'] = [self.curkey['uid']]
del self.curkey['uid']
self.append(self.curkey)
pub = sec = key
def fpr(self, args):
self.curkey['fingerprint'] = args[9]
self.fingerprints.append(args[9])
def uid(self, args):
self.curkey['uids'].append(args[9])
def handle_status(self, key, value):
pass
class Crypt(Verify):
"Handle status messages for --encrypt and --decrypt"
def __init__(self, encoding):
Verify.__init__(self)
self.data = ''
self.ok = False
self.status = ''
self.encoding = encoding
def __nonzero__(self):
if self.ok: return True
return False
__bool__ = __nonzero__
def __str__(self):
return self.data.decode(self.encoding)
def handle_status(self, key, value):
if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
"BEGIN_SIGNING", "NO_SECKEY"):
pass
elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
"DECRYPTION_FAILED"):
self.status = key.replace("_", " ").lower()
elif key == "NEED_PASSPHRASE_SYM":
self.status = 'need symmetric passphrase'
elif key == "BEGIN_DECRYPTION":
self.status = 'decryption incomplete'
elif key == "BEGIN_ENCRYPTION":
self.status = 'encryption incomplete'
elif key == "DECRYPTION_OKAY":
self.status = 'decryption ok'
self.ok = True
elif key == "END_ENCRYPTION":
self.status = 'encryption ok'
self.ok = True
elif key == "INV_RECP":
self.status = 'invalid recipient'
elif key == "KEYEXPIRED":
self.status = 'key expired'
elif key == "SIG_CREATED":
self.status = 'sig created'
elif key == "SIGEXPIRED":
self.status = 'sig expired'
else:
Verify.handle_status(self, key, value)
class GenKey(object):
"Handle status messages for --gen-key"
def __init__(self):
self.type = None
self.fingerprint = None
def __nonzero__(self):
if self.fingerprint: return True
return False
__bool__ = __nonzero__
def __str__(self):
return self.fingerprint or ''
def handle_status(self, key, value):
if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"):
pass
elif key == "KEY_CREATED":
(self.type,self.fingerprint) = value.split()
else:
raise ValueError("Unknown status message: %r" % key)
class DeleteResult(object):
"Handle status messages for --delete-key and --delete-secret-key"
def __init__(self):
self.status = 'ok'
def __str__(self):
return self.status
problem_reason = {
'1': 'No such key',
'2': 'Must delete secret key first',
'3': 'Ambigious specification',
}
def handle_status(self, key, value):
if key == "DELETE_PROBLEM":
self.status = self.problem_reason.get(value,
"Unknown error: %r" % value)
else:
raise ValueError("Unknown status message: %r" % key)
class Sign(object):
"Handle status messages for --sign"
def __init__(self, encoding):
self.type = None
self.fingerprint = None
self.encoding = encoding
def __nonzero__(self):
return self.fingerprint is not None
__bool__ = __nonzero__
def __str__(self):
return self.data.decode(self.encoding)
def handle_status(self, key, value):
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
"GOOD_PASSPHRASE", "BEGIN_SIGNING"):
pass
elif key == "SIG_CREATED":
(self.type,
algo, hashalgo, cls,
self.timestamp, self.fingerprint
) = value.split()
else:
raise ValueError("Unknown status message: %r" % key)

View File

@ -60,8 +60,8 @@ class FeaturesWindow:
_('Feature not available under Windows.')),
_('OpenGPG message encryption'): (self.gpg_available,
_('Encrypting chat messages with gpg keys.'),
_('Requires gpg and python-GnuPGInterface.'),
_('Feature not available under Windows.')),
_('Requires gpg and python-gnupg (http://code.google.com/p/python-gnupg/).'),
_('Requires gpg.exe in PATH.')),
_('Network-manager'): (self.network_manager_available,
_('Autodetection of network status.'),
_('Requires gnome-network-manager and python-dbus.'),
@ -181,8 +181,6 @@ class FeaturesWindow:
return dbus_support.supported
def gpg_available(self):
if os.name == 'nt':
return False
return gajim.HAVE_GPG
def network_manager_available(self):