From a3126a453e614b921f3533867c1f52f50ce999ad Mon Sep 17 00:00:00 2001 From: Yann Leboulanger Date: Sun, 17 Apr 2005 17:08:28 +0000 Subject: [PATCH] second test for gnupg --- src/common/GnuPG.py | 299 ++++++++++++++++++++++---------------------- 1 file changed, 149 insertions(+), 150 deletions(-) diff --git a/src/common/GnuPG.py b/src/common/GnuPG.py index 0ce6b3935..6a9426f15 100644 --- a/src/common/GnuPG.py +++ b/src/common/GnuPG.py @@ -25,171 +25,170 @@ try: import GnuPGInterface except: USE_GPG = 0 - return +else: + class GnuPG(GnuPGInterface.GnuPG): + def __init__(self): + GnuPGInterface.GnuPG.__init__(self) + self._setup_my_options() -class GnuPG(GnuPGInterface.GnuPG): - def __init__(self): - GnuPGInterface.GnuPG.__init__(self) - self._setup_my_options() + def _setup_my_options(self): + self.options.armor = 1 + self.options.meta_interactive = 0 + self.options.extra_args.append('--no-secmem-warning') + # Nolith's patch - prevent crashs on non fully-trusted keys + self.options.extra_args.append('--always-trust') - def _setup_my_options(self): - self.options.armor = 1 - self.options.meta_interactive = 0 - self.options.extra_args.append('--no-secmem-warning') - # Nolith's patch - prevent crashs on non fully-trusted keys - self.options.extra_args.append('--always-trust') + def _read_response(self, child_stdout): + # Internal method: reads all the output from GPG, taking notice + # only of lines that begin with the magic [GNUPG:] prefix. + # (See doc/DETAILS in the GPG distribution for info on GPG's + # output when --status-fd is specified.) + # + # Returns a dictionary, mapping GPG's keywords to the arguments + # for that keyword. - def _read_response(self, child_stdout): - # Internal method: reads all the output from GPG, taking notice - # only of lines that begin with the magic [GNUPG:] prefix. - # (See doc/DETAILS in the GPG distribution for info on GPG's - # output when --status-fd is specified.) - # - # Returns a dictionary, mapping GPG's keywords to the arguments - # for that keyword. + resp = {} + while 1: + line = child_stdout.readline() + if line == "": break + line = line.rstrip() + if line[0:9] == '[GNUPG:] ': + # Chop off the prefix + line = line[9:] + L = line.split(None, 1) + keyword = L[0] + if len(L) > 1: + resp[ keyword ] = L[1] + else: + resp[ keyword ] = "" + return resp - resp = {} - while 1: - line = child_stdout.readline() - if line == "": break - line = line.rstrip() - if line[0:9] == '[GNUPG:] ': - # Chop off the prefix - line = line[9:] - L = line.split(None, 1) - keyword = L[0] - if len(L) > 1: - resp[ keyword ] = L[1] - else: - resp[ keyword ] = "" - return resp + def encrypt(self, str, recipients): + if not USE_GPG: + return str + self.options.recipients = recipients # a list! - def encrypt(self, str, recipients): - if not USE_GPG: - return str - self.options.recipients = recipients # a list! + proc = self.run(['--encrypt'], create_fhs=['stdin', 'stdout']) + proc.handles['stdin'].write(str) + proc.handles['stdin'].close() - proc = self.run(['--encrypt'], create_fhs=['stdin', 'stdout']) - proc.handles['stdin'].write(str) - proc.handles['stdin'].close() + output = proc.handles['stdout'].read() + proc.handles['stdout'].close() - output = proc.handles['stdout'].read() - proc.handles['stdout'].close() - - try: proc.wait() - except IOError: pass - return self._stripHeaderFooter(output) - - def decrypt(self, str, keyID): - if not USE_GPG: - return str - proc = self.run(['--decrypt', '-q', '-u %s'%keyID], create_fhs=['stdin', 'stdout', 'status']) - enc = self._addHeaderFooter(str, 'MESSAGE') - proc.handles['stdin'].write(enc) - proc.handles['stdin'].close() - - output = proc.handles['stdout'].read() - proc.handles['stdout'].close() - - resp = proc.handles['status'].read() - proc.handles['status'].close() - - try: proc.wait() - except IOError: pass - return output - - def sign(self, str, keyID): - if not USE_GPG: - return str - proc = self.run(['-b', '-u %s'%keyID], create_fhs=['stdin', 'stdout', 'status', 'stderr']) - proc.handles['stdin'].write(str) - proc.handles['stdin'].close() - - output = proc.handles['stdout'].read() - proc.handles['stdout'].close() - proc.handles['stderr'].close() - - stat = proc.handles['status'] - resp = self._read_response(stat) - proc.handles['status'].close() - - try: proc.wait() - except IOError: pass - if resp.has_key('BAD_PASSPHRASE'): - return 'BAD_PASSPHRASE' - elif resp.has_key('GOOD_PASSPHRASE'): + try: proc.wait() + except IOError: pass return self._stripHeaderFooter(output) - def verify(self, str, sign): - if not USE_GPG: - return str - if not str: - return '' - file = TemporaryFile(prefix='gajim') - fd = file.fileno() - file.write(str) - file.seek(0) + def decrypt(self, str, keyID): + if not USE_GPG: + return str + proc = self.run(['--decrypt', '-q', '-u %s'%keyID], create_fhs=['stdin', 'stdout', 'status']) + enc = self._addHeaderFooter(str, 'MESSAGE') + proc.handles['stdin'].write(enc) + proc.handles['stdin'].close() - proc = self.run(['--verify', '--enable-special-filenames', '-', '-&%s'%fd], create_fhs=['stdin', 'status', 'stderr']) + output = proc.handles['stdout'].read() + proc.handles['stdout'].close() - file.close() - sign = self._addHeaderFooter(sign, 'SIGNATURE') - proc.handles['stdin'].write(sign) - proc.handles['stdin'].close() - proc.handles['stderr'].close() + resp = proc.handles['status'].read() + proc.handles['status'].close() - stat = proc.handles['status'] - resp = self._read_response(stat) - proc.handles['status'].close() + try: proc.wait() + except IOError: pass + return output - try: proc.wait() - except IOError: pass + def sign(self, str, keyID): + if not USE_GPG: + return str + proc = self.run(['-b', '-u %s'%keyID], create_fhs=['stdin', 'stdout', 'status', 'stderr']) + proc.handles['stdin'].write(str) + proc.handles['stdin'].close() - keyid = '' - if resp.has_key('GOODSIG'): - keyid = resp['GOODSIG'].split()[0] - elif resp.has_key('BADSIG'): - keyid = resp['BADSIG'].split()[0] - return keyid + output = proc.handles['stdout'].read() + proc.handles['stdout'].close() + proc.handles['stderr'].close() - def get_secret_keys(self): - if not USE_GPG: - return - proc = self.run(['--with-colons', '--list-secret-keys'], \ - create_fhs=['stdout']) - output = proc.handles['stdout'].read() - proc.handles['stdout'].close() + stat = proc.handles['status'] + resp = self._read_response(stat) + proc.handles['status'].close() - keys = {} - lines = output.split('\n') - for line in lines: - sline = line.split(':') - if sline[0] == 'sec': - keys[sline[4][8:]] = sline[9] - return keys - try: proc.wait() - except IOError: pass + try: proc.wait() + except IOError: pass + if resp.has_key('BAD_PASSPHRASE'): + return 'BAD_PASSPHRASE' + elif resp.has_key('GOOD_PASSPHRASE'): + return self._stripHeaderFooter(output) - def _stripHeaderFooter(self, data): - """Remove header and footer from data""" - lines = data.split('\n') - while lines[0] != '': - lines.remove(lines[0]) - while lines[0] == '': - lines.remove(lines[0]) - i = 0 - for line in lines: - if line: - if line[0] == '-': break - i = i+1 - line = '\n'.join(lines[0:i]) - return line + def verify(self, str, sign): + if not USE_GPG: + return str + if not str: + return '' + file = TemporaryFile(prefix='gajim') + fd = file.fileno() + file.write(str) + file.seek(0) - def _addHeaderFooter(self, data, type): - """Add header and footer from data""" - out = "-----BEGIN PGP %s-----\n" % type - out = out + "Version: PGP\n" - out = out + "\n" - out = out + data + "\n" - out = out + "-----END PGP %s-----\n" % type - return out + proc = self.run(['--verify', '--enable-special-filenames', '-', '-&%s'%fd], create_fhs=['stdin', 'status', 'stderr']) + + file.close() + sign = self._addHeaderFooter(sign, 'SIGNATURE') + proc.handles['stdin'].write(sign) + proc.handles['stdin'].close() + proc.handles['stderr'].close() + + stat = proc.handles['status'] + resp = self._read_response(stat) + proc.handles['status'].close() + + try: proc.wait() + except IOError: pass + + keyid = '' + if resp.has_key('GOODSIG'): + keyid = resp['GOODSIG'].split()[0] + elif resp.has_key('BADSIG'): + keyid = resp['BADSIG'].split()[0] + return keyid + + def get_secret_keys(self): + if not USE_GPG: + return + proc = self.run(['--with-colons', '--list-secret-keys'], \ + create_fhs=['stdout']) + output = proc.handles['stdout'].read() + proc.handles['stdout'].close() + + keys = {} + lines = output.split('\n') + for line in lines: + sline = line.split(':') + if sline[0] == 'sec': + keys[sline[4][8:]] = sline[9] + return keys + try: proc.wait() + except IOError: pass + + def _stripHeaderFooter(self, data): + """Remove header and footer from data""" + lines = data.split('\n') + while lines[0] != '': + lines.remove(lines[0]) + while lines[0] == '': + lines.remove(lines[0]) + i = 0 + for line in lines: + if line: + if line[0] == '-': break + i = i+1 + line = '\n'.join(lines[0:i]) + return line + + def _addHeaderFooter(self, data, type): + """Add header and footer from data""" + out = "-----BEGIN PGP %s-----\n" % type + out = out + "Version: PGP\n" + out = out + "\n" + out = out + data + "\n" + out = out + "-----END PGP %s-----\n" % type + return out