update to latest gnupg.py

This commit is contained in:
Yann Leboulanger 2015-01-24 16:09:44 +01:00
parent c2fe1c3bdf
commit 651d52560d
1 changed files with 324 additions and 107 deletions

View File

@ -31,11 +31,10 @@ Modifications Copyright (C) 2008-2014 Vinay Sajip. All rights reserved.
A unittest harness (test_gnupg.py) has also been added.
"""
import locale
__version__ = "0.3.4"
__version__ = "0.3.8.dev0"
__author__ = "Vinay Sajip"
__date__ = "$05-Jun-2014 09:48:54$"
__date__ = "$07-Dec-2014 18:46:17$"
try:
from io import StringIO
@ -53,6 +52,13 @@ from subprocess import PIPE
import sys
import threading
STARTUPINFO = None
if os.name == 'nt':
try:
from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE
except ImportError:
STARTUPINFO = None
try:
import logging.NullHandler as NullHandler
except ImportError:
@ -73,6 +79,50 @@ logger = logging.getLogger('gajim.c.gnupg')
if not logger.handlers:
logger.addHandler(NullHandler())
# We use the test below because it works for Jython as well as CPython
if os.path.__name__ == 'ntpath':
# On Windows, we don't need shell quoting, other than worrying about
# paths with spaces in them.
def shell_quote(s):
return '"%s"' % s
else:
# Section copied from sarge
# This regex determines which shell input needs quoting
# because it may be unsafe
UNSAFE = re.compile(r'[^\w%+,./:=@-]')
def shell_quote(s):
"""
Quote text so that it is safe for Posix command shells.
For example, "*.py" would be converted to "'*.py'". If the text is
considered safe it is returned unquoted.
:param s: The value to quote
:type s: str (or unicode on 2.x)
:return: A safe version of the input, from the point of view of Posix
command shells
:rtype: The passed-in type
"""
if not isinstance(s, string_types):
raise TypeError('Expected string type, got %s' % type(s))
if not s:
result = "''"
elif not UNSAFE.search(s):
result = s
else:
result = "'%s'" % s.replace("'", r"'\''")
return result
# end of sarge code
# Now that we use shell=False, we shouldn't need to quote arguments.
# Use no_quote instead of shell_quote to remind us of where quoting
# was needed.
def no_quote(s):
return s
def _copy_data(instream, outstream):
# Copy one stream to another
sent = 0
@ -112,11 +162,19 @@ def _write_passphrase(stream, passphrase, encoding):
passphrase = '%s\n' % passphrase
passphrase = passphrase.encode(encoding)
stream.write(passphrase)
logger.debug("Wrote passphrase: %r", passphrase)
logger.debug('Wrote passphrase')
def _is_sequence(instance):
return isinstance(instance, (list, tuple, set, frozenset))
def _make_memory_stream(s):
try:
from io import BytesIO
rv = BytesIO(s)
except ImportError:
rv = StringIO(s)
return rv
def _make_binary_stream(s, encoding):
if _py3k:
if isinstance(s, str):
@ -124,12 +182,7 @@ def _make_binary_stream(s, encoding):
else:
if type(s) is not str:
s = s.encode(encoding)
try:
from io import BytesIO
rv = BytesIO(s)
except ImportError:
rv = StringIO(s)
return rv
return _make_memory_stream(s)
class Verify(object):
"Handle status messages for --verify"
@ -175,7 +228,8 @@ class Verify(object):
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
"DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR",
"FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC",
"GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA"):
"GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA",
"PROGRESS"):
pass
elif key == "BADSIG":
self.valid = False
@ -230,6 +284,10 @@ class Verify(object):
else:
self.key_status = 'signing key was revoked'
self.status = self.key_status
elif key == "UNEXPECTED":
self.valid = False
self.key_id = value
self.status = 'unexpected data'
else:
raise ValueError("Unknown status message: %r" % key)
@ -298,8 +356,8 @@ class ImportResult(object):
'problem': reason, 'text': self.problem_reason[reason]})
elif key == "IMPORT_RES":
import_res = value.split()
for i in range(len(self.counts)):
setattr(self, self.counts[i], int(import_res[i]))
for i, count in enumerate(self.counts):
setattr(self, count, int(import_res[i]))
elif key == "KEYEXPIRED":
self.results.append({'fingerprint': None,
'problem': '0', 'text': 'Key expired'})
@ -326,7 +384,53 @@ BASIC_ESCAPES = {
r'\0': '\0',
}
class ListKeys(list):
class SendResult(object):
def __init__(self, gpg):
self.gpg = gpg
def handle_status(self, key, value):
logger.debug('SendResult: %s: %s', key, value)
class SearchKeys(list):
''' Handle status messages for --search-keys.
Handle pub and uid (relating the latter to the former).
Don't care about the rest
'''
UID_INDEX = 1
FIELDS = 'type keyid algo length date expires'.split()
def __init__(self, gpg):
self.gpg = gpg
self.curkey = None
self.fingerprints = []
self.uids = []
def get_fields(self, args):
result = {}
for i, var in enumerate(self.FIELDS):
result[var] = args[i]
result['uids'] = []
return result
def pub(self, args):
self.curkey = curkey = self.get_fields(args)
self.append(curkey)
def uid(self, args):
uid = args[self.UID_INDEX]
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
for k, v in BASIC_ESCAPES.items():
uid = uid.replace(k, v)
self.curkey['uids'].append(uid)
self.uids.append(uid)
def handle_status(self, key, value):
pass
class ListKeys(SearchKeys):
''' Handle status messages for --list-keys.
Handle pub and uid (relating the latter to the former).
@ -343,25 +447,17 @@ class ListKeys(list):
grp = reserved for gpgsm
rvk = revocation key
'''
def __init__(self, gpg):
self.gpg = gpg
self.curkey = None
self.fingerprints = []
self.uids = []
UID_INDEX = 9
FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid'.split()
def key(self, args):
vars = ("""
type trust length algo keyid date expires dummy ownertrust uid
""").split()
self.curkey = {}
for i in range(len(vars)):
self.curkey[vars[i]] = args[i]
self.curkey['uids'] = []
if self.curkey['uid']:
self.curkey['uids'].append(self.curkey['uid'])
del self.curkey['uid']
self.curkey['subkeys'] = []
self.append(self.curkey)
self.curkey = curkey = self.get_fields(args)
if curkey['uid']:
curkey['uids'].append(curkey['uid'])
del curkey['uid']
curkey['subkeys'] = []
self.append(curkey)
pub = sec = key
@ -369,22 +465,34 @@ class ListKeys(list):
self.curkey['fingerprint'] = args[9]
self.fingerprints.append(args[9])
def uid(self, args):
uid = args[9]
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
for k, v in BASIC_ESCAPES.items():
uid = uid.replace(k, v)
self.curkey['uids'].append(uid)
self.uids.append(uid)
def sub(self, args):
subkey = [args[4], args[11]]
self.curkey['subkeys'].append(subkey)
def handle_status(self, key, value):
pass
class Crypt(Verify):
class ScanKeys(ListKeys):
''' Handle status messages for --with-fingerprint.'''
def sub(self, args):
# --with-fingerprint --with-colons somehow outputs fewer colons,
# use the last value args[-1] instead of args[11]
subkey = [args[4], args[-1]]
self.curkey['subkeys'].append(subkey)
class TextHandler(object):
def _as_text(self):
return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
if _py3k:
__str__ = _as_text
else:
__unicode__ = _as_text
def __str__(self):
return self.data
class Crypt(Verify, TextHandler):
"Handle status messages for --encrypt and --decrypt"
def __init__(self, gpg):
Verify.__init__(self, gpg)
@ -398,19 +506,16 @@ class Crypt(Verify):
__bool__ = __nonzero__
def __str__(self):
return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
def handle_status(self, key, value):
if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
"BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
"BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", "PROGRESS",
"CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"):
# in the case of ERROR, this is because a more specific error
# message will have come first
pass
elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
"MISSING_PASSPHRASE", "DECRYPTION_FAILED",
"KEY_NOT_CREATED"):
"KEY_NOT_CREATED", "NEED_PASSPHRASE_PIN"):
self.status = key.replace("_", " ").lower()
elif key == "NEED_PASSPHRASE_SYM":
self.status = 'need symmetric passphrase'
@ -487,31 +592,29 @@ class DeleteResult(object):
__bool__ = __nonzero__
class Sign(object):
class Sign(TextHandler):
"Handle status messages for --sign"
def __init__(self, gpg):
self.gpg = gpg
self.type = None
self.hash_algo = None
self.fingerprint = None
self.status = ''
def __nonzero__(self):
return self.fingerprint is not None
__bool__ = __nonzero__
def __str__(self):
return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
def handle_status(self, key, value):
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR",
"KEYREVOKED", "NO_SGNR", "MISSING_PASSPHRASE",
"SC_OP_FAILURE", "SC_OP_SUCCESS"):
"NO_SGNR", "MISSING_PASSPHRASE", "NEED_PASSPHRASE_PIN",
"SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS"):
pass
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
self.status = 'key expired'
elif key == "KEYREVOKED":
self.status = 'key revoked'
elif key == "SIG_CREATED":
(self.type,
algo, self.hash_algo, cls,
@ -520,7 +623,8 @@ class Sign(object):
else:
raise ValueError("Unknown status message: %r" % key)
VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('utf-8'), re.I)
VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I)
HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I)
class GPG(object):
@ -531,7 +635,10 @@ class GPG(object):
'delete': DeleteResult,
'generate': GenKey,
'import': ImportResult,
'send': SendResult,
'list': ListKeys,
'scan': ScanKeys,
'search': SearchKeys,
'sign': Sign,
'verify': Verify,
}
@ -571,9 +678,11 @@ class GPG(object):
if isinstance(options, str):
options = [options]
self.options = options
self.encoding = locale.getpreferredencoding()
if self.encoding is None: # This happens on Jython!
self.encoding = sys.stdin.encoding
# Changed in 0.3.7 to use Latin-1 encoding rather than
# locale.getpreferredencoding falling back to sys.stdin.encoding
# falling back to utf-8, because gpg itself uses latin-1 as the default
# encoding.
self.encoding = 'latin-1'
if gnupghome and not os.path.isdir(self.gnupghome):
os.makedirs(self.gnupghome,0x1C0)
p = self._open_subprocess(["--version"])
@ -586,7 +695,7 @@ class GPG(object):
if not m:
self.version = None
else:
dot = '.'.encode('utf-8')
dot = '.'.encode('ascii')
self.version = tuple([int(s) for s in m.groups()[0].split(dot)])
def make_args(self, args, passphrase):
@ -595,18 +704,18 @@ class GPG(object):
will be appended. The ``passphrase`` argument needs to be True if
a passphrase will be sent to GPG, else False.
"""
cmd = [self.gpgbinary, '--status-fd 2 --no-tty']
cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty']
if self.gnupghome:
cmd.append('--homedir "%s"' % self.gnupghome)
cmd.extend(['--homedir', no_quote(self.gnupghome)])
if self.keyring:
cmd.append('--no-default-keyring')
for fn in self.keyring:
cmd.append('--keyring "%s"' % fn)
cmd.extend(['--keyring', no_quote(fn)])
if self.secret_keyring:
for fn in self.secret_keyring:
cmd.append('--secret-keyring "%s"' % fn)
cmd.extend(['--secret-keyring', no_quote(fn)])
if passphrase:
cmd.append('--batch --passphrase-fd 0')
cmd.extend(['--batch', '--passphrase-fd', '0'])
if self.use_agent:
cmd.append('--use-agent')
if self.options:
@ -617,11 +726,19 @@ class GPG(object):
def _open_subprocess(self, args, passphrase=False):
# Internal method: open a pipe to a GPG subprocess and return
# the file objects for communicating with it.
cmd = ' '.join(self.make_args(args, passphrase))
cmd = self.make_args(args, passphrase)
if self.verbose:
print(cmd)
pcmd = ' '.join(cmd)
print(pcmd)
logger.debug("%s", cmd)
return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
if not STARTUPINFO:
si = None
else:
si = STARTUPINFO()
si.dwFlags = STARTF_USESHOWWINDOW
si.wShowWindow = SW_HIDE
return Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE,
startupinfo=si)
def _read_response(self, stream, result):
# Internal method: reads all the stderr output from GPG, taking notice
@ -723,8 +840,15 @@ class GPG(object):
f.close()
return result
def set_output_without_confirmation(self, args, output):
"If writing to a file which exists, avoid a confirmation message."
if os.path.exists(output):
# We need to avoid an overwrite confirmation message
args.extend(['--batch', '--yes'])
args.extend(['--output', output])
def sign_file(self, file, keyid=None, passphrase=None, clearsign=True,
detach=False, binary=False):
detach=False, binary=False, output=None):
"""sign file"""
logger.debug("sign_file: %s", file)
if binary:
@ -738,7 +862,10 @@ class GPG(object):
elif clearsign:
args.append("--clearsign")
if keyid:
args.append('--default-key "%s"' % keyid)
args.extend(['--default-key', no_quote(keyid)])
if output: # write the output to a file with the specified name
self.set_output_without_confirmation(args, output)
result = self.result_map['sign'](self)
#We could use _handle_io here except for the fact that if the
#passphrase is bad, gpg bails and you can't write the message.
@ -790,8 +917,8 @@ class GPG(object):
logger.debug('Wrote to temp file: %r', s)
os.write(fd, s)
os.close(fd)
args.append(fn)
args.append('"%s"' % data_filename)
args.append(no_quote(fn))
args.append(no_quote(data_filename))
try:
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
@ -799,6 +926,15 @@ class GPG(object):
os.unlink(fn)
return result
def verify_data(self, sig_filename, data):
"Verify the signature in sig_filename against data in memory"
logger.debug('verify_data: %r, %r ...', sig_filename, data[:16])
result = self.result_map['verify'](self)
args = ['--verify', no_quote(sig_filename), '-']
stream = _make_memory_stream(data)
self._handle_io(args, stream, result, binary=True)
return result
#
# KEY MANAGEMENT
#
@ -859,9 +995,6 @@ class GPG(object):
def recv_keys(self, keyserver, *keyids):
"""Import a key from a keyserver
The doctest assertion is skipped in Jython because security permissions
may prevent the recv_keys from succeeding.
>>> import shutil
>>> shutil.rmtree("keys")
>>> gpg = GPG(gnupghome="keys")
@ -874,33 +1007,60 @@ class GPG(object):
logger.debug('recv_keys: %r', keyids)
data = _make_binary_stream("", self.encoding)
#data = ""
args = ['--keyserver', keyserver, '--recv-keys']
args.extend(keyids)
args = ['--keyserver', no_quote(keyserver), '--recv-keys']
args.extend([no_quote(k) for k in keyids])
self._handle_io(args, data, result, binary=True)
logger.debug('recv_keys result: %r', result.__dict__)
data.close()
return result
def send_keys(self, keyserver, *keyids):
"""Send a key to a keyserver.
Note: it's not practical to test this function without sending
arbitrary data to live keyservers.
"""
result = self.result_map['send'](self)
logger.debug('send_keys: %r', keyids)
data = _make_binary_stream('', self.encoding)
#data = ""
args = ['--keyserver', no_quote(keyserver), '--send-keys']
args.extend([no_quote(k) for k in keyids])
self._handle_io(args, data, result, binary=True)
logger.debug('send_keys result: %r', result.__dict__)
data.close()
return result
def delete_keys(self, fingerprints, secret=False):
which='key'
if secret:
which='secret-key'
if _is_sequence(fingerprints):
fingerprints = ' '.join(fingerprints)
args = ['--batch --delete-%s "%s"' % (which, fingerprints)]
fingerprints = [no_quote(s) for s in fingerprints]
else:
fingerprints = [no_quote(fingerprints)]
args = ['--batch', '--delete-%s' % which]
args.extend(fingerprints)
result = self.result_map['delete'](self)
p = self._open_subprocess(args)
self._collect_output(p, result, stdin=p.stdin)
return result
def export_keys(self, keyids, secret=False):
def export_keys(self, keyids, secret=False, armor=True, minimal=False):
"export the indicated keys. 'keyid' is anything gpg accepts"
which=''
if secret:
which='-secret-key'
if _is_sequence(keyids):
keyids = ' '.join(['"%s"' % k for k in keyids])
args = ["--armor --export%s %s" % (which, keyids)]
keyids = [no_quote(k) for k in keyids]
else:
keyids = [no_quote(keyids)]
args = ['--export%s' % which]
if armor:
args.append('--armor')
if minimal:
args.extend(['--export-options','export-minimal'])
args.extend(keyids)
p = self._open_subprocess(args)
# gpg --export produces no status-fd output; stdout will be
# empty in case of failure
@ -910,6 +1070,27 @@ class GPG(object):
logger.debug('export_keys result: %r', result.data)
return result.data.decode(self.encoding, self.decode_errors)
def _get_list_output(self, p, kind):
# Get the response information
result = self.result_map[kind](self)
self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self.encoding,
self.decode_errors).splitlines()
valid_keywords = 'pub uid sec fpr sub'.split()
for line in lines:
if self.verbose:
print(line)
logger.debug("line: %r", line.rstrip())
if not line:
break
L = line.strip().split(':')
if not L:
continue
keyword = L[0]
if keyword in valid_keywords:
getattr(result, keyword)(L)
return result
def list_keys(self, secret=False):
""" list the keys currently in the keyring
@ -930,25 +1111,58 @@ class GPG(object):
which='keys'
if secret:
which='secret-keys'
args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,)
args = [args]
args = ["--list-%s" % which, "--fixed-list-mode", "--fingerprint",
"--with-colons"]
p = self._open_subprocess(args)
return self._get_list_output(p, 'list')
def scan_keys(self, filename):
"""
List details of an ascii armored or binary key file
without first importing it to the local keyring.
The function achieves this by running:
$ gpg --with-fingerprint --with-colons filename
"""
args = ['--with-fingerprint', '--with-colons']
args.append(no_quote(filename))
p = self._open_subprocess(args)
return self._get_list_output(p, 'scan')
def search_keys(self, query, keyserver='pgp.mit.edu'):
""" search keyserver by query (using --search-keys option)
>>> import shutil
>>> shutil.rmtree('keys')
>>> gpg = GPG(gnupghome='keys')
>>> os.chmod('keys', 0x1C0)
>>> result = gpg.search_keys('<vinay_sajip@hotmail.com>')
>>> assert result, 'Failed using default keyserver'
>>> keyserver = 'keyserver.ubuntu.com'
>>> result = gpg.search_keys('<vinay_sajip@hotmail.com>', keyserver)
>>> assert result, 'Failed using keyserver.ubuntu.com'
"""
query = query.strip()
if HEX_DIGITS_RE.match(query):
query = '0x' + query
args = ['--fixed-list-mode', '--fingerprint', '--with-colons',
'--keyserver', no_quote(keyserver), '--search-keys',
no_quote(query)]
p = self._open_subprocess(args)
# there might be some status thingumy here I should handle... (amk)
# ...nope, unless you care about expired sigs or keys (stevegt)
# Get the response information
result = self.result_map['list'](self)
result = self.result_map['search'](self)
self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self.encoding,
self.decode_errors).splitlines()
valid_keywords = 'pub uid sec fpr sub'.split()
valid_keywords = ['pub', 'uid']
for line in lines:
if self.verbose:
print(line)
logger.debug("line: %r", line.rstrip())
if not line:
break
logger.debug('line: %r', line.rstrip())
if not line: # sometimes get blank lines on Windows
continue
L = line.strip().split(':')
if not L:
continue
@ -969,7 +1183,7 @@ class GPG(object):
>>> assert not result
"""
args = ["--gen-key --batch"]
args = ["--gen-key", "--batch"]
result = self.result_map['generate'](self)
f = _make_binary_stream(input, self.encoding)
self._handle_io(args, f, result, binary=True)
@ -986,9 +1200,8 @@ class GPG(object):
if str(val).strip(): # skip empty strings
parms[key] = val
parms.setdefault('Key-Type','RSA')
parms.setdefault('Key-Length',1024)
parms.setdefault('Key-Length',2048)
parms.setdefault('Name-Real', "Autogenerated Key")
parms.setdefault('Name-Comment', "Generated by gnupg.py")
try:
logname = os.environ['LOGNAME']
except KeyError:
@ -1033,23 +1246,30 @@ class GPG(object):
"Encrypt the message read from the file-like object 'file'"
args = ['--encrypt']
if symmetric:
# can't be False or None - could be True or a cipher algo value
# such as AES256
args = ['--symmetric']
if symmetric is not True:
args.extend(['--cipher-algo', no_quote(symmetric)])
# else use the default, currently CAST5
else:
args = ['--encrypt']
if not recipients:
raise ValueError('No recipients specified with asymmetric '
'encryption')
if not _is_sequence(recipients):
recipients = (recipients,)
for recipient in recipients:
args.append('--recipient "%s"' % recipient)
if armor: # create ascii-armored output - set to False for binary output
args.extend(['--recipient', no_quote(recipient)])
if armor: # create ascii-armored output - False for binary output
args.append('--armor')
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
args.append('--output "%s"' % output)
if sign:
args.append('--sign --default-key "%s"' % sign)
self.set_output_without_confirmation(args, output)
if sign is True:
args.append('--sign')
elif sign:
args.extend(['--sign', '--default-key', no_quote(sign)])
if always_trust:
args.append("--always-trust")
args.append('--always-trust')
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
logger.debug('encrypt result: %r', result.data)
@ -1111,13 +1331,10 @@ class GPG(object):
output=None):
args = ["--decrypt"]
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
args.append('--output "%s"' % output)
self.set_output_without_confirmation(args, output)
if always_trust:
args.append("--always-trust")
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase, binary=True)
logger.debug('decrypt result: %r', result.data)
return result