diff --git a/src/common/gnupg.py b/src/common/gnupg.py index ea4695478..04550dffe 100644 --- a/src/common/gnupg.py +++ b/src/common/gnupg.py @@ -27,15 +27,15 @@ Vinay Sajip to make use of the subprocess module (Steve's version uses os.fork() and so does not work on Windows). Renamed to gnupg.py to avoid confusion with the previous versions. -Modifications Copyright (C) 2008-2012 Vinay Sajip. All rights reserved. +Modifications Copyright (C) 2008-2013 Vinay Sajip. All rights reserved. A unittest harness (test_gnupg.py) has also been added. """ import locale -__version__ = "0.3.0" +__version__ = "0.3.4" __author__ = "Vinay Sajip" -__date__ = "$12-May-2012 10:49:10$" +__date__ = "$05-Jun-2013 09:48:54$" try: from io import StringIO @@ -46,6 +46,7 @@ import codecs import locale import logging import os +import re import socket from subprocess import Popen from subprocess import PIPE @@ -61,8 +62,12 @@ except ImportError: try: unicode _py3k = False + string_types = basestring + text_type = unicode except NameError: _py3k = True + string_types = str + text_type = str logger = logging.getLogger('gajim.c.gnupg') if not logger.handlers: @@ -77,7 +82,7 @@ def _copy_data(instream, outstream): enc = 'ascii' while True: data = instream.read(1024) - if len(data) == 0: + if not data: break sent += len(data) logger.debug("sending chunk (%d): %r", sent, data[:256]) @@ -110,16 +115,16 @@ def _write_passphrase(stream, passphrase, encoding): logger.debug("Wrote passphrase: %r", passphrase) def _is_sequence(instance): - return isinstance(instance,list) or isinstance(instance,tuple) + return isinstance(instance, (list, tuple, set, frozenset)) def _make_binary_stream(s, encoding): + if _py3k: + if isinstance(s, str): + s = s.encode(encoding) + else: + if type(s) is not str: + s = s.encode(encoding) try: - if _py3k: - if isinstance(s, str): - s = s.encode(encoding) - else: - if type(s) is not str: - s = s.encode(encoding) from io import BytesIO rv = BytesIO(s) except ImportError: @@ -129,12 +134,33 @@ def _make_binary_stream(s, encoding): class Verify(object): "Handle status messages for --verify" + TRUST_UNDEFINED = 0 + TRUST_NEVER = 1 + TRUST_MARGINAL = 2 + TRUST_FULLY = 3 + TRUST_ULTIMATE = 4 + + TRUST_LEVELS = { + "TRUST_UNDEFINED" : TRUST_UNDEFINED, + "TRUST_NEVER" : TRUST_NEVER, + "TRUST_MARGINAL" : TRUST_MARGINAL, + "TRUST_FULLY" : TRUST_FULLY, + "TRUST_ULTIMATE" : TRUST_ULTIMATE, + } + def __init__(self, gpg): self.gpg = gpg self.valid = False self.fingerprint = self.creation_date = self.timestamp = None self.signature_id = self.key_id = None self.username = None + self.key_status = None + self.status = None + self.pubkey_fingerprint = None + self.expire_timestamp = None + self.sig_timestamp = None + self.trust_text = None + self.trust_level = None def __nonzero__(self): return self.valid @@ -142,15 +168,30 @@ class Verify(object): __bool__ = __nonzero__ def handle_status(self, key, value): - if key in ("TRUST_UNDEFINED", "TRUST_NEVER", "TRUST_MARGINAL", - "TRUST_FULLY", "TRUST_ULTIMATE", "RSA_OR_IDEA", "NODATA", - "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH", - "POLICY_URL", "DECRYPTION_INFO", "DECRYPTION_OKAY"): + if key in self.TRUST_LEVELS: + self.trust_text = key + self.trust_level = self.TRUST_LEVELS[key] + elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", + "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", + "DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR", + "FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC", + "GOODMDC", "NO_SGNR"): pass elif key == "BADSIG": self.valid = False self.status = 'signature bad' self.key_id, self.username = value.split(None, 1) + elif key == "ERRSIG": + self.valid = False + (self.key_id, + algo, hash_algo, + cls, + self.timestamp) = value.split()[:5] + self.status = 'signature error' + elif key == "EXPSIG": + self.valid = False + self.status = 'signature expired' + self.key_id, self.username = value.split(None, 1) elif key == "GOODSIG": self.valid = True self.status = 'signature good' @@ -166,13 +207,6 @@ class Verify(object): elif key == "SIG_ID": (self.signature_id, self.creation_date, self.timestamp) = value.split() - elif key == "ERRSIG": - self.valid = False - (self.key_id, - algo, hash_algo, - cls, - self.timestamp) = value.split()[:5] - self.status = 'signature error' elif key == "DECRYPTION_FAILED": self.valid = False self.key_id = value @@ -181,17 +215,21 @@ class Verify(object): self.valid = False self.key_id = value self.status = 'no public key' - elif key in ("KEYEXPIRED", "SIGEXPIRED"): + elif key in ("KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED"): # these are useless in verify, since they are spit out for any # pub/subkeys on the key, not just the one doing the signing. # if we want to check for signatures with expired key, - # the relevant flag is EXPKEYSIG. + # the relevant flag is EXPKEYSIG or REVKEYSIG. pass elif key in ("EXPKEYSIG", "REVKEYSIG"): # signed with expired or revoked key self.valid = False self.key_id = value.split()[0] - self.status = (('%s %s') % (key[:3], key[3:])).lower() + if key == "EXPKEYSIG": + self.key_status = 'signing key has expired' + else: + self.key_status = 'signing key was revoked' + self.status = self.key_status else: raise ValueError("Unknown status message: %r" % key) @@ -273,11 +311,21 @@ class ImportResult(object): def summary(self): l = [] - l.append('%d imported'%self.imported) + l.append('%d imported' % self.imported) if self.not_imported: - l.append('%d not imported'%self.not_imported) + l.append('%d not imported' % self.not_imported) return ', '.join(l) +ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I) +BASIC_ESCAPES = { + r'\n': '\n', + r'\r': '\r', + r'\f': '\f', + r'\v': '\v', + r'\b': '\b', + r'\0': '\0', +} + class ListKeys(list): ''' Handle status messages for --list-keys. @@ -322,8 +370,12 @@ class ListKeys(list): self.fingerprints.append(args[9]) def uid(self, args): - self.curkey['uids'].append(args[9]) - self.uids.append(args[9]) + uid = args[9] + uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) + for k, v in BASIC_ESCAPES.items(): + uid = uid.replace(k, v) + self.curkey['uids'].append(uid) + self.uids.append(uid) def sub(self, args): subkey = [args[4], args[11]] @@ -352,7 +404,7 @@ class Crypt(Verify): def handle_status(self, key, value): if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", - "CARDCTRL"): + "CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"): # in the case of ERROR, this is because a more specific error # message will have come first pass @@ -400,7 +452,7 @@ class GenKey(object): return self.fingerprint or '' def handle_status(self, key, value): - if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA"): + if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"): pass elif key == "KEY_CREATED": (self.type,self.fingerprint) = value.split() @@ -419,7 +471,7 @@ class DeleteResult(object): problem_reason = { '1': 'No such key', '2': 'Must delete secret key first', - '3': 'Ambigious specification', + '3': 'Ambiguous specification', } def handle_status(self, key, value): @@ -429,13 +481,19 @@ class DeleteResult(object): else: raise ValueError("Unknown status message: %r" % key) + def __nonzero__(self): + return self.status == 'ok' + + __bool__ = __nonzero__ + + class Sign(object): "Handle status messages for --sign" def __init__(self, gpg): self.gpg = gpg self.type = None + self.hash_algo = None self.fingerprint = None - self.status = '' def __nonzero__(self): return self.fingerprint is not None @@ -447,18 +505,19 @@ class Sign(object): def handle_status(self, key, value): if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "MISSING_PASSPHRASE"): + "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", + "KEYEXPIRED", "SIGEXPIRED", "KEYREVOKED", "NO_SGNR", + "MISSING_PASSPHRASE", "SC_OP_FAILURE", "SC_OP_SUCCESS"): pass - elif key in ("KEYEXPIRED", "SIGEXPIRED"): - self.status = 'key expired' elif key == "SIG_CREATED": (self.type, - algo, hashalgo, cls, + algo, self.hash_algo, cls, self.timestamp, self.fingerprint ) = value.split() else: raise ValueError("Unknown status message: %r" % key) +VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('utf-8'), re.I) class GPG(object): @@ -476,21 +535,39 @@ class GPG(object): "Encapsulate access to the gpg executable" def __init__(self, gpgbinary='gpg', gnupghome=None, verbose=False, - use_agent=False, keyring=None): + use_agent=False, keyring=None, options=None, + secret_keyring=None): """Initialize a GPG process wrapper. Options are: gpgbinary -- full pathname for GPG binary. gnupghome -- full pathname to where we can find the public and private keyrings. Default is whatever gpg defaults to. - keyring -- name of alternative keyring file to use. If specified, - the default keyring is not used. + keyring -- name of alternative keyring file to use, or list of such + keyrings. If specified, the default keyring is not used. + options =-- a list of additional options to pass to the GPG binary. + secret_keyring -- name of alternative secret keyring file to use, or + list of such keyrings. """ self.gpgbinary = gpgbinary self.gnupghome = gnupghome + if keyring: + # Allow passing a string or another iterable. Make it uniformly + # a list of keyring filenames + if isinstance(keyring, string_types): + keyring = [keyring] self.keyring = keyring + if secret_keyring: + # Allow passing a string or another iterable. Make it uniformly + # a list of keyring filenames + if isinstance(secret_keyring, string_types): + secret_keyring = [secret_keyring] + self.secret_keyring = secret_keyring self.verbose = verbose self.use_agent = use_agent + if isinstance(options, str): + options = [options] + self.options = options self.encoding = locale.getpreferredencoding() if self.encoding is None: # This happens on Jython! self.encoding = sys.stdin.encoding @@ -502,21 +579,42 @@ class GPG(object): if p.returncode != 0: raise ValueError("Error invoking gpg: %s: %s" % (p.returncode, result.stderr)) + m = VERSION_RE.match(result.data) + if not m: + self.version = None + else: + dot = '.'.encode('utf-8') + self.version = tuple([int(s) for s in m.groups()[0].split(dot)]) - def _open_subprocess(self, args, passphrase=False): - # Internal method: open a pipe to a GPG subprocess and return - # the file objects for communicating with it. + def make_args(self, args, passphrase): + """ + Make a list of command line elements for GPG. The value of ``args`` + will be appended. The ``passphrase`` argument needs to be True if + a passphrase will be sent to GPG, else False. + """ cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] if self.gnupghome: - cmd.append('--homedir "%s" ' % self.gnupghome) + cmd.append('--homedir "%s"' % self.gnupghome) if self.keyring: - cmd.append('--no-default-keyring --keyring "%s" ' % self.keyring) + cmd.append('--no-default-keyring') + for fn in self.keyring: + cmd.append('--keyring "%s"' % fn) + if self.secret_keyring: + for fn in self.secret_keyring: + cmd.append('--secret-keyring "%s"' % fn) if passphrase: cmd.append('--batch --passphrase-fd 0') if self.use_agent: cmd.append('--use-agent') + if self.options: + cmd.extend(self.options) cmd.extend(args) - cmd = ' '.join(cmd) + return cmd + + def _open_subprocess(self, args, passphrase=False): + # Internal method: open a pipe to a GPG subprocess and return + # the file objects for communicating with it. + cmd = ' '.join(self.make_args(args, passphrase)) if self.verbose: print(cmd) logger.debug("%s", cmd) @@ -597,7 +695,7 @@ class GPG(object): stderr.close() stdout.close() - def _handle_io(self, args, file, result, passphrase=None, binary=False): + def _handle_io(self, args, fileobj, result, passphrase=None, binary=False): "Handle a call to GPG - pass input data, collect output data" # Handle a basic data call - pass data to GPG, handle the output # including status information. Garbage In, Garbage Out :) @@ -608,7 +706,7 @@ class GPG(object): stdin = p.stdin if passphrase: _write_passphrase(stdin, passphrase, self.encoding) - writer = _threaded_copy_data(file, stdin) + writer = _threaded_copy_data(fileobj, stdin) self._collect_output(p, result, writer, stdin) return result @@ -758,10 +856,14 @@ class GPG(object): def recv_keys(self, keyserver, *keyids): """Import a key from a keyserver + The doctest assertion is skipped in Jython because security permissions + may prevent the recv_keys from succeeding. + >>> import shutil >>> shutil.rmtree("keys") >>> gpg = GPG(gnupghome="keys") - >>> result = gpg.recv_keys('pgp.mit.edu', '3FF0DB166A7476EA') + >>> os.chmod('keys', 0x1C0) + >>> result = gpg.recv_keys('keyserver.ubuntu.com', '92905378') >>> assert result """ @@ -878,7 +980,8 @@ class GPG(object): parms = {} for key, val in list(kwargs.items()): key = key.replace('_','-').title() - parms[key] = val + if str(val).strip(): # skip empty strings + parms[key] = val parms.setdefault('Key-Type','RSA') parms.setdefault('Key-Length',1024) parms.setdefault('Name-Real', "Autogenerated Key") @@ -971,9 +1074,6 @@ class GPG(object): 'hello' >>> result = gpg.encrypt("hello again",print1) >>> message = str(result) - >>> result = gpg.decrypt(message) - >>> result.status == 'need passphrase' - True >>> result = gpg.decrypt(message,passphrase='bar') >>> result.status in ('decryption failed', 'bad passphrase') True @@ -983,9 +1083,6 @@ class GPG(object): True >>> str(result) 'hello again' - >>> result = gpg.encrypt("signed hello",print2,sign=print1) - >>> result.status == 'need passphrase' - True >>> result = gpg.encrypt("signed hello",print2,sign=print1,passphrase='foo') >>> result.status == 'encryption ok' True