diff --git a/src/common/gnupg.py b/src/common/gnupg.py index d3a81dbc8..6c28bbc57 100644 --- a/src/common/gnupg.py +++ b/src/common/gnupg.py @@ -31,11 +31,10 @@ Modifications Copyright (C) 2008-2014 Vinay Sajip. All rights reserved. A unittest harness (test_gnupg.py) has also been added. """ -import locale -__version__ = "0.3.4" +__version__ = "0.3.8.dev0" __author__ = "Vinay Sajip" -__date__ = "$05-Jun-2014 09:48:54$" +__date__ = "$07-Dec-2014 18:46:17$" try: from io import StringIO @@ -53,6 +52,13 @@ from subprocess import PIPE import sys import threading +STARTUPINFO = None +if os.name == 'nt': + try: + from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE + except ImportError: + STARTUPINFO = None + try: import logging.NullHandler as NullHandler except ImportError: @@ -73,6 +79,50 @@ logger = logging.getLogger('gajim.c.gnupg') if not logger.handlers: logger.addHandler(NullHandler()) +# We use the test below because it works for Jython as well as CPython +if os.path.__name__ == 'ntpath': + # On Windows, we don't need shell quoting, other than worrying about + # paths with spaces in them. + def shell_quote(s): + return '"%s"' % s +else: + # Section copied from sarge + + # This regex determines which shell input needs quoting + # because it may be unsafe + UNSAFE = re.compile(r'[^\w%+,./:=@-]') + + def shell_quote(s): + """ + Quote text so that it is safe for Posix command shells. + + For example, "*.py" would be converted to "'*.py'". If the text is + considered safe it is returned unquoted. + + :param s: The value to quote + :type s: str (or unicode on 2.x) + :return: A safe version of the input, from the point of view of Posix + command shells + :rtype: The passed-in type + """ + if not isinstance(s, string_types): + raise TypeError('Expected string type, got %s' % type(s)) + if not s: + result = "''" + elif not UNSAFE.search(s): + result = s + else: + result = "'%s'" % s.replace("'", r"'\''") + return result + + # end of sarge code + +# Now that we use shell=False, we shouldn't need to quote arguments. +# Use no_quote instead of shell_quote to remind us of where quoting +# was needed. +def no_quote(s): + return s + def _copy_data(instream, outstream): # Copy one stream to another sent = 0 @@ -112,11 +162,19 @@ def _write_passphrase(stream, passphrase, encoding): passphrase = '%s\n' % passphrase passphrase = passphrase.encode(encoding) stream.write(passphrase) - logger.debug("Wrote passphrase: %r", passphrase) + logger.debug('Wrote passphrase') def _is_sequence(instance): return isinstance(instance, (list, tuple, set, frozenset)) +def _make_memory_stream(s): + try: + from io import BytesIO + rv = BytesIO(s) + except ImportError: + rv = StringIO(s) + return rv + def _make_binary_stream(s, encoding): if _py3k: if isinstance(s, str): @@ -124,12 +182,7 @@ def _make_binary_stream(s, encoding): else: if type(s) is not str: s = s.encode(encoding) - try: - from io import BytesIO - rv = BytesIO(s) - except ImportError: - rv = StringIO(s) - return rv + return _make_memory_stream(s) class Verify(object): "Handle status messages for --verify" @@ -175,7 +228,8 @@ class Verify(object): "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", "DECRYPTION_OKAY", "INV_SGNR", "FILE_START", "FILE_ERROR", "FILE_DONE", "PKA_TRUST_GOOD", "PKA_TRUST_BAD", "BADMDC", - "GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA"): + "GOODMDC", "NO_SGNR", "NOTATION_NAME", "NOTATION_DATA", + "PROGRESS"): pass elif key == "BADSIG": self.valid = False @@ -230,6 +284,10 @@ class Verify(object): else: self.key_status = 'signing key was revoked' self.status = self.key_status + elif key == "UNEXPECTED": + self.valid = False + self.key_id = value + self.status = 'unexpected data' else: raise ValueError("Unknown status message: %r" % key) @@ -298,8 +356,8 @@ class ImportResult(object): 'problem': reason, 'text': self.problem_reason[reason]}) elif key == "IMPORT_RES": import_res = value.split() - for i in range(len(self.counts)): - setattr(self, self.counts[i], int(import_res[i])) + for i, count in enumerate(self.counts): + setattr(self, count, int(import_res[i])) elif key == "KEYEXPIRED": self.results.append({'fingerprint': None, 'problem': '0', 'text': 'Key expired'}) @@ -326,7 +384,53 @@ BASIC_ESCAPES = { r'\0': '\0', } -class ListKeys(list): +class SendResult(object): + def __init__(self, gpg): + self.gpg = gpg + + def handle_status(self, key, value): + logger.debug('SendResult: %s: %s', key, value) + +class SearchKeys(list): + ''' Handle status messages for --search-keys. + + Handle pub and uid (relating the latter to the former). + + Don't care about the rest + ''' + + UID_INDEX = 1 + FIELDS = 'type keyid algo length date expires'.split() + + def __init__(self, gpg): + self.gpg = gpg + self.curkey = None + self.fingerprints = [] + self.uids = [] + + def get_fields(self, args): + result = {} + for i, var in enumerate(self.FIELDS): + result[var] = args[i] + result['uids'] = [] + return result + + def pub(self, args): + self.curkey = curkey = self.get_fields(args) + self.append(curkey) + + def uid(self, args): + uid = args[self.UID_INDEX] + uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) + for k, v in BASIC_ESCAPES.items(): + uid = uid.replace(k, v) + self.curkey['uids'].append(uid) + self.uids.append(uid) + + def handle_status(self, key, value): + pass + +class ListKeys(SearchKeys): ''' Handle status messages for --list-keys. Handle pub and uid (relating the latter to the former). @@ -343,25 +447,17 @@ class ListKeys(list): grp = reserved for gpgsm rvk = revocation key ''' - def __init__(self, gpg): - self.gpg = gpg - self.curkey = None - self.fingerprints = [] - self.uids = [] + + UID_INDEX = 9 + FIELDS = 'type trust length algo keyid date expires dummy ownertrust uid'.split() def key(self, args): - vars = (""" - type trust length algo keyid date expires dummy ownertrust uid - """).split() - self.curkey = {} - for i in range(len(vars)): - self.curkey[vars[i]] = args[i] - self.curkey['uids'] = [] - if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) - del self.curkey['uid'] - self.curkey['subkeys'] = [] - self.append(self.curkey) + self.curkey = curkey = self.get_fields(args) + if curkey['uid']: + curkey['uids'].append(curkey['uid']) + del curkey['uid'] + curkey['subkeys'] = [] + self.append(curkey) pub = sec = key @@ -369,22 +465,34 @@ class ListKeys(list): self.curkey['fingerprint'] = args[9] self.fingerprints.append(args[9]) - def uid(self, args): - uid = args[9] - uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) - for k, v in BASIC_ESCAPES.items(): - uid = uid.replace(k, v) - self.curkey['uids'].append(uid) - self.uids.append(uid) - def sub(self, args): subkey = [args[4], args[11]] self.curkey['subkeys'].append(subkey) - def handle_status(self, key, value): - pass -class Crypt(Verify): +class ScanKeys(ListKeys): + ''' Handle status messages for --with-fingerprint.''' + + def sub(self, args): + # --with-fingerprint --with-colons somehow outputs fewer colons, + # use the last value args[-1] instead of args[11] + subkey = [args[4], args[-1]] + self.curkey['subkeys'].append(subkey) + +class TextHandler(object): + def _as_text(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + if _py3k: + __str__ = _as_text + else: + __unicode__ = _as_text + + def __str__(self): + return self.data + + +class Crypt(Verify, TextHandler): "Handle status messages for --encrypt and --decrypt" def __init__(self, gpg): Verify.__init__(self, gpg) @@ -398,19 +506,16 @@ class Crypt(Verify): __bool__ = __nonzero__ - def __str__(self): - return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) - def handle_status(self, key, value): if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", - "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", + "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", "PROGRESS", "CARDCTRL", "BADMDC", "SC_OP_FAILURE", "SC_OP_SUCCESS"): # in the case of ERROR, this is because a more specific error # message will have come first pass elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "DECRYPTION_FAILED", - "KEY_NOT_CREATED"): + "KEY_NOT_CREATED", "NEED_PASSPHRASE_PIN"): self.status = key.replace("_", " ").lower() elif key == "NEED_PASSPHRASE_SYM": self.status = 'need symmetric passphrase' @@ -487,31 +592,29 @@ class DeleteResult(object): __bool__ = __nonzero__ -class Sign(object): +class Sign(TextHandler): "Handle status messages for --sign" def __init__(self, gpg): self.gpg = gpg self.type = None self.hash_algo = None self.fingerprint = None - self.status = '' def __nonzero__(self): return self.fingerprint is not None __bool__ = __nonzero__ - def __str__(self): - return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) - def handle_status(self, key, value): if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", - "KEYREVOKED", "NO_SGNR", "MISSING_PASSPHRASE", - "SC_OP_FAILURE", "SC_OP_SUCCESS"): + "NO_SGNR", "MISSING_PASSPHRASE", "NEED_PASSPHRASE_PIN", + "SC_OP_FAILURE", "SC_OP_SUCCESS", "PROGRESS"): pass elif key in ("KEYEXPIRED", "SIGEXPIRED"): self.status = 'key expired' + elif key == "KEYREVOKED": + self.status = 'key revoked' elif key == "SIG_CREATED": (self.type, algo, self.hash_algo, cls, @@ -520,7 +623,8 @@ class Sign(object): else: raise ValueError("Unknown status message: %r" % key) -VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('utf-8'), re.I) +VERSION_RE = re.compile(r'gpg \(GnuPG\) (\d+(\.\d+)*)'.encode('ascii'), re.I) +HEX_DIGITS_RE = re.compile(r'[0-9a-f]+$', re.I) class GPG(object): @@ -531,7 +635,10 @@ class GPG(object): 'delete': DeleteResult, 'generate': GenKey, 'import': ImportResult, + 'send': SendResult, 'list': ListKeys, + 'scan': ScanKeys, + 'search': SearchKeys, 'sign': Sign, 'verify': Verify, } @@ -571,9 +678,11 @@ class GPG(object): if isinstance(options, str): options = [options] self.options = options - self.encoding = locale.getpreferredencoding() - if self.encoding is None: # This happens on Jython! - self.encoding = sys.stdin.encoding + # Changed in 0.3.7 to use Latin-1 encoding rather than + # locale.getpreferredencoding falling back to sys.stdin.encoding + # falling back to utf-8, because gpg itself uses latin-1 as the default + # encoding. + self.encoding = 'latin-1' if gnupghome and not os.path.isdir(self.gnupghome): os.makedirs(self.gnupghome,0x1C0) p = self._open_subprocess(["--version"]) @@ -586,7 +695,7 @@ class GPG(object): if not m: self.version = None else: - dot = '.'.encode('utf-8') + dot = '.'.encode('ascii') self.version = tuple([int(s) for s in m.groups()[0].split(dot)]) def make_args(self, args, passphrase): @@ -595,18 +704,18 @@ class GPG(object): will be appended. The ``passphrase`` argument needs to be True if a passphrase will be sent to GPG, else False. """ - cmd = [self.gpgbinary, '--status-fd 2 --no-tty'] + cmd = [self.gpgbinary, '--status-fd', '2', '--no-tty'] if self.gnupghome: - cmd.append('--homedir "%s"' % self.gnupghome) + cmd.extend(['--homedir', no_quote(self.gnupghome)]) if self.keyring: cmd.append('--no-default-keyring') for fn in self.keyring: - cmd.append('--keyring "%s"' % fn) + cmd.extend(['--keyring', no_quote(fn)]) if self.secret_keyring: for fn in self.secret_keyring: - cmd.append('--secret-keyring "%s"' % fn) + cmd.extend(['--secret-keyring', no_quote(fn)]) if passphrase: - cmd.append('--batch --passphrase-fd 0') + cmd.extend(['--batch', '--passphrase-fd', '0']) if self.use_agent: cmd.append('--use-agent') if self.options: @@ -617,11 +726,19 @@ class GPG(object): def _open_subprocess(self, args, passphrase=False): # Internal method: open a pipe to a GPG subprocess and return # the file objects for communicating with it. - cmd = ' '.join(self.make_args(args, passphrase)) + cmd = self.make_args(args, passphrase) if self.verbose: - print(cmd) + pcmd = ' '.join(cmd) + print(pcmd) logger.debug("%s", cmd) - return Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) + if not STARTUPINFO: + si = None + else: + si = STARTUPINFO() + si.dwFlags = STARTF_USESHOWWINDOW + si.wShowWindow = SW_HIDE + return Popen(cmd, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE, + startupinfo=si) def _read_response(self, stream, result): # Internal method: reads all the stderr output from GPG, taking notice @@ -723,8 +840,15 @@ class GPG(object): f.close() return result + def set_output_without_confirmation(self, args, output): + "If writing to a file which exists, avoid a confirmation message." + if os.path.exists(output): + # We need to avoid an overwrite confirmation message + args.extend(['--batch', '--yes']) + args.extend(['--output', output]) + def sign_file(self, file, keyid=None, passphrase=None, clearsign=True, - detach=False, binary=False): + detach=False, binary=False, output=None): """sign file""" logger.debug("sign_file: %s", file) if binary: @@ -738,7 +862,10 @@ class GPG(object): elif clearsign: args.append("--clearsign") if keyid: - args.append('--default-key "%s"' % keyid) + args.extend(['--default-key', no_quote(keyid)]) + if output: # write the output to a file with the specified name + self.set_output_without_confirmation(args, output) + result = self.result_map['sign'](self) #We could use _handle_io here except for the fact that if the #passphrase is bad, gpg bails and you can't write the message. @@ -790,8 +917,8 @@ class GPG(object): logger.debug('Wrote to temp file: %r', s) os.write(fd, s) os.close(fd) - args.append(fn) - args.append('"%s"' % data_filename) + args.append(no_quote(fn)) + args.append(no_quote(data_filename)) try: p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) @@ -799,6 +926,15 @@ class GPG(object): os.unlink(fn) return result + def verify_data(self, sig_filename, data): + "Verify the signature in sig_filename against data in memory" + logger.debug('verify_data: %r, %r ...', sig_filename, data[:16]) + result = self.result_map['verify'](self) + args = ['--verify', no_quote(sig_filename), '-'] + stream = _make_memory_stream(data) + self._handle_io(args, stream, result, binary=True) + return result + # # KEY MANAGEMENT # @@ -859,9 +995,6 @@ class GPG(object): def recv_keys(self, keyserver, *keyids): """Import a key from a keyserver - The doctest assertion is skipped in Jython because security permissions - may prevent the recv_keys from succeeding. - >>> import shutil >>> shutil.rmtree("keys") >>> gpg = GPG(gnupghome="keys") @@ -874,33 +1007,60 @@ class GPG(object): logger.debug('recv_keys: %r', keyids) data = _make_binary_stream("", self.encoding) #data = "" - args = ['--keyserver', keyserver, '--recv-keys'] - args.extend(keyids) + args = ['--keyserver', no_quote(keyserver), '--recv-keys'] + args.extend([no_quote(k) for k in keyids]) self._handle_io(args, data, result, binary=True) logger.debug('recv_keys result: %r', result.__dict__) data.close() return result + def send_keys(self, keyserver, *keyids): + """Send a key to a keyserver. + + Note: it's not practical to test this function without sending + arbitrary data to live keyservers. + """ + result = self.result_map['send'](self) + logger.debug('send_keys: %r', keyids) + data = _make_binary_stream('', self.encoding) + #data = "" + args = ['--keyserver', no_quote(keyserver), '--send-keys'] + args.extend([no_quote(k) for k in keyids]) + self._handle_io(args, data, result, binary=True) + logger.debug('send_keys result: %r', result.__dict__) + data.close() + return result + def delete_keys(self, fingerprints, secret=False): which='key' if secret: which='secret-key' if _is_sequence(fingerprints): - fingerprints = ' '.join(fingerprints) - args = ['--batch --delete-%s "%s"' % (which, fingerprints)] + fingerprints = [no_quote(s) for s in fingerprints] + else: + fingerprints = [no_quote(fingerprints)] + args = ['--batch', '--delete-%s' % which] + args.extend(fingerprints) result = self.result_map['delete'](self) p = self._open_subprocess(args) self._collect_output(p, result, stdin=p.stdin) return result - def export_keys(self, keyids, secret=False): + def export_keys(self, keyids, secret=False, armor=True, minimal=False): "export the indicated keys. 'keyid' is anything gpg accepts" which='' if secret: which='-secret-key' if _is_sequence(keyids): - keyids = ' '.join(['"%s"' % k for k in keyids]) - args = ["--armor --export%s %s" % (which, keyids)] + keyids = [no_quote(k) for k in keyids] + else: + keyids = [no_quote(keyids)] + args = ['--export%s' % which] + if armor: + args.append('--armor') + if minimal: + args.extend(['--export-options','export-minimal']) + args.extend(keyids) p = self._open_subprocess(args) # gpg --export produces no status-fd output; stdout will be # empty in case of failure @@ -910,6 +1070,27 @@ class GPG(object): logger.debug('export_keys result: %r', result.data) return result.data.decode(self.encoding, self.decode_errors) + def _get_list_output(self, p, kind): + # Get the response information + result = self.result_map[kind](self) + self._collect_output(p, result, stdin=p.stdin) + lines = result.data.decode(self.encoding, + self.decode_errors).splitlines() + valid_keywords = 'pub uid sec fpr sub'.split() + for line in lines: + if self.verbose: + print(line) + logger.debug("line: %r", line.rstrip()) + if not line: + break + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + return result + def list_keys(self, secret=False): """ list the keys currently in the keyring @@ -930,25 +1111,58 @@ class GPG(object): which='keys' if secret: which='secret-keys' - args = "--list-%s --fixed-list-mode --fingerprint --with-colons" % (which,) - args = [args] + args = ["--list-%s" % which, "--fixed-list-mode", "--fingerprint", + "--with-colons"] + p = self._open_subprocess(args) + return self._get_list_output(p, 'list') + + def scan_keys(self, filename): + """ + List details of an ascii armored or binary key file + without first importing it to the local keyring. + + The function achieves this by running: + $ gpg --with-fingerprint --with-colons filename + """ + args = ['--with-fingerprint', '--with-colons'] + args.append(no_quote(filename)) + p = self._open_subprocess(args) + return self._get_list_output(p, 'scan') + + def search_keys(self, query, keyserver='pgp.mit.edu'): + """ search keyserver by query (using --search-keys option) + + >>> import shutil + >>> shutil.rmtree('keys') + >>> gpg = GPG(gnupghome='keys') + >>> os.chmod('keys', 0x1C0) + >>> result = gpg.search_keys('') + >>> assert result, 'Failed using default keyserver' + >>> keyserver = 'keyserver.ubuntu.com' + >>> result = gpg.search_keys('', keyserver) + >>> assert result, 'Failed using keyserver.ubuntu.com' + + """ + query = query.strip() + if HEX_DIGITS_RE.match(query): + query = '0x' + query + args = ['--fixed-list-mode', '--fingerprint', '--with-colons', + '--keyserver', no_quote(keyserver), '--search-keys', + no_quote(query)] p = self._open_subprocess(args) - # there might be some status thingumy here I should handle... (amk) - # ...nope, unless you care about expired sigs or keys (stevegt) - # Get the response information - result = self.result_map['list'](self) + result = self.result_map['search'](self) self._collect_output(p, result, stdin=p.stdin) lines = result.data.decode(self.encoding, self.decode_errors).splitlines() - valid_keywords = 'pub uid sec fpr sub'.split() + valid_keywords = ['pub', 'uid'] for line in lines: if self.verbose: print(line) - logger.debug("line: %r", line.rstrip()) - if not line: - break + logger.debug('line: %r', line.rstrip()) + if not line: # sometimes get blank lines on Windows + continue L = line.strip().split(':') if not L: continue @@ -969,7 +1183,7 @@ class GPG(object): >>> assert not result """ - args = ["--gen-key --batch"] + args = ["--gen-key", "--batch"] result = self.result_map['generate'](self) f = _make_binary_stream(input, self.encoding) self._handle_io(args, f, result, binary=True) @@ -986,9 +1200,8 @@ class GPG(object): if str(val).strip(): # skip empty strings parms[key] = val parms.setdefault('Key-Type','RSA') - parms.setdefault('Key-Length',1024) + parms.setdefault('Key-Length',2048) parms.setdefault('Name-Real', "Autogenerated Key") - parms.setdefault('Name-Comment', "Generated by gnupg.py") try: logname = os.environ['LOGNAME'] except KeyError: @@ -1033,23 +1246,30 @@ class GPG(object): "Encrypt the message read from the file-like object 'file'" args = ['--encrypt'] if symmetric: + # can't be False or None - could be True or a cipher algo value + # such as AES256 args = ['--symmetric'] + if symmetric is not True: + args.extend(['--cipher-algo', no_quote(symmetric)]) + # else use the default, currently CAST5 else: - args = ['--encrypt'] + if not recipients: + raise ValueError('No recipients specified with asymmetric ' + 'encryption') if not _is_sequence(recipients): recipients = (recipients,) for recipient in recipients: - args.append('--recipient "%s"' % recipient) - if armor: # create ascii-armored output - set to False for binary output + args.extend(['--recipient', no_quote(recipient)]) + if armor: # create ascii-armored output - False for binary output args.append('--armor') if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) - if sign: - args.append('--sign --default-key "%s"' % sign) + self.set_output_without_confirmation(args, output) + if sign is True: + args.append('--sign') + elif sign: + args.extend(['--sign', '--default-key', no_quote(sign)]) if always_trust: - args.append("--always-trust") + args.append('--always-trust') result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase=passphrase, binary=True) logger.debug('encrypt result: %r', result.data) @@ -1111,13 +1331,10 @@ class GPG(object): output=None): args = ["--decrypt"] if output: # write the output to a file with the specified name - if os.path.exists(output): - os.remove(output) # to avoid overwrite confirmation message - args.append('--output "%s"' % output) + self.set_output_without_confirmation(args, output) if always_trust: args.append("--always-trust") result = self.result_map['crypt'](self) self._handle_io(args, file, result, passphrase, binary=True) logger.debug('decrypt result: %r', result.data) return result -