coding standards

This commit is contained in:
Yann Leboulanger 2008-09-17 19:04:17 +00:00
parent c876e8f65e
commit 6df6e9ddf1
1 changed files with 128 additions and 80 deletions

View File

@ -73,10 +73,12 @@ class StanzaSession(object):
def remove_events(self, types):
any_removed = False
for event in gajim.events.get_events(self.conn.name, self.jid, types=types):
for event in gajim.events.get_events(self.conn.name, self.jid,
types=types):
# the event wasn't in this session
if (event.type_ == 'chat' and event.parameters[8] != self) or \
(event.type_ == 'printed_chat' and event.parameters[0].session != self):
(event.type_ == 'printed_chat' and event.parameters[0].session != \
self):
continue
# events.remove_events returns True when there were no events
@ -89,7 +91,8 @@ class StanzaSession(object):
return any_removed
def generate_thread_id(self):
return "".join([random.choice(string.ascii_letters) for x in xrange(0,32)])
return ''.join([random.choice(string.ascii_letters) for x in xrange(0,
32)])
def send(self, msg):
if self.thread_id:
@ -101,7 +104,7 @@ class StanzaSession(object):
if isinstance(msg, xmpp.Message):
self.last_send = time.time()
def reject_negotiation(self, body = None):
def reject_negotiation(self, body=None):
msg = xmpp.Message()
feature = msg.NT.feature
feature.setNamespace(xmpp.NS_FEATURE)
@ -120,7 +123,8 @@ class StanzaSession(object):
self.cancelled_negotiation()
def cancelled_negotiation(self):
'''A negotiation has been cancelled, so reset this session to its default state.'''
'''A negotiation has been cancelled, so reset this session to its default
state.'''
if self.control:
self.control.on_cancel_session_negotiation()
@ -185,8 +189,8 @@ if gajim.HAVE_PYCRYPTO:
# handle_session_negotiation method.
class EncryptedStanzaSession(StanzaSession):
def __init__(self, conn, jid, thread_id, type = 'chat'):
StanzaSession.__init__(self, conn, jid, thread_id, type = 'chat')
def __init__(self, conn, jid, thread_id, type='chat'):
StanzaSession.__init__(self, conn, jid, thread_id, type='chat')
self.xes = {}
self.es = {}
@ -286,13 +290,13 @@ class EncryptedStanzaSession(StanzaSession):
def generate_initiator_keys(self, k):
return (self.hmac(k, 'Initiator Cipher Key'),
self.hmac(k, 'Initiator MAC Key'),
self.hmac(k, 'Initiator SIGMA Key') )
self.hmac(k, 'Initiator MAC Key'),
self.hmac(k, 'Initiator SIGMA Key'))
def generate_responder_keys(self, k):
return (self.hmac(k, 'Responder Cipher Key'),
self.hmac(k, 'Responder MAC Key'),
self.hmac(k, 'Responder SIGMA Key') )
self.hmac(k, 'Responder MAC Key'),
self.hmac(k, 'Responder SIGMA Key'))
def compress(self, plaintext):
if self.compression is None:
@ -336,7 +340,8 @@ class EncryptedStanzaSession(StanzaSession):
try:
parsed = xmpp.Node(node='<node>' + plaintext + '</node>')
except:
raise exceptions.DecryptionError, 'decrypted <data/> not parseable as XML'
raise exceptions.DecryptionError,
'decrypted <data/> not parseable as XML'
for child in parsed.getChildren():
stanza.addChild(node=child)
@ -348,13 +353,13 @@ class EncryptedStanzaSession(StanzaSession):
def logging_preference(self):
if gajim.config.get('log_encrypted_sessions'):
return ["may", "mustnot"]
return ['may', 'mustnot']
else:
return ["mustnot", "may"]
return ['mustnot', 'may']
def get_shared_secret(self, e, y, p):
if (not 1 < e < (p - 1)):
raise exceptions.NegotiationError, "invalid DH value"
raise exceptions.NegotiationError, 'invalid DH value'
return crypto.sha256(crypto.encode_mpi(crypto.powmod(e, y, p)))
@ -370,7 +375,8 @@ class EncryptedStanzaSession(StanzaSession):
m_o_calculated = self.hmac(self.km_o, crypto.encode_mpi(self.c_o) + id_o)
if m_o_calculated != m_o:
raise exceptions.NegotiationError, 'calculated m_%s differs from received m_%s' % (i_o, i_o)
raise exceptions.NegotiationError,
'calculated m_%s differs from received m_%s' % (i_o, i_o)
if i_o == 'a' and self.sas_algs == 'sas28x5':
# we don't need to calculate this if there's a verified retained secret
@ -401,7 +407,7 @@ class EncryptedStanzaSession(StanzaSession):
enc_sig = parsed.getTag(name='SignatureValue',
namespace=XmlDsig).getData()
signature = (crypto.decode_mpi(base64.b64decode(enc_sig)),)
signature = (crypto.decode_mpi(base64.b64decode(enc_sig)), )
else:
mac_o = self.decrypt(id_o)
pubkey_o = ''
@ -423,10 +429,12 @@ class EncryptedStanzaSession(StanzaSession):
hash = crypto.sha256(mac_o_calculated)
if not eir_pubkey.verify(hash, signature):
raise exceptions.NegotiationError, 'public key signature verification failed!'
raise exceptions.NegotiationError,
'public key signature verification failed!'
elif mac_o_calculated != mac_o:
raise exceptions.NegotiationError, 'calculated mac_%s differs from received mac_%s' % (i_o, i_o)
raise exceptions.NegotiationError,
'calculated mac_%s differs from received mac_%s' % (i_o, i_o)
def make_identity(self, form, dh_i):
if self.negotiated['send_pubkey']:
@ -434,23 +442,28 @@ class EncryptedStanzaSession(StanzaSession):
pubkey = secrets.secrets().my_pubkey(self.conn.name)
fields = (pubkey.n, pubkey.e)
cb_fields = map(lambda f: base64.b64encode(crypto.encode_mpi(f)), fields)
cb_fields = map(lambda f: base64.b64encode(crypto.encode_mpi(f)),
fields)
pubkey_s = '<RSAKeyValue xmlns="http://www.w3.org/2000/09/xmldsig#"><Modulus>%s</Modulus><Exponent>%s</Exponent></RSAKeyValue>' % tuple(cb_fields)
pubkey_s = '<RSAKeyValue xmlns="http://www.w3.org/2000/09/xmldsig#"'
'><Modulus>%s</Modulus><Exponent>%s</Exponent></RSAKeyValue>' % \
tuple(cb_fields)
else:
pubkey_s = ''
form_s2 = ''.join(map(lambda el: xmpp.c14n.c14n(el), form.getChildren()))
old_c_s = self.c_s
content = self.n_o + self.n_s + crypto.encode_mpi(dh_i) + pubkey_s + self.form_s + form_s2
content = self.n_o + self.n_s + crypto.encode_mpi(dh_i) + pubkey_s + \
self.form_s + form_s2
mac_s = self.hmac(self.ks_s, content)
if self.negotiated['send_pubkey']:
signature = self.sign(mac_s)
sign_s = '<SignatureValue xmlns="http://www.w3.org/2000/09/xmldsig#">%s</SignatureValue>' % base64.b64encode(signature)
sign_s = '<SignatureValue xmlns="http://www.w3.org/2000/09/xmldsig#">'
'%s</SignatureValue>' % base64.b64encode(signature)
if self.negotiated['send_pubkey'] == 'hash':
b64ed = base64.b64encode(self.hash(pubkey_s))
@ -471,8 +484,8 @@ class EncryptedStanzaSession(StanzaSession):
# XXX save retained secret?
self.check_identity(lambda : ())
return (xmpp.DataField(name='identity', value=base64.b64encode(id_s)), \
xmpp.DataField(name='mac', value=base64.b64encode(m_s)))
return (xmpp.DataField(name='identity', value=base64.b64encode(id_s)),
xmpp.DataField(name='mac', value=base64.b64encode(m_s)))
def negotiate_e2e(self, sigmai):
self.negotiated = {}
@ -483,41 +496,58 @@ class EncryptedStanzaSession(StanzaSession):
x = xmpp.DataForm(typ='form')
x.addChild(node=xmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn', typ='hidden'))
x.addChild(node=xmpp.DataField(name='accept', value='1', typ='boolean', required=True))
x.addChild(node=xmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn',
typ='hidden'))
x.addChild(node=xmpp.DataField(name='accept', value='1', typ='boolean',
required=True))
# this field is incorrectly called 'otr' in XEPs 0116 and 0217
x.addChild(node=xmpp.DataField(name='logging', typ='list-single', options=self.logging_preference(), required=True))
x.addChild(node=xmpp.DataField(name='logging', typ='list-single',
options=self.logging_preference(), required=True))
# unsupported options: 'disabled', 'enabled'
x.addChild(node=xmpp.DataField(name='disclosure', typ='list-single', options=['never'], required=True))
x.addChild(node=xmpp.DataField(name='security', typ='list-single', options=['e2e'], required=True))
x.addChild(node=xmpp.DataField(name='crypt_algs', value='aes128-ctr', typ='hidden'))
x.addChild(node=xmpp.DataField(name='hash_algs', value='sha256', typ='hidden'))
x.addChild(node=xmpp.DataField(name='compress', value='none', typ='hidden'))
x.addChild(node=xmpp.DataField(name='disclosure', typ='list-single',
options=['never'], required=True))
x.addChild(node=xmpp.DataField(name='security', typ='list-single',
options=['e2e'], required=True))
x.addChild(node=xmpp.DataField(name='crypt_algs', value='aes128-ctr',
typ='hidden'))
x.addChild(node=xmpp.DataField(name='hash_algs', value='sha256',
typ='hidden'))
x.addChild(node=xmpp.DataField(name='compress', value='none',
typ='hidden'))
# unsupported options: 'iq', 'presence'
x.addChild(node=xmpp.DataField(name='stanzas', typ='list-multi', options=['message']))
x.addChild(node=xmpp.DataField(name='stanzas', typ='list-multi',
options=['message']))
x.addChild(node=xmpp.DataField(name='init_pubkey', options=['none', 'key', 'hash'], typ='list-single'))
x.addChild(node=xmpp.DataField(name='init_pubkey', options=['none', 'key',
'hash'], typ='list-single'))
# XXX store key, use hash
x.addChild(node=xmpp.DataField(name='resp_pubkey', options=['none', 'key'], typ='list-single'))
x.addChild(node=xmpp.DataField(name='resp_pubkey', options=['none',
'key'], typ='list-single'))
x.addChild(node=xmpp.DataField(name='ver', value='1.0', typ='hidden'))
x.addChild(node=xmpp.DataField(name='rekey_freq', value='4294967295', typ='hidden'))
x.addChild(node=xmpp.DataField(name='rekey_freq', value='4294967295',
typ='hidden'))
x.addChild(node=xmpp.DataField(name='sas_algs', value='sas28x5', typ='hidden'))
x.addChild(node=xmpp.DataField(name='sign_algs', value='http://www.w3.org/2000/09/xmldsig#rsa-sha256', typ='hidden'))
x.addChild(node=xmpp.DataField(name='sas_algs', value='sas28x5',
typ='hidden'))
x.addChild(node=xmpp.DataField(name='sign_algs',
value='http://www.w3.org/2000/09/xmldsig#rsa-sha256', typ='hidden'))
self.n_s = crypto.generate_nonce()
x.addChild(node=xmpp.DataField(name='my_nonce', value=base64.b64encode(self.n_s), typ='hidden'))
x.addChild(node=xmpp.DataField(name='my_nonce',
value=base64.b64encode(self.n_s), typ='hidden'))
modp_options = [ int(g) for g in gajim.config.get('esession_modp').split(',') ]
modp_options = [ int(g) for g in gajim.config.get('esession_modp').split(
',') ]
x.addChild(node=xmpp.DataField(name='modp', typ='list-single', options=map(lambda x: [ None, x ], modp_options)))
x.addChild(node=xmpp.DataField(name='modp', typ='list-single',
options=map(lambda x: [ None, x ], modp_options)))
x.addChild(node=self.make_dhfield(modp_options, sigmai))
self.sigmai = sigmai
@ -536,16 +566,10 @@ class EncryptedStanzaSession(StanzaSession):
not_acceptable = []
ask_user = {}
fixed = { 'disclosure': 'never',
'security': 'e2e',
'crypt_algs': 'aes128-ctr',
'hash_algs': 'sha256',
'compress': 'none',
'stanzas': 'message',
'init_pubkey': 'none',
'resp_pubkey': 'none',
'ver': '1.0',
'sas_algs': 'sas28x5' }
fixed = { 'disclosure': 'never', 'security': 'e2e',
'crypt_algs': 'aes128-ctr', 'hash_algs': 'sha256', 'compress': 'none',
'stanzas': 'message', 'init_pubkey': 'none', 'resp_pubkey': 'none',
'ver': '1.0', 'sas_algs': 'sas28x5' }
self.encryptable_stanzas = ['message']
@ -554,7 +578,8 @@ class EncryptedStanzaSession(StanzaSession):
self.hash_alg = SHA256
self.compression = None
for name, field in map(lambda name: (name, form.getField(name)), form.asDict().keys()):
for name, field in map(lambda name: (name, form.getField(name)),
form.asDict().keys()):
options = map(lambda x: x[1], field.getOptions())
values = field.getValues()
@ -598,7 +623,8 @@ class EncryptedStanzaSession(StanzaSession):
if (XmlDsig + 'rsa-sha256') in options:
negotiated['sign_algs'] = XmlDsig + 'rsa-sha256'
else:
# XXX some things are handled elsewhere, some things are not-implemented
# XXX some things are handled elsewhere, some things are
# not-implemented
pass
return (negotiated, not_acceptable, ask_user)
@ -632,28 +658,31 @@ class EncryptedStanzaSession(StanzaSession):
self.n_o = base64.b64decode(form['my_nonce'])
dhhashes = form.getField('dhhashes').getValues()
self.negotiated['He'] = base64.b64decode(dhhashes[group_order].encode("utf8"))
self.negotiated['He'] = base64.b64decode(dhhashes[group_order].encode(
'utf8'))
bytes = int(self.n / 8)
self.n_s = crypto.generate_nonce()
self.c_o = crypto.decode_mpi(crypto.random_bytes(bytes)) # n-bit random number
# n-bit random number
self.c_o = crypto.decode_mpi(crypto.random_bytes(bytes))
self.c_s = self.c_o ^ (2 ** (self.n - 1))
self.y = crypto.srand(2 ** (2 * self.n - 1), p - 1)
self.d = crypto.powmod(g, self.y, p)
to_add = { 'my_nonce': self.n_s,
'dhkeys': crypto.encode_mpi(self.d),
'counter': crypto.encode_mpi(self.c_o),
'nonce': self.n_o }
to_add = {'my_nonce': self.n_s,
'dhkeys': crypto.encode_mpi(self.d),
'counter': crypto.encode_mpi(self.c_o),
'nonce': self.n_o}
for name in to_add:
b64ed = base64.b64encode(to_add[name])
x.addChild(node=xmpp.DataField(name=name, value=b64ed))
self.form_o = ''.join(map(lambda el: xmpp.c14n.c14n(el), form.getChildren()))
self.form_o = ''.join(map(lambda el: xmpp.c14n.c14n(el),
form.getChildren()))
self.form_s = ''.join(map(lambda el: xmpp.c14n.c14n(el), x.getChildren()))
self.status = 'responded-e2e'
@ -687,14 +716,15 @@ class EncryptedStanzaSession(StanzaSession):
else:
negotiated['logging'] = self.logging_preference()[0]
for r,a in (('recv_pubkey', 'resp_pubkey'), ('send_pubkey', 'init_pubkey')):
for r,a in (('recv_pubkey', 'resp_pubkey'), ('send_pubkey',
'init_pubkey')):
negotiated[r] = None
if a in form.asDict() and form[a] in ('key', 'hash'):
negotiated[r] = form[a]
if 'sign_algs' in form.asDict():
if form['sign_algs'] in (XmlDsig + 'rsa-sha256',):
if form['sign_algs'] in (XmlDsig + 'rsa-sha256', ):
negotiated['sign_algs'] = form['sign_algs']
else:
not_acceptable.append(form['sign_algs'])
@ -731,9 +761,11 @@ class EncryptedStanzaSession(StanzaSession):
self.k = self.get_shared_secret(self.d, x, p)
result.addChild(node=xmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn'))
result.addChild(node=xmpp.DataField(name='FORM_TYPE',
value='urn:xmpp:ssn'))
result.addChild(node=xmpp.DataField(name='accept', value='1'))
result.addChild(node=xmpp.DataField(name='nonce', value=base64.b64encode(self.n_o)))
result.addChild(node=xmpp.DataField(name='nonce',
value=base64.b64encode(self.n_o)))
self.kc_s, self.km_s, self.ks_s = self.generate_initiator_keys(self.k)
@ -741,7 +773,8 @@ class EncryptedStanzaSession(StanzaSession):
self.kc_o, self.km_o, self.ks_o = self.generate_responder_keys(self.k)
self.verify_identity(form, self.d, True, 'b')
else:
srses = secrets.secrets().retained_secrets(self.conn.name, self.jid.getStripped())
srses = secrets.secrets().retained_secrets(self.conn.name,
self.jid.getStripped())
rshashes = [self.hmac(self.n_s, rs) for (rs,v) in srses]
if not rshashes:
@ -751,11 +784,14 @@ class EncryptedStanzaSession(StanzaSession):
rshashes = [base64.b64encode(rshash) for rshash in rshashes]
result.addChild(node=xmpp.DataField(name='rshashes', value=rshashes))
result.addChild(node=xmpp.DataField(name='dhkeys', value=base64.b64encode(crypto.encode_mpi(e))))
result.addChild(node=xmpp.DataField(name='dhkeys',
value=base64.b64encode(crypto.encode_mpi(e))))
self.form_o = ''.join(map(lambda el: xmpp.c14n.c14n(el), form.getChildren()))
self.form_o = ''.join(map(lambda el: xmpp.c14n.c14n(el),
form.getChildren()))
# MUST securely destroy K unless it will be used later to generate the final shared secret
# MUST securely destroy K unless it will be used later to generate the
# final shared secret
for datafield in self.make_identity(result, e):
result.addChild(node=datafield)
@ -779,7 +815,8 @@ class EncryptedStanzaSession(StanzaSession):
x = xmpp.DataForm(typ='result')
for field in ('nonce', 'dhkeys', 'rshashes', 'identity', 'mac'):
assert field in form.asDict(), "alice's form didn't have a %s field" % field
assert field in form.asDict(), "alice's form didn't have a %s field" \
% field
# 4.5.1 generating provisory session keys
e = crypto.decode_mpi(base64.b64decode(form['dhkeys']))
@ -800,8 +837,10 @@ class EncryptedStanzaSession(StanzaSession):
srs = ''
srses = secrets.secrets().retained_secrets(self.conn.name, self.jid.getStripped())
rshashes = [base64.b64decode(rshash) for rshash in form.getField('rshashes').getValues()]
srses = secrets.secrets().retained_secrets(self.conn.name,
self.jid.getStripped())
rshashes = [base64.b64decode(rshash) for rshash in form.getField(
'rshashes').getValues()]
for (secret, verified) in srses:
if self.hmac(self.n_o, secret) in rshashes:
@ -824,8 +863,10 @@ class EncryptedStanzaSession(StanzaSession):
srshash = crypto.random_bytes(32)
x.addChild(node=xmpp.DataField(name='FORM_TYPE', value='urn:xmpp:ssn'))
x.addChild(node=xmpp.DataField(name='nonce', value=base64.b64encode(self.n_o)))
x.addChild(node=xmpp.DataField(name='srshash', value=base64.b64encode(srshash)))
x.addChild(node=xmpp.DataField(name='nonce', value=base64.b64encode(
self.n_o)))
x.addChild(node=xmpp.DataField(name='srshash', value=base64.b64encode(
srshash)))
for datafield in self.make_identity(x, self.d):
x.addChild(node=datafield)
@ -847,7 +888,8 @@ class EncryptedStanzaSession(StanzaSession):
def final_steps_alice(self, form):
srs = ''
srses = secrets.secrets().retained_secrets(self.conn.name, self.jid.getStripped())
srses = secrets.secrets().retained_secrets(self.conn.name,
self.jid.getStripped())
srshash = base64.b64decode(form['srshash'])
@ -869,7 +911,8 @@ class EncryptedStanzaSession(StanzaSession):
# 4.6.2 Verifying Bob's Identity
self.verify_identity(form, self.d, False, 'b')
# Note: If Alice discovers an error then she SHOULD ignore any encrypted content she received in the stanza.
# Note: If Alice discovers an error then she SHOULD ignore any encrypted
# content she received in the stanza.
if self.negotiated['logging'] == 'mustnot':
self.loggable = False
@ -881,7 +924,9 @@ class EncryptedStanzaSession(StanzaSession):
self.control.print_esession_details()
def do_retained_secret(self, k, old_srs):
'''calculate the new retained secret. determine if the user needs to check the remote party's identity. set up callbacks for when the identity has been verified.'''
'''calculate the new retained secret. determine if the user needs to check
the remote party's identity. set up callbacks for when the identity has
been verified.'''
new_srs = self.hmac(k, 'New Retained Secret')
self.srs = new_srs
@ -899,16 +944,19 @@ class EncryptedStanzaSession(StanzaSession):
self.verified_identity = True
else:
# had a secret, but it wasn't verified.
secrets.secrets().replace_srs(account, bjid, old_srs, new_srs, False)
secrets.secrets().replace_srs(account, bjid, old_srs, new_srs,
False)
else:
# we don't even have an SRS
secrets.secrets().save_new_srs(account, bjid, new_srs, False)
def _verified_srs_cb(self):
secrets.secrets().replace_srs(self.conn.name, self.jid.getStripped(), self.srs, self.srs, True)
secrets.secrets().replace_srs(self.conn.name, self.jid.getStripped(),
self.srs, self.srs, True)
def _unverified_srs_cb(self):
secrets.secrets().replace_srs(self.conn.name, self.jid.getStripped(), self.srs, self.srs, False)
secrets.secrets().replace_srs(self.conn.name, self.jid.getStripped(),
self.srs, self.srs, False)
def make_dhfield(self, modp_options, sigmai):
dhs = []