split common crypto operations into a separate module

This commit is contained in:
Brendan Taylor 2007-09-16 04:16:45 +00:00
parent 9091496812
commit 37b755ecef
2 changed files with 115 additions and 112 deletions

73
src/common/crypto.py Normal file
View file

@ -0,0 +1,73 @@
# common crypto functions (mostly specific to XEP-0116, but useful elsewhere)
# convert a large integer to a big-endian bitstring
def encode_mpi(n):
if n >= 256:
return encode_mpi(n / 256) + chr(n % 256)
else:
return chr(n)
# convert a large integer to a big-endian bitstring, padded with \x00s to
# 16 bytes
def encode_mpi_with_padding(n):
ret = encode_mpi(n)
mod = len(ret) % 16
if mod != 0:
ret = ((16 - mod) * '\x00') + ret
return ret
# convert a big-endian bitstring to an integer
def decode_mpi(s):
if len(s) == 0:
return 0
else:
return 256 * decode_mpi(s[:-1]) + ord(s[-1])
def sha256(string):
sh = SHA256.new()
sh.update(string)
return sh.digest()
base28_chr = "acdefghikmopqruvwxy123456789"
def sas_28x5(m_a, form_b):
sha = sha256(m_a + form_b + 'Short Authentication String')
lsb24 = decode_mpi(sha[-3:])
return base28(lsb24)
def base28(n):
if n >= 28:
return base28(n / 28) + base28_chr[n % 28]
else:
return base28_chr[n]
def random_bytes(bytes):
return os.urandom(bytes)
def generate_nonce():
return random_bytes(8)
# generate a random number between 'bottom' and 'top'
def srand(bottom, top):
# minimum number of bytes needed to represent that range
bytes = int(math.ceil(math.log(top - bottom, 256)))
# in retrospect, this is horribly inadequate.
return (decode_mpi(random_bytes(bytes)) % (top - bottom)) + bottom
# a faster version of (base ** exp) % mod
# taken from <http://lists.danga.com/pipermail/yadis/2005-September/001445.html>
def powmod(base, exp, mod):
square = base % mod
result = 1
while exp > 0:
if exp & 1: # exponent is odd
result = (result * square) % mod
square = (square * square) % mod
exp /= 2
return result

View file

