update gnupg from upstream

This commit is contained in:
Yann Leboulanger 2011-03-26 22:29:56 +01:00
parent 362e8764d8
commit bba6eb6a27
1 changed files with 60 additions and 25 deletions

View File

@ -27,14 +27,14 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork()
and so does not work on Windows). Renamed to gnupg.py to avoid confusion with
the previous versions.
Modifications Copyright (C) 2008-2010 Vinay Sajip. All rights reserved.
Modifications Copyright (C) 2008-2011 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import locale
__author__ = "Vinay Sajip"
__date__ = "$08-Oct-2010 23:01:07$"
__date__ = "$25-Jan-2011 11:40:48$"
try:
from io import StringIO
@ -154,7 +154,7 @@ def _make_binary_stream(s, encoding):
class GPG(object):
"Encapsulate access to the gpg executable"
def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False):
def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, use_agent=False):
"""Initialize a GPG process wrapper. Options are:
gpgbinary -- full pathname for GPG binary.
@ -165,6 +165,7 @@ class GPG(object):
self.gpgbinary = gpgbinary
self.gnupghome = gnupghome
self.verbose = verbose
self.use_agent = use_agent
self.encoding = locale.getpreferredencoding()
if self.encoding is None: # This happens on Jython!
self.encoding = sys.stdin.encoding
@ -172,7 +173,7 @@ class GPG(object):
os.makedirs(self.gnupghome,0x1C0)
p = self._open_subprocess(["--version"])
result = Verify() # any result will do for this
self._collect_output(p, result)
self._collect_output(p, result, stdin=p.stdin)
if p.returncode != 0:
raise ValueError("Error invoking gpg: %s: %s" % (p.returncode,
result.stderr))
@ -185,7 +186,8 @@ class GPG(object):
cmd.append('--homedir "%s" ' % self.gnupghome)
if passphrase:
cmd.append('--batch --passphrase-fd 0')
if self.use_agent:
cmd.append('--use-agent')
cmd.extend(args)
cmd = ' '.join(cmd)
if self.verbose:
@ -235,11 +237,12 @@ class GPG(object):
else:
result.data = ''.join(chunks)
def _collect_output(self, process, result, writer=None):
def _collect_output(self, process, result, writer=None, stdin=None):
"""
Drain the subprocesses output streams, writing the collected output
to the result. If a writer thread (writing to the subprocess) is given,
make sure it's joined before returning.
make sure it's joined before returning. If a stdin stream is given,
close it before returning.
"""
stderr = _wrap_output(process.stderr)
rr = threading.Thread(target=self._read_response, args=(stderr, result))
@ -258,6 +261,13 @@ class GPG(object):
if writer is not None:
writer.join()
process.wait()
if stdin is not None:
try:
stdin.close()
except IOError:
pass
stderr.close()
stdout.close()
def _handle_io(self, args, file, result, passphrase=None, binary=False):
"Handle a call to GPG - pass input data, collect output data"
@ -271,7 +281,7 @@ class GPG(object):
if passphrase:
_write_passphrase(stdin, passphrase, self.encoding)
writer = _threaded_copy_data(file, stdin)
self._collect_output(p, result, writer)
self._collect_output(p, result, writer, stdin)
return result
#
@ -279,14 +289,19 @@ class GPG(object):
#
def sign(self, message, **kwargs):
"""sign message"""
file = _make_binary_stream(message, self.encoding)
return self.sign_file(file, **kwargs)
f = _make_binary_stream(message, self.encoding)
result = self.sign_file(f, **kwargs)
f.close()
return result
def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
detach=False):
detach=False, binary=False):
"""sign file"""
logger.debug("sign_file: %s", file)
args = ["-sa"]
if binary:
args = ['-s']
else:
args = ['-sa']
# You can't specify detach-sign and clearsign together: gpg ignores
# the detach-sign in that case.
if detach:
@ -308,7 +323,7 @@ class GPG(object):
except IOError:
logging.exception("error writing message")
writer = None
self._collect_output(p, result, writer)
self._collect_output(p, result, writer, stdin)
return result
def verify(self, data):
@ -326,7 +341,10 @@ class GPG(object):
>>> assert verify
"""
return self.verify_file(_make_binary_stream(data, self.encoding))
f = _make_binary_stream(data, self.encoding)
result = self.verify_file(f)
f.close()
return result
def verify_file(self, file, data_filename=None):
"Verify the signature on the contents of the file-like object 'file'"
@ -340,6 +358,7 @@ class GPG(object):
import tempfile
fd, fn = tempfile.mkstemp(prefix='pygpg')
s = file.read()
file.close()
logger.debug('Wrote to temp file: %r', s)
os.write(fd, s)
os.close(fd)
@ -347,7 +366,7 @@ class GPG(object):
args.append(data_filename)
try:
p = self._open_subprocess(args)
self._collect_output(p, result)
self._collect_output(p, result, stdin=p.stdin)
finally:
os.unlink(fn)
return result
@ -406,6 +425,7 @@ class GPG(object):
data = _make_binary_stream(key_data, self.encoding)
self._handle_io(['--import'], data, result, binary=True)
logger.debug('import_keys result: %r', result.__dict__)
data.close()
return result
def delete_keys(self, fingerprints, secret=False):
@ -417,7 +437,7 @@ class GPG(object):
args = ["--batch --delete-%s %s" % (which, fingerprints)]
result = DeleteResult()
p = self._open_subprocess(args)
self._collect_output(p, result)
self._collect_output(p, result, stdin=p.stdin)
return result
def export_keys(self, keyids, secret=False):
@ -433,7 +453,7 @@ class GPG(object):
# empty in case of failure
#stdout, stderr = p.communicate()
result = DeleteResult() # any result will do
self._collect_output(p, result)
self._collect_output(p, result, stdin=p.stdin)
logger.debug('export_keys result: %r', result.data)
return result.data.decode(self.encoding)
@ -457,7 +477,7 @@ class GPG(object):
which='keys'
if secret:
which='secret-keys'
args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which)
args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,)
args = [args]
p = self._open_subprocess(args)
@ -466,7 +486,7 @@ class GPG(object):
# Get the response information
result = ListKeys()
self._collect_output(p, result)
self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self.encoding).splitlines()
valid_keywords = 'pub uid sec fpr'.split()
for line in lines:
@ -497,8 +517,9 @@ class GPG(object):
"""
args = ["--gen-key --batch"]
result = GenKey()
file = _make_file(input)
self._handle_io(args, file, result)
f = _make_file(input)
self._handle_io(args, f, result)
f.close()
return result
def gen_key_input(self, **kwargs):
@ -623,11 +644,15 @@ class GPG(object):
"""
data = _make_binary_stream(data, self.encoding)
return self.encrypt_file(data, recipients, **kwargs)
result = self.encrypt_file(data, recipients, **kwargs)
data.close()
return result
def decrypt(self, message, **kwargs):
data = _make_binary_stream(message, self.encoding)
return self.decrypt_file(data, **kwargs)
result = self.decrypt_file(data, **kwargs)
data.close()
return result
def decrypt_file(self, file, always_trust=False, passphrase=None,
output=None):
@ -755,6 +780,12 @@ class ImportResult(object):
import_res = value.split()
for i in range(len(self.counts)):
setattr(self, self.counts[i], int(import_res[i]))
elif key == "KEYEXPIRED":
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'Key expired'})
elif key == "SIGEXPIRED":
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'Signature expired'})
else:
raise ValueError("Unknown status message: %r" % key)
@ -786,6 +817,7 @@ class ListKeys(list):
def __init__(self):
self.curkey = None
self.fingerprints = []
self.uids = []
def key(self, args):
vars = ("""
@ -794,7 +826,9 @@ class ListKeys(list):
self.curkey = {}
for i in range(len(vars)):
self.curkey[vars[i]] = args[i]
self.curkey['uids'] = [self.curkey['uid']]
self.curkey['uids'] = []
if self.curkey['uid']:
self.curkey['uids'].append(self.curkey['uid'])
del self.curkey['uid']
self.append(self.curkey)
@ -806,6 +840,7 @@ class ListKeys(list):
def uid(self, args):
self.curkey['uids'].append(args[9])
self.uids.append(args[9])
def handle_status(self, key, value):
pass
@ -833,7 +868,7 @@ class Crypt(Verify):
"BEGIN_SIGNING", "NO_SECKEY"):
pass
elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
"DECRYPTION_FAILED"):
"MISSING_PASSPHRASE", "DECRYPTION_FAILED"):
self.status = key.replace("_", " ").lower()
elif key == "NEED_PASSPHRASE_SYM":
self.status = 'need symmetric passphrase'