From c28282300b8c3b71cc6cf64b58df9d2ed7114c6f Mon Sep 17 00:00:00 2001 From: Yann Leboulanger Date: Fri, 18 May 2012 16:58:51 +0200 Subject: [PATCH] update python-gnupg to 0.3.0 + add KEYEXPIRED support. Fixes #7151 --- src/common/gnupg.py | 1190 ++++++++++++++++++++++--------------------- src/common/gpg.py | 4 +- 2 files changed, 622 insertions(+), 572 deletions(-) diff --git a/src/common/gnupg.py b/src/common/gnupg.py index f5c703414..e728d366a 100644 --- a/src/common/gnupg.py +++ b/src/common/gnupg.py @@ -27,25 +27,22 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() and so does not work on Windows). Renamed to gnupg.py to avoid confusion with the previous versions. -Modifications Copyright (C) 2008-2011 Vinay Sajip. All rights reserved. +Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved. A unittest harness (test_gnupg.py) has also been added. """ import locale +__version__ = "0.3.0" __author__ = "Vinay Sajip" -__date__ = "$25-Jan-2011 11:40:48$" +__date__ = "$12-May-2012 10:49:10$" try: from io import StringIO - from io import TextIOWrapper - from io import BufferedReader - from io import BufferedWriter except ImportError: from cStringIO import StringIO - class BufferedReader: pass - class BufferedWriter: pass +import codecs import locale import logging import os @@ -110,34 +107,11 @@ def _write_passphrase(stream, passphrase, encoding): passphrase = '%s\n' % passphrase passphrase = passphrase.encode(encoding) stream.write(passphrase) - logger.debug("Passphrase written") + logger.debug("Wrote passphrase: %r", passphrase) def _is_sequence(instance): return isinstance(instance,list) or isinstance(instance,tuple) -def _wrap_input(inp): - if isinstance(inp, BufferedWriter): - oldinp = inp - inp = TextIOWrapper(inp) - logger.debug('wrapped input: %r -> %r', oldinp, inp) - return inp - -def _wrap_output(outp): - if isinstance(outp, BufferedReader): - oldoutp = outp - outp = TextIOWrapper(outp) - logger.debug('wrapped output: %r -> %r', oldoutp, outp) - return outp - -#The following is needed for Python2.7 :-( -def _make_file(s): - try: - rv = StringIO(s) - except (TypeError, UnicodeError): - from io import BytesIO - rv = BytesIO(s) - return rv - def _make_binary_stream(s, encoding): try: if _py3k: @@ -152,526 +126,11 @@ def _make_binary_stream(s, encoding): rv = StringIO(s) return rv -class GPG(object): - "Encapsulate access to the gpg executable" - def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, use_agent=False): - """Initialize a GPG process wrapper. Options are: - - gpgbinary -- full pathname for GPG binary. - - gnupghome -- full pathname to where we can find the public and - private keyrings. Default is whatever gpg defaults to. - """ - self.gpgbinary = gpgbinary - self.gnupghome = gnupghome - self.verbose = verbose - self.use_agent = use_agent - self.encoding = locale.getpreferredencoding() - if self.encoding is None: # This happens on Jython! - self.encoding = sys.stdin.encoding - if gnupghome and not os.path.isdir(self.gnupghome): - os.makedirs(self.gnupghome,0x1C0) - p = self._open_subprocess(["--version"]) - result = Verify() # any result will do for this - self._collect_output(p, result, stdin=p.stdin) - if p.returncode != 0: - raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, - result.stderr)) - - def _open_subprocess(self, args, passphrase=False): - # Internal method: open a pipe to a GPG subprocess and return - # the file objects for communicating with it. - cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] - if self.gnupghome: - cmd.append('--homedir "%s" ' % self.gnupghome) - if passphrase: - cmd.append('--batch --passphrase-fd 0') - if self.use_agent: - cmd.append('--use-agent') - cmd.extend(args) - cmd = ' '.join(cmd) - if self.verbose: - print(cmd) - logger.debug("%s", cmd) - return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) - - def _read_response(self, stream, result): - # Internal method: reads all the output from GPG, taking notice - # only of lines that begin with the magic [GNUPG:] prefix. - # - # Calls methods on the response object for each valid token found, - # with the arg being the remainder of the status line. - lines = [] - while True: - line = stream.readline() - lines.append(line) - if self.verbose: - print(line) - logger.debug("%s", line.rstrip()) - if line == "": break - line = line.rstrip() - if line[0:9] == '[GNUPG:] ': - # Chop off the prefix - line = line[9:] - L = line.split(None, 1) - keyword = L[0] - if len(L) > 1: - value = L[1] - else: - value = "" - result.handle_status(keyword, value) - result.stderr = ''.join(lines) - - def _read_data(self, stream, result): - # Read the contents of the file from GPG's stdout - chunks = [] - while True: - data = stream.read(1024) - if len(data) == 0: - break - logger.debug("chunk: %r" % data[:256]) - chunks.append(data) - if _py3k: - # Join using b'' or '', as appropriate - result.data = type(data)().join(chunks) - else: - result.data = ''.join(chunks) - - def _collect_output(self, process, result, writer=None, stdin=None): - """ - Drain the subprocesses output streams, writing the collected output - to the result. If a writer thread (writing to the subprocess) is given, - make sure it's joined before returning. If a stdin stream is given, - close it before returning. - """ - stderr = _wrap_output(process.stderr) - rr = threading.Thread(target=self._read_response, args=(stderr, result)) - rr.setDaemon(True) - logger.debug('stderr reader: %r', rr) - rr.start() - - stdout = process.stdout # _wrap_output(process.stdout) - dr = threading.Thread(target=self._read_data, args=(stdout, result)) - dr.setDaemon(True) - logger.debug('stdout reader: %r', dr) - dr.start() - - dr.join() - rr.join() - if writer is not None: - writer.join() - process.wait() - if stdin is not None: - try: - stdin.close() - except IOError: - pass - stderr.close() - stdout.close() - - def _handle_io(self, args, file, result, passphrase=None, binary=False): - "Handle a call to GPG - pass input data, collect output data" - # Handle a basic data call - pass data to GPG, handle the output - # including status information. Garbage In, Garbage Out :) - p = self._open_subprocess(args, passphrase is not None) - if not binary and not isinstance(file, BufferedReader): - stdin = _wrap_input(p.stdin) - else: - stdin = p.stdin - if passphrase: - _write_passphrase(stdin, passphrase, self.encoding) - writer = _threaded_copy_data(file, stdin) - self._collect_output(p, result, writer, stdin) - return result - - # - # SIGNATURE METHODS - # - def sign(self, message, **kwargs): - """sign message""" - f = _make_binary_stream(message, self.encoding) - result = self.sign_file(f, **kwargs) - f.close() - return result - - def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, - detach=False, binary=False): - """sign file""" - logger.debug("sign_file: %s", file) - if binary: - args = ['-s'] - else: - args = ['-sa'] - # You can't specify detach-sign and clearsign together: gpg ignores - # the detach-sign in that case. - if detach: - args.append("--detach-sign") - elif clearsign: - args.append("--clearsign") - if keyid: - args.append("--default-key %s" % keyid) - result = Sign(self.encoding) - #We could use _handle_io here except for the fact that if the - #passphrase is bad, gpg bails and you can't write the message. - #self._handle_io(args, _make_file(message), result, passphrase=passphrase) - p = self._open_subprocess(args, passphrase is not None) - try: - stdin = p.stdin - if passphrase: - _write_passphrase(stdin, passphrase, self.encoding) - writer = _threaded_copy_data(file, stdin) - except IOError: - logging.exception("error writing message") - writer = None - self._collect_output(p, result, writer, stdin) - return result - - def verify(self, data): - """Verify the signature on the contents of the string 'data' - - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input(Passphrase='foo') - >>> key = gpg.gen_key(input) - >>> assert key - >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar') - >>> assert not sig - >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo') - >>> assert sig - >>> verify = gpg.verify(sig.data) - >>> assert verify - - """ - f = _make_binary_stream(data, self.encoding) - result = self.verify_file(f) - f.close() - return result - - def verify_file(self, file, data_filename=None): - "Verify the signature on the contents of the file-like object 'file'" - logger.debug('verify_file: %r, %r', file, data_filename) - result = Verify() - args = ['--verify'] - if data_filename is None: - self._handle_io(args, file, result, binary=True) - else: - logger.debug('Handling detached verification') - import tempfile - fd, fn = tempfile.mkstemp(prefix='pygpg') - s = file.read() - file.close() - logger.debug('Wrote to temp file: %r', s) - os.write(fd, s) - os.close(fd) - args.append(fn) - args.append(data_filename) - try: - p = self._open_subprocess(args) - self._collect_output(p, result, stdin=p.stdin) - finally: - os.unlink(fn) - return result - - # - # KEY MANAGEMENT - # - - def import_keys(self, key_data): - """ import the key_data into our keyring - - >>> import shutil - >>> shutil.rmtree("keys") - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input() - >>> result = gpg.gen_key(input) - >>> print1 = result.fingerprint - >>> result = gpg.gen_key(input) - >>> print2 = result.fingerprint - >>> pubkey1 = gpg.export_keys(print1) - >>> seckey1 = gpg.export_keys(print1,secret=True) - >>> seckeys = gpg.list_keys(secret=True) - >>> pubkeys = gpg.list_keys() - >>> assert print1 in seckeys.fingerprints - >>> assert print1 in pubkeys.fingerprints - >>> str(gpg.delete_keys(print1)) - 'Must delete secret key first' - >>> str(gpg.delete_keys(print1,secret=True)) - 'ok' - >>> str(gpg.delete_keys(print1)) - 'ok' - >>> str(gpg.delete_keys("nosuchkey")) - 'No such key' - >>> seckeys = gpg.list_keys(secret=True) - >>> pubkeys = gpg.list_keys() - >>> assert not print1 in seckeys.fingerprints - >>> assert not print1 in pubkeys.fingerprints - >>> result = gpg.import_keys('foo') - >>> assert not result - >>> result = gpg.import_keys(pubkey1) - >>> pubkeys = gpg.list_keys() - >>> seckeys = gpg.list_keys(secret=True) - >>> assert not print1 in seckeys.fingerprints - >>> assert print1 in pubkeys.fingerprints - >>> result = gpg.import_keys(seckey1) - >>> assert result - >>> seckeys = gpg.list_keys(secret=True) - >>> pubkeys = gpg.list_keys() - >>> assert print1 in seckeys.fingerprints - >>> assert print1 in pubkeys.fingerprints - >>> assert print2 in pubkeys.fingerprints - - """ - result = ImportResult() - logger.debug('import_keys: %r', key_data[:256]) - data = _make_binary_stream(key_data, self.encoding) - self._handle_io(['--import'], data, result, binary=True) - logger.debug('import_keys result: %r', result.__dict__) - data.close() - return result - - def delete_keys(self, fingerprints, secret=False): - which='key' - if secret: - which='secret-key' - if _is_sequence(fingerprints): - fingerprints = ' '.join(fingerprints) - args = ["--batch --delete-%s %s" % (which, fingerprints)] - result = DeleteResult() - p = self._open_subprocess(args) - self._collect_output(p, result, stdin=p.stdin) - return result - - def export_keys(self, keyids, secret=False): - "export the indicated keys. 'keyid' is anything gpg accepts" - which='' - if secret: - which='-secret-key' - if _is_sequence(keyids): - keyids = ' '.join(keyids) - args = ["--armor --export%s %s" % (which, keyids)] - p = self._open_subprocess(args) - # gpg --export produces no status-fd output; stdout will be - # empty in case of failure - #stdout, stderr = p.communicate() - result = DeleteResult() # any result will do - self._collect_output(p, result, stdin=p.stdin) - logger.debug('export_keys result: %r', result.data) - return result.data.decode(self.encoding, 'replace') - - def list_keys(self, secret=False): - """ list the keys currently in the keyring - - >>> import shutil - >>> shutil.rmtree("keys") - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input() - >>> result = gpg.gen_key(input) - >>> print1 = result.fingerprint - >>> result = gpg.gen_key(input) - >>> print2 = result.fingerprint - >>> pubkeys = gpg.list_keys() - >>> assert print1 in pubkeys.fingerprints - >>> assert print2 in pubkeys.fingerprints - - """ - - which='keys' - if secret: - which='secret-keys' - args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,) - args = [args] - p = self._open_subprocess(args) - - # there might be some status thingumy here I should handle... (amk) - # ...nope, unless you care about expired sigs or keys (stevegt) - - # Get the response information - result = ListKeys() - self._collect_output(p, result, stdin=p.stdin) - lines = result.data.decode(self.encoding, 'replace').splitlines() - valid_keywords = 'pub uid sec fpr sub'.split() - for line in lines: - if self.verbose: - print(line) - logger.debug("line: %r", line.rstrip()) - if not line: - break - L = line.strip().split(':') - if not L: - continue - keyword = L[0] - if keyword in valid_keywords: - getattr(result, keyword)(L) - return result - - def gen_key(self, input): - """Generate a key; you might use gen_key_input() to create the - control input. - - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input() - >>> result = gpg.gen_key(input) - >>> assert result - >>> result = gpg.gen_key('foo') - >>> assert not result - - """ - args = ["--gen-key --batch"] - result = GenKey() - f = _make_file(input) - self._handle_io(args, f, result) - f.close() - return result - - def gen_key_input(self, **kwargs): - """ - Generate --gen-key input per gpg doc/DETAILS - """ - parms = {} - for key, val in list(kwargs.items()): - key = key.replace('_','-').title() - parms[key] = val - parms.setdefault('Key-Type','RSA') - parms.setdefault('Key-Length',1024) - parms.setdefault('Name-Real', "Autogenerated Key") - parms.setdefault('Name-Comment', "Generated by gnupg.py") - try: - logname = os.environ['LOGNAME'] - except KeyError: - logname = os.environ['USERNAME'] - hostname = socket.gethostname() - parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'), - hostname)) - out = "Key-Type: %s\n" % parms.pop('Key-Type') - for key, val in list(parms.items()): - out += "%s: %s\n" % (key, val) - out += "%commit\n" - return out - - # Key-Type: RSA - # Key-Length: 1024 - # Name-Real: ISdlink Server on %s - # Name-Comment: Created by %s - # Name-Email: isdlink@%s - # Expire-Date: 0 - # %commit - # - # - # Key-Type: DSA - # Key-Length: 1024 - # Subkey-Type: ELG-E - # Subkey-Length: 1024 - # Name-Real: Joe Tester - # Name-Comment: with stupid passphrase - # Name-Email: joe@foo.bar - # Expire-Date: 0 - # Passphrase: abc - # %pubring foo.pub - # %secring foo.sec - # %commit - - # - # ENCRYPTION - # - def encrypt_file(self, file, recipients, sign=None, - always_trust=False, passphrase=None, - armor=True, output=None): - "Encrypt the message read from the file-like object 'file'" - args = ['--encrypt'] - if armor: # create ascii-armored output - set to False for binary output - args.append('--armor') - if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output %s' % output) - if not _is_sequence(recipients): - recipients = (recipients,) - for recipient in recipients: - args.append('--recipient %s' % recipient) - if sign: - args.append("--sign --default-key %s" % sign) - if always_trust: - args.append("--always-trust") - result = Crypt(self.encoding) - self._handle_io(args, file, result, passphrase=passphrase, binary=True) - logger.debug('encrypt result: %r', result.data) - return result - - def encrypt(self, data, recipients, **kwargs): - """Encrypt the message contained in the string 'data' - - >>> import shutil - >>> if os.path.exists("keys"): - ... shutil.rmtree("keys") - >>> gpg = GPG(gnupghome="keys") - >>> input = gpg.gen_key_input(passphrase='foo') - >>> result = gpg.gen_key(input) - >>> print1 = result.fingerprint - >>> input = gpg.gen_key_input() - >>> result = gpg.gen_key(input) - >>> print2 = result.fingerprint - >>> result = gpg.encrypt("hello",print2) - >>> message = str(result) - >>> assert message != 'hello' - >>> result = gpg.decrypt(message) - >>> assert result - >>> str(result) - 'hello' - >>> result = gpg.encrypt("hello again",print1) - >>> message = str(result) - >>> result = gpg.decrypt(message) - >>> result.status - 'need passphrase' - >>> result = gpg.decrypt(message,passphrase='bar') - >>> result.status - 'decryption failed' - >>> assert not result - >>> result = gpg.decrypt(message,passphrase='foo') - >>> result.status - 'decryption ok' - >>> str(result) - 'hello again' - >>> result = gpg.encrypt("signed hello",print2,sign=print1) - >>> result.status - 'need passphrase' - >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') - >>> result.status - 'encryption ok' - >>> message = str(result) - >>> result = gpg.decrypt(message) - >>> result.status - 'decryption ok' - >>> assert result.fingerprint == print1 - - """ - data = _make_binary_stream(data, self.encoding) - result = self.encrypt_file(data, recipients, **kwargs) - data.close() - return result - - def decrypt(self, message, **kwargs): - data = _make_binary_stream(message, self.encoding) - result = self.decrypt_file(data, **kwargs) - data.close() - return result - - def decrypt_file(self, file, always_trust=False, passphrase=None, - output=None): - args = ["--decrypt"] - if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output %s' % output) - if always_trust: - args.append("--always-trust") - result = Crypt(self.encoding) - self._handle_io(args, file, result, passphrase, binary=True) - logger.debug('decrypt result: %r', result.data) - return result - class Verify(object): "Handle status messages for --verify" - def __init__(self): + def __init__(self, gpg): + self.gpg = gpg self.valid = False self.fingerprint = self.creation_date = self.timestamp = None self.signature_id = self.key_id = None @@ -684,24 +143,26 @@ class Verify(object): def handle_status(self, key, value): if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL", - "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA"): - pass - elif key in ("PLAINTEXT", "PLAINTEXT_LENGTH"): - pass - elif key == "IMPORT_RES": - # If auto-key-retrieve option is enabled, this can happen + "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA", "NODATA", + "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH", + "POLICY_URL", "DECRYPTION_INFO", "DECRYPTION_OKAY"): pass elif key == "BADSIG": self.valid = False + self.status = 'signature bad' self.key_id, self.username = value.split(None, 1) elif key == "GOODSIG": self.valid = True + self.status = 'signature good' self.key_id, self.username = value.split(None, 1) elif key == "VALIDSIG": (self.fingerprint, self.creation_date, self.sig_timestamp, self.expire_timestamp) = value.split()[:4] + # may be different if signature is made with a subkey + self.pubkey_fingerprint = value.split()[-1] + self.status = 'signature valid' elif key == "SIG_ID": (self.signature_id, self.creation_date, self.timestamp) = value.split() @@ -711,9 +172,26 @@ class Verify(object): algo, hash_algo, cls, self.timestamp) = value.split()[:5] + self.status = 'signature error' + elif key == "DECRYPTION_FAILED": + self.valid = False + self.key_id = value + self.status = 'decryption failed' elif key == "NO_PUBKEY": self.valid = False self.key_id = value + self.status = 'no public key' + elif key in ("KEYEXPIRED", "SIGEXPIRED"): + # these are useless in verify, since they are spit out for any + # pub/subkeys on the key, not just the one doing the signing. + # if we want to check for signatures with expired key, + # the relevant flag is EXPKEYSIG. + pass + elif key in ("EXPKEYSIG", "REVKEYSIG"): + # signed with expired or revoked key + self.valid = False + self.key_id = value.split()[0] + self.status = (('%s %s') % (key[:3], key[3:])).lower() else: raise ValueError("Unknown status message: %r" % key) @@ -723,7 +201,8 @@ class ImportResult(object): counts = '''count no_user_id imported imported_rsa unchanged n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups not_imported'''.split() - def __init__(self): + def __init__(self, gpg): + self.gpg = gpg self.imported = [] self.results = [] self.fingerprints = [] @@ -808,7 +287,6 @@ class ListKeys(list): crt = X.509 certificate crs = X.509 certificate and private key available - sub = subkey (secondary key) ssb = secret subkey (secondary key) uat = user attribute (same as user id except for field 10). sig = signature @@ -817,7 +295,8 @@ class ListKeys(list): grp = reserved for gpgsm rvk = revocation key ''' - def __init__(self): + def __init__(self, gpg): + self.gpg = gpg self.curkey = None self.fingerprints = [] self.uids = [] @@ -847,7 +326,7 @@ class ListKeys(list): self.uids.append(args[9]) def sub(self, args): - subkey = [args[4],args[11]] + subkey = [args[4], args[11]] self.curkey['subkeys'].append(subkey) def handle_status(self, key, value): @@ -855,12 +334,11 @@ class ListKeys(list): class Crypt(Verify): "Handle status messages for --encrypt and --decrypt" - def __init__(self, encoding): - Verify.__init__(self) + def __init__(self, gpg): + Verify.__init__(self, gpg) self.data = '' self.ok = False self.status = '' - self.encoding = encoding def __nonzero__(self): if self.ok: return True @@ -869,14 +347,18 @@ class Crypt(Verify): __bool__ = __nonzero__ def __str__(self): - return self.data.decode(self.encoding, 'replace') + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) def handle_status(self, key, value): if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", - "BEGIN_SIGNING", "NO_SECKEY"): + "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", + "CARDCTRL"): + # in the case of ERROR, this is because a more specific error + # message will have come first pass elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", - "MISSING_PASSPHRASE", "DECRYPTION_FAILED"): + "MISSING_PASSPHRASE", "DECRYPTION_FAILED", + "KEY_NOT_CREATED"): self.status = key.replace("_", " ").lower() elif key == "NEED_PASSPHRASE_SYM": self.status = 'need symmetric passphrase' @@ -903,7 +385,8 @@ class Crypt(Verify): class GenKey(object): "Handle status messages for --gen-key" - def __init__(self): + def __init__(self, gpg): + self.gpg = gpg self.type = None self.fingerprint = None @@ -926,7 +409,8 @@ class GenKey(object): class DeleteResult(object): "Handle status messages for --delete-key and --delete-secret-key" - def __init__(self): + def __init__(self, gpg): + self.gpg = gpg self.status = 'ok' def __str__(self): @@ -947,10 +431,11 @@ class DeleteResult(object): class Sign(object): "Handle status messages for --sign" - def __init__(self, encoding): + def __init__(self, gpg): + self.gpg = gpg self.type = None self.fingerprint = None - self.encoding = encoding + self.status = '' def __nonzero__(self): return self.fingerprint is not None @@ -958,12 +443,14 @@ class Sign(object): __bool__ = __nonzero__ def __str__(self): - return self.data.decode(self.encoding, 'replace') + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) def handle_status(self, key, value): if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING", "MISSING_PASSPHRASE"): + "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL"): pass + elif key in ("KEYEXPIRED", "SIGEXPIRED"): + self.status = 'key expired' elif key == "SIG_CREATED": (self.type, algo, hashalgo, cls, @@ -971,3 +458,566 @@ class Sign(object): ) = value.split() else: raise ValueError("Unknown status message: %r" % key) + + +class GPG(object): + + decode_errors = 'strict' + + result_map = { + 'crypt': Crypt, + 'delete': DeleteResult, + 'generate': GenKey, + 'import': ImportResult, + 'list': ListKeys, + 'sign': Sign, + 'verify': Verify, + } + + "Encapsulate access to the gpg executable" + def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, + use_agent=False, keyring=None): + """Initialize a GPG process wrapper. Options are: + + gpgbinary -- full pathname for GPG binary. + + gnupghome -- full pathname to where we can find the public and + private keyrings. Default is whatever gpg defaults to. + keyring -- name of alternative keyring file to use. If specified, + the default keyring is not used. + """ + self.gpgbinary = gpgbinary + self.gnupghome = gnupghome + self.keyring = keyring + self.verbose = verbose + self.use_agent = use_agent + self.encoding = locale.getpreferredencoding() + if self.encoding is None: # This happens on Jython! + self.encoding = sys.stdin.encoding + if gnupghome and not os.path.isdir(self.gnupghome): + os.makedirs(self.gnupghome,0x1C0) + p = self._open_subprocess(["--version"]) + result = self.result_map['verify'](self) # any result will do for this + self._collect_output(p, result, stdin=p.stdin) + if p.returncode != 0: + raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, + result.stderr)) + + def _open_subprocess(self, args, passphrase=False): + # Internal method: open a pipe to a GPG subprocess and return + # the file objects for communicating with it. + cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] + if self.gnupghome: + cmd.append('--homedir "%s" ' % self.gnupghome) + if self.keyring: + cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring) + if passphrase: + cmd.append('--batch --passphrase-fd 0') + if self.use_agent: + cmd.append('--use-agent') + cmd.extend(args) + cmd = ' '.join(cmd) + if self.verbose: + print(cmd) + logger.debug("%s", cmd) + return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) + + def _read_response(self, stream, result): + # Internal method: reads all the stderr output from GPG, taking notice + # only of lines that begin with the magic [GNUPG:] prefix. + # + # Calls methods on the response object for each valid token found, + # with the arg being the remainder of the status line. + lines = [] + while True: + line = stream.readline() + if len(line) == 0: + break + lines.append(line) + line = line.rstrip() + if self.verbose: + print(line) + logger.debug("%s", line) + if line[0:9] == '[GNUPG:] ': + # Chop off the prefix + line = line[9:] + L = line.split(None, 1) + keyword = L[0] + if len(L) > 1: + value = L[1] + else: + value = "" + result.handle_status(keyword, value) + result.stderr = ''.join(lines) + + def _read_data(self, stream, result): + # Read the contents of the file from GPG's stdout + chunks = [] + while True: + data = stream.read(1024) + if len(data) == 0: + break + logger.debug("chunk: %r" % data[:256]) + chunks.append(data) + if _py3k: + # Join using b'' or '', as appropriate + result.data = type(data)().join(chunks) + else: + result.data = ''.join(chunks) + + def _collect_output(self, process, result, writer=None, stdin=None): + """ + Drain the subprocesses output streams, writing the collected output + to the result. If a writer thread (writing to the subprocess) is given, + make sure it's joined before returning. If a stdin stream is given, + close it before returning. + """ + stderr = codecs.getreader(self.encoding)(process.stderr) + rr = threading.Thread(target=self._read_response, args=(stderr, result)) + rr.setDaemon(True) + logger.debug('stderr reader: %r', rr) + rr.start() + + stdout = process.stdout + dr = threading.Thread(target=self._read_data, args=(stdout, result)) + dr.setDaemon(True) + logger.debug('stdout reader: %r', dr) + dr.start() + + dr.join() + rr.join() + if writer is not None: + writer.join() + process.wait() + if stdin is not None: + try: + stdin.close() + except IOError: + pass + stderr.close() + stdout.close() + + def _handle_io(self, args, file, result, passphrase=None, binary=False): + "Handle a call to GPG - pass input data, collect output data" + # Handle a basic data call - pass data to GPG, handle the output + # including status information. Garbage In, Garbage Out :) + p = self._open_subprocess(args, passphrase is not None) + if not binary: + stdin = codecs.getwriter(self.encoding)(p.stdin) + else: + stdin = p.stdin + if passphrase: + _write_passphrase(stdin, passphrase, self.encoding) + writer = _threaded_copy_data(file, stdin) + self._collect_output(p, result, writer, stdin) + return result + + # + # SIGNATURE METHODS + # + def sign(self, message, **kwargs): + """sign message""" + f = _make_binary_stream(message, self.encoding) + result = self.sign_file(f, **kwargs) + f.close() + return result + + def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, + detach=False, binary=False): + """sign file""" + logger.debug("sign_file: %s", file) + if binary: + args = ['-s'] + else: + args = ['-sa'] + # You can't specify detach-sign and clearsign together: gpg ignores + # the detach-sign in that case. + if detach: + args.append("--detach-sign") + elif clearsign: + args.append("--clearsign") + if keyid: + args.append('--default-key "%s"' % keyid) + result = self.result_map['sign'](self) + #We could use _handle_io here except for the fact that if the + #passphrase is bad, gpg bails and you can't write the message. + p = self._open_subprocess(args, passphrase is not None) + try: + stdin = p.stdin + if passphrase: + _write_passphrase(stdin, passphrase, self.encoding) + writer = _threaded_copy_data(file, stdin) + except IOError: + logging.exception("error writing message") + writer = None + self._collect_output(p, result, writer, stdin) + return result + + def verify(self, data): + """Verify the signature on the contents of the string 'data' + + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input(Passphrase='foo') + >>> key = gpg.gen_key(input) + >>> assert key + >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar') + >>> assert not sig + >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo') + >>> assert sig + >>> verify = gpg.verify(sig.data) + >>> assert verify + + """ + f = _make_binary_stream(data, self.encoding) + result = self.verify_file(f) + f.close() + return result + + def verify_file(self, file, data_filename=None): + "Verify the signature on the contents of the file-like object 'file'" + logger.debug('verify_file: %r, %r', file, data_filename) + result = self.result_map['verify'](self) + args = ['--verify'] + if data_filename is None: + self._handle_io(args, file, result, binary=True) + else: + logger.debug('Handling detached verification') + import tempfile + fd, fn = tempfile.mkstemp(prefix='pygpg') + s = file.read() + file.close() + logger.debug('Wrote to temp file: %r', s) + os.write(fd, s) + os.close(fd) + args.append(fn) + args.append('"%s"' % data_filename) + try: + p = self._open_subprocess(args) + self._collect_output(p, result, stdin=p.stdin) + finally: + os.unlink(fn) + return result + + # + # KEY MANAGEMENT + # + + def import_keys(self, key_data): + """ import the key_data into our keyring + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> pubkey1 = gpg.export_keys(print1) + >>> seckey1 = gpg.export_keys(print1,secret=True) + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> str(gpg.delete_keys(print1)) + 'Must delete secret key first' + >>> str(gpg.delete_keys(print1,secret=True)) + 'ok' + >>> str(gpg.delete_keys(print1)) + 'ok' + >>> str(gpg.delete_keys("nosuchkey")) + 'No such key' + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert not print1 in seckeys.fingerprints + >>> assert not print1 in pubkeys.fingerprints + >>> result = gpg.import_keys('foo') + >>> assert not result + >>> result = gpg.import_keys(pubkey1) + >>> pubkeys = gpg.list_keys() + >>> seckeys = gpg.list_keys(secret=True) + >>> assert not print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> result = gpg.import_keys(seckey1) + >>> assert result + >>> seckeys = gpg.list_keys(secret=True) + >>> pubkeys = gpg.list_keys() + >>> assert print1 in seckeys.fingerprints + >>> assert print1 in pubkeys.fingerprints + >>> assert print2 in pubkeys.fingerprints + + """ + result = self.result_map['import'](self) + logger.debug('import_keys: %r', key_data[:256]) + data = _make_binary_stream(key_data, self.encoding) + self._handle_io(['--import'], data, result, binary=True) + logger.debug('import_keys result: %r', result.__dict__) + data.close() + return result + + def recv_keys(self, keyserver, *keyids): + """Import a key from a keyserver + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA') + >>> assert result + + """ + result = self.result_map['import'](self) + logger.debug('recv_keys: %r', keyids) + data = _make_binary_stream("", self.encoding) + #data = "" + args = ['--keyserver', keyserver, '--recv-keys'] + args.extend(keyids) + self._handle_io(args, data, result, binary=True) + logger.debug('recv_keys result: %r', result.__dict__) + data.close() + return result + + def delete_keys(self, fingerprints, secret=False): + which='key' + if secret: + which='secret-key' + if _is_sequence(fingerprints): + fingerprints = ' '.join(fingerprints) + args = ['--batch --delete-%s "%s"' % (which, fingerprints)] + result = self.result_map['delete'](self) + p = self._open_subprocess(args) + self._collect_output(p, result, stdin=p.stdin) + return result + + def export_keys(self, keyids, secret=False): + "export the indicated keys. 'keyid' is anything gpg accepts" + which='' + if secret: + which='-secret-key' + if _is_sequence(keyids): + keyids = ' '.join(['"%s"' % k for k in keyids]) + args = ["--armor --export%s %s" % (which, keyids)] + p = self._open_subprocess(args) + # gpg --export produces no status-fd output; stdout will be + # empty in case of failure + #stdout, stderr = p.communicate() + result = self.result_map['delete'](self) # any result will do + self._collect_output(p, result, stdin=p.stdin) + logger.debug('export_keys result: %r', result.data) + return result.data.decode(self.encoding, self.decode_errors) + + def list_keys(self, secret=False): + """ list the keys currently in the keyring + + >>> import shutil + >>> shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> pubkeys = gpg.list_keys() + >>> assert print1 in pubkeys.fingerprints + >>> assert print2 in pubkeys.fingerprints + + """ + + which='keys' + if secret: + which='secret-keys' + args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,) + args = [args] + p = self._open_subprocess(args) + + # there might be some status thingumy here I should handle... (amk) + # ...nope, unless you care about expired sigs or keys (stevegt) + + # Get the response information + result = self.result_map['list'](self) + self._collect_output(p, result, stdin=p.stdin) + lines = result.data.decode(self.encoding, + self.decode_errors).splitlines() + valid_keywords = 'pub uid sec fpr sub'.split() + for line in lines: + if self.verbose: + print(line) + logger.debug("line: %r", line.rstrip()) + if not line: + break + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + return result + + def gen_key(self, input): + """Generate a key; you might use gen_key_input() to create the + control input. + + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> assert result + >>> result = gpg.gen_key('foo') + >>> assert not result + + """ + args = ["--gen-key --batch"] + result = self.result_map['generate'](self) + f = _make_binary_stream(input, self.encoding) + self._handle_io(args, f, result, binary=True) + f.close() + return result + + def gen_key_input(self, **kwargs): + """ + Generate --gen-key input per gpg doc/DETAILS + """ + parms = {} + for key, val in list(kwargs.items()): + key = key.replace('_','-').title() + parms[key] = val + parms.setdefault('Key-Type','RSA') + parms.setdefault('Key-Length',1024) + parms.setdefault('Name-Real', "Autogenerated Key") + parms.setdefault('Name-Comment', "Generated by gnupg.py") + try: + logname = os.environ['LOGNAME'] + except KeyError: + logname = os.environ['USERNAME'] + hostname = socket.gethostname() + parms.setdefault('Name-Email', "%s@%s" % (logname.replace(' ', '_'), + hostname)) + out = "Key-Type: %s\n" % parms.pop('Key-Type') + for key, val in list(parms.items()): + out += "%s: %s\n" % (key, val) + out += "%commit\n" + return out + + # Key-Type: RSA + # Key-Length: 1024 + # Name-Real: ISdlink Server on %s + # Name-Comment: Created by %s + # Name-Email: isdlink@%s + # Expire-Date: 0 + # %commit + # + # + # Key-Type: DSA + # Key-Length: 1024 + # Subkey-Type: ELG-E + # Subkey-Length: 1024 + # Name-Real: Joe Tester + # Name-Comment: with stupid passphrase + # Name-Email: joe@foo.bar + # Expire-Date: 0 + # Passphrase: abc + # %pubring foo.pub + # %secring foo.sec + # %commit + + # + # ENCRYPTION + # + def encrypt_file(self, file, recipients, sign=None, + always_trust=False, passphrase=None, + armor=True, output=None, symmetric=False): + "Encrypt the message read from the file-like object 'file'" + args = ['--encrypt'] + if symmetric: + args = ['--symmetric'] + else: + args = ['--encrypt'] + if not _is_sequence(recipients): + recipients = (recipients,) + for recipient in recipients: + args.append('--recipient "%s"' % recipient) + if armor: # create ascii-armored output - set to False for binary output + args.append('--armor') + if output: # write the output to a file with the specified name + if os.path.exists(output): + os.remove(output) # to avoid overwrite confirmation message + args.append('--output "%s"' % output) + if sign: + args.append('--sign --default-key "%s"' % sign) + if always_trust: + args.append("--always-trust") + result = self.result_map['crypt'](self) + self._handle_io(args, file, result, passphrase=passphrase, binary=True) + logger.debug('encrypt result: %r', result.data) + return result + + def encrypt(self, data, recipients, **kwargs): + """Encrypt the message contained in the string 'data' + + >>> import shutil + >>> if os.path.exists("keys"): + ... shutil.rmtree("keys") + >>> gpg = GPG(gnupghome="keys") + >>> input = gpg.gen_key_input(passphrase='foo') + >>> result = gpg.gen_key(input) + >>> print1 = result.fingerprint + >>> input = gpg.gen_key_input() + >>> result = gpg.gen_key(input) + >>> print2 = result.fingerprint + >>> result = gpg.encrypt("hello",print2) + >>> message = str(result) + >>> assert message != 'hello' + >>> result = gpg.decrypt(message) + >>> assert result + >>> str(result) + 'hello' + >>> result = gpg.encrypt("hello again",print1) + >>> message = str(result) + >>> result = gpg.decrypt(message) + >>> result.status == 'need passphrase' + True + >>> result = gpg.decrypt(message,passphrase='bar') + >>> result.status in ('decryption failed', 'bad passphrase') + True + >>> assert not result + >>> result = gpg.decrypt(message,passphrase='foo') + >>> result.status == 'decryption ok' + True + >>> str(result) + 'hello again' + >>> result = gpg.encrypt("signed hello",print2,sign=print1) + >>> result.status == 'need passphrase' + True + >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') + >>> result.status == 'encryption ok' + True + >>> message = str(result) + >>> result = gpg.decrypt(message) + >>> result.status == 'decryption ok' + True + >>> assert result.fingerprint == print1 + + """ + data = _make_binary_stream(data, self.encoding) + result = self.encrypt_file(data, recipients, **kwargs) + data.close() + return result + + def decrypt(self, message, **kwargs): + data = _make_binary_stream(message, self.encoding) + result = self.decrypt_file(data, **kwargs) + data.close() + return result + + def decrypt_file(self, file, always_trust=False, passphrase=None, + output=None): + args = ["--decrypt"] + if output: # write the output to a file with the specified name + if os.path.exists(output): + os.remove(output) # to avoid overwrite confirmation message + args.append('--output "%s"' % output) + if always_trust: + args.append("--always-trust") + result = self.result_map['crypt'](self) + self._handle_io(args, file, result, passphrase, binary=True) + logger.debug('decrypt result: %r', result.data) + return result + diff --git a/src/common/gpg.py b/src/common/gpg.py index 7e78fdc0b..af00d89a9 100644 --- a/src/common/gpg.py +++ b/src/common/gpg.py @@ -72,8 +72,8 @@ if HAVE_GPG: if result.fingerprint: return self._stripHeaderFooter(str(result)) -# if 'KEYEXPIRED' in resp: -# return 'KEYEXPIRED' + if result.status == 'key expired': + return 'KEYEXPIRED' return 'BAD_PASSPHRASE' def verify(self, str_, sign):