From 37b755ecef6b176798ad80b5700662b9c12ad9f5 Mon Sep 17 00:00:00 2001 From: Brendan Taylor Date: Sun, 16 Sep 2007 04:16:45 +0000 Subject: [PATCH] split common crypto operations into a separate module --- src/common/crypto.py | 73 +++++++++++++++++ src/common/stanza_session.py | 154 ++++++++++------------------------- 2 files changed, 115 insertions(+), 112 deletions(-) create mode 100644 src/common/crypto.py diff --git a/src/common/crypto.py b/src/common/crypto.py new file mode 100644 index 000000000..5dc7c7cfa --- /dev/null +++ b/src/common/crypto.py @@ -0,0 +1,73 @@ +# common crypto functions (mostly specific to XEP-0116, but useful elsewhere) + +# convert a large integer to a big-endian bitstring +def encode_mpi(n): + if n >= 256: + return encode_mpi(n / 256) + chr(n % 256) + else: + return chr(n) + +# convert a large integer to a big-endian bitstring, padded with \x00s to +# 16 bytes +def encode_mpi_with_padding(n): + ret = encode_mpi(n) + + mod = len(ret) % 16 + if mod != 0: + ret = ((16 - mod) * '\x00') + ret + + return ret + +# convert a big-endian bitstring to an integer +def decode_mpi(s): + if len(s) == 0: + return 0 + else: + return 256 * decode_mpi(s[:-1]) + ord(s[-1]) + +def sha256(string): + sh = SHA256.new() + sh.update(string) + return sh.digest() + +base28_chr = "acdefghikmopqruvwxy123456789" + +def sas_28x5(m_a, form_b): + sha = sha256(m_a + form_b + 'Short Authentication String') + lsb24 = decode_mpi(sha[-3:]) + return base28(lsb24) + +def base28(n): + if n >= 28: + return base28(n / 28) + base28_chr[n % 28] + else: + return base28_chr[n] + +def random_bytes(bytes): + return os.urandom(bytes) + +def generate_nonce(): + return random_bytes(8) + +# generate a random number between 'bottom' and 'top' +def srand(bottom, top): + # minimum number of bytes needed to represent that range + bytes = int(math.ceil(math.log(top - bottom, 256))) + + # in retrospect, this is horribly inadequate. + return (decode_mpi(random_bytes(bytes)) % (top - bottom)) + bottom + +# a faster version of (base ** exp) % mod +# taken from +def powmod(base, exp, mod): + square = base % mod + result = 1 + + while exp > 0: + if exp & 1: # exponent is odd + result = (result * square) % mod + + square = (square * square) % mod + exp /= 2 + + return result diff --git a/src/common/stanza_session.py b/src/common/stanza_session.py index e6a0b7156..00430c04a 100644 --- a/src/common/stanza_session.py +++ b/src/common/stanza_session.py @@ -12,6 +12,7 @@ import os import time from common import dh +from common import crypto import xmpp.c14n import base64 @@ -164,43 +165,18 @@ class EncryptedStanzaSession(StanzaSession): kc_s = property(get_kc_s, set_kc_s) kc_o = property(get_kc_o, set_kc_o) - # convert a large integer to a big-endian bitstring - def encode_mpi(self, n): - if n >= 256: - return self.encode_mpi(n / 256) + chr(n % 256) - else: - return chr(n) - - # convert a large integer to a big-endian bitstring, padded with \x00s to - # 16 bytes - def encode_mpi_with_padding(self, n): - ret = self.encode_mpi(n) - - mod = len(ret) % 16 - if mod != 0: - ret = ((16 - mod) * '\x00') + ret - - return ret - - # convert a big-endian bitstring to an integer - def decode_mpi(self, s): - if len(s) == 0: - return 0 - else: - return 256 * self.decode_mpi(s[:-1]) + ord(s[-1]) - def encryptcounter(self): self.c_s = (self.c_s + 1) % (2 ** self.n) - return self.encode_mpi_with_padding(self.c_s) + return crypto.encode_mpi_with_padding(self.c_s) def decryptcounter(self): self.c_o = (self.c_o + 1) % (2 ** self.n) - return self.encode_mpi_with_padding(self.c_o) + return crypto.encode_mpi_with_padding(self.c_o) def sign(self, string): if self.negotiated['sign_algs'] == (XmlDsig + 'rsa-sha256'): - hash = self.sha256(string) - return self.encode_mpi(gajim.pubkey.sign(hash, '')[0]) + hash = crypto.sha256(string) + return crypto.encode_mpi(gajim.pubkey.sign(hash, '')[0]) def encrypt_stanza(self, stanza): encryptable = filter(lambda x: x.getName() not in ('error', 'amp', @@ -229,31 +205,13 @@ class EncryptedStanzaSession(StanzaSession): m_content = ''.join(map(str, c.getChildren())) c.NT.mac = base64.b64encode(self.hmac(self.km_s, m_content + \ - self.encode_mpi(old_en_counter))) + crypto.encode_mpi(old_en_counter))) return stanza def hmac(self, key, content): return HMAC.new(key, content, self.hash_alg).digest() - def sha256(self, string): - sh = SHA256.new() - sh.update(string) - return sh.digest() - - base28_chr = "acdefghikmopqruvwxy123456789" - - def sas_28x5(self, m_a, form_b): - sha = self.sha256(m_a + form_b + 'Short Authentication String') - lsb24 = self.decode_mpi(sha[-3:]) - return self.base28(lsb24) - - def base28(self, n): - if n >= 28: - return self.base28(n / 28) + self.base28_chr[n % 28] - else: - return self.base28_chr[n] - def generate_initiator_keys(self, k): return (self.hmac(k, 'Initiator Cipher Key'), self.hmac(k, 'Initiator MAC Key'), @@ -279,13 +237,6 @@ class EncryptedStanzaSession(StanzaSession): return self.encrypter.encrypt(encryptable) - # FIXME: use a real PRNG - def random_bytes(self, bytes): - return os.urandom(bytes) - - def generate_nonce(self): - return self.random_bytes(8) - def decrypt_stanza(self, stanza): c = stanza.getTag(name='c', namespace='http://www.xmpp.org/extensions/xep-0200.html#ns') @@ -298,7 +249,7 @@ class EncryptedStanzaSession(StanzaSession): received_mac = base64.b64decode(c.getTagData('mac')) calculated_mac = self.hmac(self.km_o, macable + \ - self.encode_mpi_with_padding(self.c_o)) + crypto.encode_mpi_with_padding(self.c_o)) if not calculated_mac == received_mac: raise exceptions.DecryptionError, 'bad signature' @@ -325,7 +276,7 @@ class EncryptedStanzaSession(StanzaSession): return ["may", "mustnot"] else: return ["mustnot", "may"] - + def get_shared_secret(self, e, y, p): if (not 1 < e < (p - 1)): raise exceptions.NegotiationError, "invalid DH value" @@ -341,14 +292,14 @@ class EncryptedStanzaSession(StanzaSession): m_o = base64.b64decode(form['mac']) id_o = base64.b64decode(form['identity']) - m_o_calculated = self.hmac(self.km_o, self.encode_mpi(self.c_o) + id_o) + m_o_calculated = self.hmac(self.km_o, crypto.encode_mpi(self.c_o) + id_o) if m_o_calculated != m_o: raise exceptions.NegotiationError, 'calculated m_%s differs from received m_%s' % (i_o, i_o) if i_o == 'a' and self.sas_algs == 'sas28x5': # XXX not necessary if there's a verified retained secret - self.sas = self.sas_28x5(m_o, self.form_s) + self.sas = crypto.sas_28x5(m_o, self.form_s) if self.negotiated['recv_pubkey']: plaintext = self.decrypt(id_o) @@ -363,7 +314,7 @@ class EncryptedStanzaSession(StanzaSession): if self.negotiated['sign_algs'] == (XmlDsig + 'rsa-sha256'): keyvalue = parsed.getTag(name='RSAKeyValue', namespace=XmlDsig) - n, e = map(lambda x: self.decode_mpi(base64.b64decode( + n, e = map(lambda x: crypto.decode_mpi(base64.b64decode( keyvalue.getTagData(x))), ('Modulus', 'Exponent')) eir_pubkey = RSA.construct((n,long(e))) @@ -374,14 +325,14 @@ class EncryptedStanzaSession(StanzaSession): enc_sig = parsed.getTag(name='SignatureValue', namespace=XmlDsig).getData() - signature = (self.decode_mpi(base64.b64decode(enc_sig)),) + signature = (crypto.decode_mpi(base64.b64decode(enc_sig)),) else: mac_o = self.decrypt(id_o) pubkey_o = '' c7l_form = self.c7lize_mac_id(form) - content = self.n_s + self.n_o + self.encode_mpi(dh_i) + pubkey_o + content = self.n_s + self.n_o + crypto.encode_mpi(dh_i) + pubkey_o if sigmai: self.form_o = c7l_form @@ -393,7 +344,7 @@ class EncryptedStanzaSession(StanzaSession): mac_o_calculated = self.hmac(self.ks_o, content) if self.negotiated['recv_pubkey']: - hash = self.sha256(mac_o_calculated) + hash = crypto.sha256(mac_o_calculated) if not eir_pubkey.verify(hash, signature): raise exceptions.NegotiationError, 'public key signature verification failed!' @@ -404,9 +355,10 @@ class EncryptedStanzaSession(StanzaSession): def make_identity(self, form, dh_i): if self.negotiated['send_pubkey']: if self.negotiated['sign_algs'] == (XmlDsig + 'rsa-sha256'): - fields = (gajim.pubkey.n, gajim.pubkey.e) + pubkey = gajim.interface.get_pubkey(self.conn.name) + fields = (pubkey.n, pubkey.e) - cb_fields = map(lambda f: base64.b64encode(self.encode_mpi(f)), fields) + cb_fields = map(lambda f: base64.b64encode(crypto.encode_mpi(f)), fields) pubkey_s = '%s%s' % tuple(cb_fields) else: @@ -415,7 +367,7 @@ class EncryptedStanzaSession(StanzaSession): form_s2 = ''.join(map(lambda el: xmpp.c14n.c14n(el), form.getChildren())) old_c_s = self.c_s - content = self.n_o + self.n_s + self.encode_mpi(dh_i) + pubkey_s + self.form_s + form_s2 + content = self.n_o + self.n_s + crypto.encode_mpi(dh_i) + pubkey_s + self.form_s + form_s2 mac_s = self.hmac(self.ks_s, content) @@ -432,12 +384,12 @@ class EncryptedStanzaSession(StanzaSession): else: id_s = self.encrypt(mac_s) - m_s = self.hmac(self.km_s, self.encode_mpi(old_c_s) + id_s) + m_s = self.hmac(self.km_s, crypto.encode_mpi(old_c_s) + id_s) if self.status == 'requested-e2e' and self.sas_algs == 'sas28x5': # we're alice; check for a retained secret # if none exists, prompt the user with the SAS - self.sas = self.sas_28x5(m_s, self.form_o) + self.sas = crypto.sas_28x5(m_s, self.form_o) if self.sigmai: self.check_identity() @@ -482,7 +434,7 @@ class EncryptedStanzaSession(StanzaSession): x.addChild(node=xmpp.DataField(name='sas_algs', value='sas28x5', typ='hidden')) x.addChild(node=xmpp.DataField(name='sign_algs', value='http://www.w3.org/2000/09/xmldsig#rsa-sha256', typ='hidden')) - self.n_s = self.generate_nonce() + self.n_s = crypto.generate_nonce() x.addChild(node=xmpp.DataField(name='my_nonce', value=base64.b64encode(self.n_s), typ='hidden')) @@ -607,17 +559,17 @@ class EncryptedStanzaSession(StanzaSession): bytes = int(self.n / 8) - self.n_s = self.generate_nonce() + self.n_s = crypto.generate_nonce() - self.c_o = self.decode_mpi(self.random_bytes(bytes)) # n-bit random number + self.c_o = crypto.decode_mpi(crypto.random_bytes(bytes)) # n-bit random number self.c_s = self.c_o ^ (2 ** (self.n - 1)) - self.y = self.srand(2 ** (2 * self.n - 1), p - 1) - self.d = self.powmod(g, self.y, p) + self.y = crypto.srand(2 ** (2 * self.n - 1), p - 1) + self.d = crypto.powmod(g, self.y, p) to_add = { 'my_nonce': self.n_s, - 'dhkeys': self.encode_mpi(self.d), - 'counter': self.encode_mpi(self.c_o), + 'dhkeys': crypto.encode_mpi(self.d), + 'counter': crypto.encode_mpi(self.c_o), 'nonce': self.n_o } for name in to_add: @@ -688,7 +640,7 @@ class EncryptedStanzaSession(StanzaSession): result = xmpp.DataForm(typ='result') - self.c_s = self.decode_mpi(base64.b64decode(form['counter'])) + self.c_s = crypto.decode_mpi(base64.b64decode(form['counter'])) self.c_o = self.c_s ^ (2 ** (self.n - 1)) self.n_o = base64.b64decode(form['my_nonce']) @@ -698,7 +650,7 @@ class EncryptedStanzaSession(StanzaSession): x = self.xes[mod_p] e = self.es[mod_p] - self.d = self.decode_mpi(base64.b64decode(form['dhkeys'])) + self.d = crypto.decode_mpi(base64.b64decode(form['dhkeys'])) self.k = self.get_shared_secret(self.d, x, p) @@ -720,7 +672,7 @@ class EncryptedStanzaSession(StanzaSession): rshashes = [base64.b64encode(rshash) for rshash in rshashes] result.addChild(node=xmpp.DataField(name='rshashes', value=rshashes)) - result.addChild(node=xmpp.DataField(name='dhkeys', value=base64.b64encode(self.encode_mpi(e)))) + result.addChild(node=xmpp.DataField(name='dhkeys', value=base64.b64encode(crypto.encode_mpi(e)))) self.form_o = ''.join(map(lambda el: xmpp.c14n.c14n(el), form.getChildren())) @@ -752,10 +704,10 @@ class EncryptedStanzaSession(StanzaSession): assert field in form.asDict(), "alice's form didn't have a %s field" % field # 4.5.1 generating provisory session keys - e = self.decode_mpi(base64.b64decode(form['dhkeys'])) + e = crypto.decode_mpi(base64.b64decode(form['dhkeys'])) p = dh.primes[self.modp] - if self.sha256(self.encode_mpi(e)) != self.negotiated['He']: + if crypto.sha256(crypto.encode_mpi(e)) != self.negotiated['He']: raise exceptions.NegotiationError, 'SHA256(e) != He' k = self.get_shared_secret(e, self.y, p) @@ -778,19 +730,20 @@ class EncryptedStanzaSession(StanzaSession): srs = secret break - # other shared secret, we haven't got one. + # other shared secret + # (we're not using one) oss = '' - k = self.sha256(k + srs + oss) + k = crypto.sha256(k + srs + oss) self.kc_s, self.km_s, self.ks_s = self.generate_responder_keys(k) self.kc_o, self.km_o, self.ks_o = self.generate_initiator_keys(k) # 4.5.5 if srs: - srshash = self.hmac(srs, 'Shared Retained Secret') + srshash = crypto.hmac(srs, 'Shared Retained Secret') else: - srshash = self.random_bytes(32) + srshash = crypto.random_bytes(32) x.addChild(node=xmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn')) x.addChild(node=xmpp.DataField(name='nonce', value=base64.b64encode(self.n_o))) @@ -823,7 +776,7 @@ class EncryptedStanzaSession(StanzaSession): break oss = '' - k = self.sha256(self.k + srs + oss) + k = crypto.sha256(self.k + srs + oss) del self.k self.do_retained_secret(k, srs) @@ -857,14 +810,6 @@ class EncryptedStanzaSession(StanzaSession): self.check_identity() gajim.interface.save_new_secret(account, bjid, new_srs) - # generate a random number between 'bottom' and 'top' - def srand(self, bottom, top): - # minimum number of bytes needed to represent that range - bytes = int(math.ceil(math.log(top - bottom, 256))) - - # in retrospect, this is horribly inadequate. - return (self.decode_mpi(self.random_bytes(bytes)) % (top - bottom)) + bottom - def make_dhfield(self, modp_options, sigmai): dhs = [] @@ -872,39 +817,24 @@ class EncryptedStanzaSession(StanzaSession): p = dh.primes[modp] g = dh.generators[modp] - x = self.srand(2 ** (2 * self.n - 1), p - 1) + x = crypto.srand(2 ** (2 * self.n - 1), p - 1) # XXX this may be a source of performance issues - e = self.powmod(g, x, p) + e = crypto.powmod(g, x, p) self.xes[modp] = x self.es[modp] = e if sigmai: - dhs.append(base64.b64encode(self.encode_mpi(e))) + dhs.append(base64.b64encode(crypto.encode_mpi(e))) name = 'dhkeys' else: - He = self.sha256(self.encode_mpi(e)) + He = crypto.sha256(crypto.encode_mpi(e)) dhs.append(base64.b64encode(He)) name = 'dhhashes' return xmpp.DataField(name=name, typ='hidden', value=dhs) - # a faster version of (base ** exp) % mod - # taken from - def powmod(self, base, exp, mod): - square = base % mod - result = 1 - - while exp > 0: - if exp & 1: # exponent is odd - result = (result * square) % mod - - square = (square * square) % mod - exp /= 2 - - return result - def terminate_e2e(self): self.terminate()