@ -12,6 +12,7 @@ import os
import time
from common import dh
from common import crypto
import xmpp.c14n
import base64
@ -164,43 +165,18 @@ class EncryptedStanzaSession(StanzaSession):
kc_s = property(get_kc_s, set_kc_s)
kc_o = property(get_kc_o, set_kc_o)
# convert a large integer to a big-endian bitstring
def encode_mpi(self, n):
if n >= 256:
return self.encode_mpi(n / 256) + chr(n % 256)
else:
return chr(n)
# convert a large integer to a big-endian bitstring, padded with \x00s to
# 16 bytes
def encode_mpi_with_padding(self, n):
ret = self.encode_mpi(n)
mod = len(ret) % 16
if mod != 0:
ret = ((16 - mod) * '\x00') + ret
return ret
# convert a big-endian bitstring to an integer
def decode_mpi(self, s):
if len(s) == 0:
return 0
else:
return 256 * self.decode_mpi(s[:-1]) + ord(s[-1])
def encryptcounter(self):
self.c_s = (self.c_s + 1) % (2 ** self.n)
return self.encode_mpi_with_padding(self.c_s)
return crypto.encode_mpi_with_padding(self.c_s)
def decryptcounter(self):
self.c_o = (self.c_o + 1) % (2 ** self.n)
return self.encode_mpi_with_padding(self.c_o)
return crypto.encode_mpi_with_padding(self.c_o)
def sign(self, string):
if self.negotiated['sign_algs'] == (XmlDsig + 'rsa-sha256'):
hash = self.sha256(string)
return self.encode_mpi(gajim.pubkey.sign(hash, '')[0])
hash = crypto.sha256(string)
return crypto.encode_mpi(gajim.pubkey.sign(hash, '')[0])
def encrypt_stanza(self, stanza):
encryptable = filter(lambda x: x.getName() not in ('error', 'amp',
@ -229,31 +205,13 @@ class EncryptedStanzaSession(StanzaSession):
m_content = ''.join(map(str, c.getChildren()))
c.NT.mac = base64.b64encode(self.hmac(self.km_s, m_content + \
self.encode_mpi(old_en_counter)))
crypto.encode_mpi(old_en_counter)))
return stanza
def hmac(self, key, content):
return HMAC.new(key, content, self.hash_alg).digest()
def sha256(self, string):
sh = SHA256.new()
sh.update(string)
return sh.digest()
base28_chr = "acdefghikmopqruvwxy123456789"
def sas_28x5(self, m_a, form_b):
sha = self.sha256(m_a + form_b + 'Short Authentication String')
lsb24 = self.decode_mpi(sha[-3:])
return self.base28(lsb24)
def base28(self, n):
if n >= 28:
return self.base28(n / 28) + self.base28_chr[n % 28]
else:
return self.base28_chr[n]
def generate_initiator_keys(self, k):
return (self.hmac(k, 'Initiator Cipher Key'),
self.hmac(k, 'Initiator MAC Key'),
@ -279,13 +237,6 @@ class EncryptedStanzaSession(StanzaSession):
return self.encrypter.encrypt(encryptable)
# FIXME: use a real PRNG
def random_bytes(self, bytes):
return os.urandom(bytes)
def generate_nonce(self):
return self.random_bytes(8)
def decrypt_stanza(self, stanza):
c = stanza.getTag(name='c',
namespace='http://www.xmpp.org/extensions/xep-0200.html#ns')
@ -298,7 +249,7 @@ class EncryptedStanzaSession(StanzaSession):
received_mac = base64.b64decode(c.getTagData('mac'))
calculated_mac = self.hmac(self.km_o, macable + \
self.encode_mpi_with_padding(self.c_o))
crypto.encode_mpi_with_padding(self.c_o))
if not calculated_mac == received_mac:
raise exceptions.DecryptionError, 'bad signature'
@ -325,7 +276,7 @@ class EncryptedStanzaSession(StanzaSession):
return ["may", "mustnot"]
else:
return ["mustnot", "may"]
def get_shared_secret(self, e, y, p):
if (not 1 < e < (p - 1)):
raise exceptions.NegotiationError, "invalid DH value"
@ -341,14 +292,14 @@ class EncryptedStanzaSession(StanzaSession):
m_o = base64.b64decode(form['mac'])
id_o = base64.b64decode(form['identity'])
m_o_calculated = self.hmac(self.km_o, self.encode_mpi(self.c_o) + id_o)
m_o_calculated = self.hmac(self.km_o, crypto.encode_mpi(self.c_o) + id_o)
if m_o_calculated != m_o:
raise exceptions.NegotiationError, 'calculated m_%s differs from received m_%s' % (i_o, i_o)
if i_o == 'a' and self.sas_algs == 'sas28x5':
# XXX not necessary if there's a verified retained secret
self.sas = self.sas_28x5(m_o, self.form_s)
self.sas = crypto.sas_28x5(m_o, self.form_s)
if self.negotiated['recv_pubkey']:
plaintext = self.decrypt(id_o)
@ -363,7 +314,7 @@ class EncryptedStanzaSession(StanzaSession):
if self.negotiated['sign_algs'] == (XmlDsig + 'rsa-sha256'):
keyvalue = parsed.getTag(name='RSAKeyValue', namespace=XmlDsig)
n, e = map(lambda x: self.decode_mpi(base64.b64decode(
n, e = map(lambda x: crypto.decode_mpi(base64.b64decode(
keyvalue.getTagData(x))), ('Modulus', 'Exponent'))
eir_pubkey = RSA.construct((n,long(e)))
@ -374,14 +325,14 @@ class EncryptedStanzaSession(StanzaSession):
enc_sig = parsed.getTag(name='SignatureValue',
namespace=XmlDsig).getData()
signature = (self.decode_mpi(base64.b64decode(enc_sig)),)
signature = (crypto.decode_mpi(base64.b64decode(enc_sig)),)
else:
mac_o = self.decrypt(id_o)
pubkey_o = ''
c7l_form = self.c7lize_mac_id(form)
content = self.n_s + self.n_o + self.encode_mpi(dh_i) + pubkey_o
content = self.n_s + self.n_o + crypto.encode_mpi(dh_i) + pubkey_o
if sigmai:
self.form_o = c7l_form
@ -393,7 +344,7 @@ class EncryptedStanzaSession(StanzaSession):
mac_o_calculated = self.hmac(self.ks_o, content)
if self.negotiated['recv_pubkey']:
hash = self.sha256(mac_o_calculated)
hash = crypto.sha256(mac_o_calculated)
if not eir_pubkey.verify(hash, signature):
raise exceptions.NegotiationError, 'public key signature verification failed!'
@ -404,9 +355,10 @@ class EncryptedStanzaSession(StanzaSession):
def make_identity(self, form, dh_i):
if self.negotiated['send_pubkey']:
if self.negotiated['sign_algs'] == (XmlDsig + 'rsa-sha256'):
fields = (gajim.pubkey.n, gajim.pubkey.e)
pubkey = gajim.interface.get_pubkey(self.conn.name)
fields = (pubkey.n, pubkey.e)
cb_fields = map(lambda f: base64.b64encode(self.encode_mpi(f)), fields)
cb_fields = map(lambda f: base64.b64encode(crypto.encode_mpi(f)), fields)
pubkey_s = '<RSAKeyValue xmlns="http://www.w3.org/2000/09/xmldsig#"><Modulus>%s</Modulus><Exponent>%s</Exponent></RSAKeyValue>' % tuple(cb_fields)
else:
@ -415,7 +367,7 @@ class EncryptedStanzaSession(StanzaSession):
form_s2 = ''.join(map(lambda el: xmpp.c14n.c14n(el), form.getChildren()))
old_c_s = self.c_s
content = self.n_o + self.n_s + self.encode_mpi(dh_i) + pubkey_s + self.form_s + form_s2
content = self.n_o + self.n_s + crypto.encode_mpi(dh_i) + pubkey_s + self.form_s + form_s2
mac_s = self.hmac(self.ks_s, content)
@ -432,12 +384,12 @@ class EncryptedStanzaSession(StanzaSession):
else:
id_s = self.encrypt(mac_s)
m_s = self.hmac(self.km_s, self.encode_mpi(old_c_s) + id_s)
m_s = self.hmac(self.km_s, crypto.encode_mpi(old_c_s) + id_s)
if self.status == 'requested-e2e' and self.sas_algs == 'sas28x5':
# we're alice; check for a retained secret
# if none exists, prompt the user with the SAS
self.sas = self.sas_28x5(m_s, self.form_o)
self.sas = crypto.sas_28x5(m_s, self.form_o)
if self.sigmai:
self.check_identity()
@ -482,7 +434,7 @@ class EncryptedStanzaSession(StanzaSession):
x.addChild(node=xmpp.DataField(name='sas_algs', value='sas28x5', typ='hidden'))
x.addChild(node=xmpp.DataField(name='sign_algs', value='http://www.w3.org/2000/09/xmldsig#rsa-sha256', typ='hidden'))
self.n_s = self.generate_nonce()
self.n_s = crypto.generate_nonce()
x.addChild(node=xmpp.DataField(name='my_nonce', value=base64.b64encode(self.n_s), typ='hidden'))
@ -607,17 +559,17 @@ class EncryptedStanzaSession(StanzaSession):
bytes = int(self.n / 8)
self.n_s = self.generate_nonce()
self.n_s = crypto.generate_nonce()
self.c_o = self.decode_mpi(self.random_bytes(bytes)) # n-bit random number
self.c_o = crypto.decode_mpi(crypto.random_bytes(bytes)) # n-bit random number
self.c_s = self.c_o ^ (2 ** (self.n - 1))
self.y = self.srand(2 ** (2 * self.n - 1), p - 1)
self.d = self.powmod(g, self.y, p)
self.y = crypto.srand(2 ** (2 * self.n - 1), p - 1)
self.d = crypto.powmod(g, self.y, p)
to_add = { 'my_nonce': self.n_s,
'dhkeys': self.encode_mpi(self.d),
'counter': self.encode_mpi(self.c_o),
'dhkeys': crypto.encode_mpi(self.d),
'counter': crypto.encode_mpi(self.c_o),
'nonce': self.n_o }
for name in to_add:
@ -688,7 +640,7 @@ class EncryptedStanzaSession(StanzaSession):
result = xmpp.DataForm(typ='result')
self.c_s = self.decode_mpi(base64.b64decode(form['counter']))
self.c_s = crypto.decode_mpi(base64.b64decode(form['counter']))
self.c_o = self.c_s ^ (2 ** (self.n - 1))
self.n_o = base64.b64decode(form['my_nonce'])
@ -698,7 +650,7 @@ class EncryptedStanzaSession(StanzaSession):
x = self.xes[mod_p]
e = self.es[mod_p]
self.d = self.decode_mpi(base64.b64decode(form['dhkeys']))
self.d = crypto.decode_mpi(base64.b64decode(form['dhkeys']))
self.k = self.get_shared_secret(self.d, x, p)
@ -720,7 +672,7 @@ class EncryptedStanzaSession(StanzaSession):
rshashes = [base64.b64encode(rshash) for rshash in rshashes]
result.addChild(node=xmpp.DataField(name='rshashes', value=rshashes))
result.addChild(node=xmpp.DataField(name='dhkeys', value=base64.b64encode(self.encode_mpi(e))))
result.addChild(node=xmpp.DataField(name='dhkeys', value=base64.b64encode(crypto.encode_mpi(e))))
self.form_o = ''.join(map(lambda el: xmpp.c14n.c14n(el), form.getChildren()))
@ -752,10 +704,10 @@ class EncryptedStanzaSession(StanzaSession):
assert field in form.asDict(), "alice's form didn't have a %s field" % field
# 4.5.1 generating provisory session keys
e = self.decode_mpi(base64.b64decode(form['dhkeys']))
e = crypto.decode_mpi(base64.b64decode(form['dhkeys']))
p = dh.primes[self.modp]
if self.sha256(self.encode_mpi(e)) != self.negotiated['He']:
if crypto.sha256(crypto.encode_mpi(e)) != self.negotiated['He']:
raise exceptions.NegotiationError, 'SHA256(e) != He'
k = self.get_shared_secret(e, self.y, p)
@ -778,19 +730,20 @@ class EncryptedStanzaSession(StanzaSession):
srs = secret
break
# other shared secret, we haven't got one.
# other shared secret
# (we're not using one)
oss = ''
k = self.sha256(k + srs + oss)
k = crypto.sha256(k + srs + oss)
self.kc_s, self.km_s, self.ks_s = self.generate_responder_keys(k)
self.kc_o, self.km_o, self.ks_o = self.generate_initiator_keys(k)
# 4.5.5
if srs:
srshash = self.hmac(srs, 'Shared Retained Secret')
srshash = crypto.hmac(srs, 'Shared Retained Secret')
else:
srshash = self.random_bytes(32)
srshash = crypto.random_bytes(32)
x.addChild(node=xmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn'))
x.addChild(node=xmpp.DataField(name='nonce', value=base64.b64encode(self.n_o)))
@ -823,7 +776,7 @@ class EncryptedStanzaSession(StanzaSession):
break
oss = ''
k = self.sha256(self.k + srs + oss)
k = crypto.sha256(self.k + srs + oss)
del self.k
self.do_retained_secret(k, srs)
@ -857,14 +810,6 @@ class EncryptedStanzaSession(StanzaSession):
self.check_identity()
gajim.interface.save_new_secret(account, bjid, new_srs)
# generate a random number between 'bottom' and 'top'
def srand(self, bottom, top):
# minimum number of bytes needed to represent that range
bytes = int(math.ceil(math.log(top - bottom, 256)))
# in retrospect, this is horribly inadequate.
return (self.decode_mpi(self.random_bytes(bytes)) % (top - bottom)) + bottom
def make_dhfield(self, modp_options, sigmai):
dhs = []
@ -872,39 +817,24 @@ class EncryptedStanzaSession(StanzaSession):
p = dh.primes[modp]
g = dh.generators[modp]
x = self.srand(2 ** (2 * self.n - 1), p - 1)
x = crypto.srand(2 ** (2 * self.n - 1), p - 1)
# XXX this may be a source of performance issues
e = self.powmod(g, x, p)
e = crypto.powmod(g, x, p)
self.xes[modp] = x
self.es[modp] = e
if sigmai:
dhs.append(base64.b64encode(self.encode_mpi(e)))
dhs.append(base64.b64encode(crypto.encode_mpi(e)))
name = 'dhkeys'
else:
He = self.sha256(self.encode_mpi(e))
He = crypto.sha256(crypto.encode_mpi(e))
dhs.append(base64.b64encode(He))
name = 'dhhashes'
return xmpp.DataField(name=name, typ='hidden', value=dhs)
# a faster version of (base ** exp) % mod
# taken from <http://lists.danga.com/pipermail/yadis/2005-September/001445.html>
def powmod(self, base, exp, mod):
square = base % mod
result = 1
while exp > 0:
if exp & 1: # exponent is odd
result = (result * square) % mod
square = (square * square) % mod
exp /= 2
return result
def terminate_e2e(self):
self.terminate()