update gnupg to 0.3.4

This commit is contained in:
Yann Leboulanger 2013-06-20 20:47:42 +02:00
parent 44f9c18b95
commit 9e7ea9b328
1 changed files with 153 additions and 56 deletions

View File

@ -27,15 +27,15 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved.
Modifications Copyright (C) 2008-2013 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import locale
__version__ = "0.3.0"
__version__ = "0.3.4"
__author__ = "Vinay Sajip"
__date__ = "$12-May-2012 10:49:10$"
__date__ = "$05-Jun-2013 09:48:54$"
try:
from io import StringIO
@ -46,6 +46,7 @@ import codecs
import locale
import logging
import os
import re
import socket
from subprocess import Popen
from subprocess import PIPE
@ -61,8 +62,12 @@ except ImportError:
try:
unicode
_py3k = False
string_types = basestring
text_type = unicode
except NameError:
_py3k = True
string_types = str
text_type = str
logger = logging.getLogger('gajim.c.gnupg')
if not logger.handlers:
@ -77,7 +82,7 @@ def _copy_data(instream, outstream):
enc = 'ascii'
while True:
data = instream.read(1024)
if len(data) == 0:
if not data:
break
sent += len(data)
logger.debug("sending chunk (%d): %r", sent, data[:256])
@ -110,16 +115,16 @@ def _write_passphrase(stream, passphrase, encoding):
logger.debug("Wrote passphrase: %r", passphrase)
def _is_sequence(instance):
return isinstance(instance,list) or isinstance(instance,tuple)
return isinstance(instance, (list, tuple, set, frozenset))
def _make_binary_stream(s, encoding):
try:
if _py3k:
if isinstance(s, str):
s = s.encode(encoding)
else:
if type(s) is not str:
s = s.encode(encoding)
try:
from io import BytesIO
rv = BytesIO(s)
except ImportError:
@ -129,12 +134,33 @@ def _make_binary_stream(s, encoding):
class Verify(object):
"Handle status messages for --verify"
TRUST_UNDEFINED = 0
TRUST_NEVER = 1
TRUST_MARGINAL = 2
TRUST_FULLY = 3
TRUST_ULTIMATE = 4
TRUST_LEVELS = {
"TRUST_UNDEFINED" : TRUST_UNDEFINED,
"TRUST_NEVER" : TRUST_NEVER,
"TRUST_MARGINAL" : TRUST_MARGINAL,
"TRUST_FULLY" : TRUST_FULLY,
"TRUST_ULTIMATE" : TRUST_ULTIMATE,
}
def __init__(self, gpg):
self.gpg = gpg
self.valid = False
self.fingerprint = self.creation_date = self.timestamp = None
self.signature_id = self.key_id = None
self.username = None
self.key_status = None
self.status = None
self.pubkey_fingerprint = None
self.expire_timestamp = None
self.sig_timestamp = None
self.trust_text = None
self.trust_level = None
def __nonzero__(self):
return self.valid
@ -142,15 +168,30 @@ class Verify(object):
__bool__ = __nonzero__
def handle_status(self, key, value):
if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL",
"TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA", "NODATA",
"IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH",
"POLICY_URL", "DECRYPTION_INFO", "DECRYPTION_OKAY"):
if key in self.TRUST_LEVELS:
self.trust_text = key
self.trust_level = self.TRUST_LEVELS[key]
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
"DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR",
"FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC",
"GOODMDC", "NO_SGNR"):
pass
elif key == "BADSIG":
self.valid = False
self.status = 'signature bad'
self.key_id, self.username = value.split(None, 1)
elif key == "ERRSIG":
self.valid = False
(self.key_id,
algo, hash_algo,
cls,
self.timestamp) = value.split()[:5]
self.status = 'signature error'
elif key == "EXPSIG":
self.valid = False
self.status = 'signature expired'
self.key_id, self.username = value.split(None, 1)
elif key == "GOODSIG":
self.valid = True
self.status = 'signature good'
@ -166,13 +207,6 @@ class Verify(object):
elif key == "SIG_ID":
(self.signature_id,
self.creation_date, self.timestamp) = value.split()
elif key == "ERRSIG":
self.valid = False
(self.key_id,
algo, hash_algo,
cls,
self.timestamp) = value.split()[:5]
self.status = 'signature error'
elif key == "DECRYPTION_FAILED":
self.valid = False
self.key_id = value
@ -181,17 +215,21 @@ class Verify(object):
self.valid = False
self.key_id = value
self.status = 'no public key'
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"):
# these are useless in verify, since they are spit out for any
# pub/subkeys on the key, not just the one doing the signing.
# if we want to check for signatures with expired key,
# the relevant flag is EXPKEYSIG.
# the relevant flag is EXPKEYSIG or REVKEYSIG.
pass
elif key in ("EXPKEYSIG", "REVKEYSIG"):
# signed with expired or revoked key
self.valid = False
self.key_id = value.split()[0]
self.status = (('%s %s') % (key[:3], key[3:])).lower()
if key == "EXPKEYSIG":
self.key_status = 'signing key has expired'
else:
self.key_status = 'signing key was revoked'
self.status = self.key_status
else:
raise ValueError("Unknown status message: %r" % key)
@ -278,6 +316,16 @@ class ImportResult(object):
l.append('%d not imported' % self.not_imported)
return ', '.join(l)
ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
BASIC_ESCAPES = {
r'\n': '\n',
r'\r': '\r',
r'\f': '\f',
r'\v': '\v',
r'\b': '\b',
r'\0': '\0',
}
class ListKeys(list):
''' Handle status messages for --list-keys.
@ -322,8 +370,12 @@ class ListKeys(list):
self.fingerprints.append(args[9])
def uid(self, args):
self.curkey['uids'].append(args[9])
self.uids.append(args[9])
uid = args[9]
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
for k, v in BASIC_ESCAPES.items():
uid = uid.replace(k, v)
self.curkey['uids'].append(uid)
self.uids.append(uid)
def sub(self, args):
subkey = [args[4], args[11]]
@ -352,7 +404,7 @@ class Crypt(Verify):
def handle_status(self, key, value):
if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
"BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
"CARDCTRL"):
"CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"):
# in the case of ERROR, this is because a more specific error
# message will have come first
pass
@ -400,7 +452,7 @@ class GenKey(object):
return self.fingerprint or ''
def handle_status(self, key, value):
if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"):
if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"):
pass
elif key == "KEY_CREATED":
(self.type,self.fingerprint) = value.split()
@ -419,7 +471,7 @@ class DeleteResult(object):
problem_reason = {
'1': 'No such key',
'2': 'Must delete secret key first',
'3': 'Ambigious specification',
'3': 'Ambiguous specification',
}
def handle_status(self, key, value):
@ -429,13 +481,19 @@ class DeleteResult(object):
else:
raise ValueError("Unknown status message: %r" % key)
def __nonzero__(self):
return self.status == 'ok'
__bool__ = __nonzero__
class Sign(object):
"Handle status messages for --sign"
def __init__(self, gpg):
self.gpg = gpg
self.type = None
self.hash_algo = None
self.fingerprint = None
self.status = ''
def __nonzero__(self):
return self.fingerprint is not None
@ -447,18 +505,19 @@ class Sign(object):
def handle_status(self, key, value):
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "MISSING_PASSPHRASE"):
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR",
"KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED", "NO_SGNR",
"MISSING_PASSPHRASE", "SC_OP_FAILURE", "SC_OP_SUCCESS"):
pass
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
self.status = 'key expired'
elif key == "SIG_CREATED":
(self.type,
algo, hashalgo, cls,
algo, self.hash_algo, cls,
self.timestamp, self.fingerprint
) = value.split()
else:
raise ValueError("Unknown status message: %r" % key)
VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('utf-8'), re.I)
class GPG(object):
@ -476,21 +535,39 @@ class GPG(object):
"Encapsulate access to the gpg executable"
def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False,
use_agent=False, keyring=None):
use_agent=False, keyring=None, options=None,
secret_keyring=None):
"""Initialize a GPG process wrapper. Options are:
gpgbinary -- full pathname for GPG binary.
gnupghome -- full pathname to where we can find the public and
private keyrings. Default is whatever gpg defaults to.
keyring -- name of alternative keyring file to use. If specified,
the default keyring is not used.
keyring -- name of alternative keyring file to use, or list of such
keyrings. If specified, the default keyring is not used.
options =-- a list of additional options to pass to the GPG binary.
secret_keyring -- name of alternative secret keyring file to use, or
list of such keyrings.
"""
self.gpgbinary = gpgbinary
self.gnupghome = gnupghome
if keyring:
# Allow passing a string or another iterable. Make it uniformly
# a list of keyring filenames
if isinstance(keyring, string_types):
keyring = [keyring]
self.keyring = keyring
if secret_keyring:
# Allow passing a string or another iterable. Make it uniformly
# a list of keyring filenames
if isinstance(secret_keyring, string_types):
secret_keyring = [secret_keyring]
self.secret_keyring = secret_keyring
self.verbose = verbose
self.use_agent = use_agent
if isinstance(options, str):
options = [options]
self.options = options
self.encoding = locale.getpreferredencoding()
if self.encoding is None: # This happens on Jython!
self.encoding = sys.stdin.encoding
@ -502,21 +579,42 @@ class GPG(object):
if p.returncode != 0:
raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
result.stderr))
m = VERSION_RE.match(result.data)
if not m:
self.version = None
else:
dot = '.'.encode('utf-8')
self.version = tuple([int(s) for s in m.groups()[0].split(dot)])
def _open_subprocess(self, args, passphrase=False):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
def make_args(self, args, passphrase):
"""
Make a list of command line elements for GPG. The value of ``args``
will be appended. The ``passphrase`` argument needs to be True if
a passphrase will be sent to GPG, else False.
"""
cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
if self.gnupghome:
cmd.append('--homedir "%s"' % self.gnupghome)
if self.keyring:
cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring)
cmd.append('--no-default-keyring')
for fn in self.keyring:
cmd.append('--keyring "%s"' % fn)
if self.secret_keyring:
for fn in self.secret_keyring:
cmd.append('--secret-keyring "%s"' % fn)
if passphrase:
cmd.append('--batch --passphrase-fd 0')
if self.use_agent:
cmd.append('--use-agent')
if self.options:
cmd.extend(self.options)
cmd.extend(args)
cmd = ' '.join(cmd)
return cmd
def _open_subprocess(self, args, passphrase=False):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
cmd = ' '.join(self.make_args(args, passphrase))
if self.verbose:
print(cmd)
logger.debug("%s", cmd)
@ -597,7 +695,7 @@ class GPG(object):
stderr.close()
stdout.close()
def _handle_io(self, args, file, result, passphrase=None, binary=False):
def _handle_io(self, args, fileobj, result, passphrase=None, binary=False):
"Handle a call to GPG - pass input data, collect output data"
# Handle a basic data call - pass data to GPG, handle the output
# including status information. Garbage In, Garbage Out :)
@ -608,7 +706,7 @@ class GPG(object):
stdin = p.stdin
if passphrase:
_write_passphrase(stdin, passphrase, self.encoding)
writer = _threaded_copy_data(file, stdin)
writer = _threaded_copy_data(fileobj, stdin)
self._collect_output(p, result, writer, stdin)
return result
@ -758,10 +856,14 @@ class GPG(object):
def recv_keys(self, keyserver, *keyids):
"""Import a key from a keyserver
The doctest assertion is skipped in Jython because security permissions
may prevent the recv_keys from succeeding.
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
>>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA')
>>> os.chmod('keys', 0x1C0)
>>> result = gpg.recv_keys('keyserver.ubuntu.com', '92905378')
>>> assert result
"""
@ -878,6 +980,7 @@ class GPG(object):
parms = {}
for key, val in list(kwargs.items()):
key = key.replace('_','-').title()
if str(val).strip(): # skip empty strings
parms[key] = val
parms.setdefault('Key-Type','RSA')
parms.setdefault('Key-Length',1024)
@ -971,9 +1074,6 @@ class GPG(object):
'hello'
>>> result = gpg.encrypt("hello again",print1)
>>> message = str(result)
>>> result = gpg.decrypt(message)
>>> result.status == 'need passphrase'
True
>>> result = gpg.decrypt(message,passphrase='bar')
>>> result.status in ('decryption failed', 'bad passphrase')
True
@ -983,9 +1083,6 @@ class GPG(object):
True
>>> str(result)
'hello again'
>>> result = gpg.encrypt("signed hello",print2,sign=print1)
>>> result.status == 'need passphrase'
True
>>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo')
>>> result.status == 'encryption ok'
True