Simplify check_X509.py
as pyasn1 and pyOpenSSL are required now
This commit is contained in:
parent
6d10a5e2cf
commit
749a01a276
|
@ -1,183 +1,171 @@
|
||||||
import logging
|
import logging
|
||||||
log = logging.getLogger('gajim.c.check_X509')
|
log = logging.getLogger('gajim.c.check_X509')
|
||||||
|
|
||||||
try:
|
import OpenSSL.SSL
|
||||||
import OpenSSL.SSL
|
import OpenSSL.crypto
|
||||||
import OpenSSL.crypto
|
|
||||||
ver = OpenSSL.__version__
|
|
||||||
if ver < '0.12':
|
|
||||||
raise ImportError
|
|
||||||
from pyasn1.type import univ, constraint, char, namedtype, tag
|
|
||||||
from pyasn1.codec.der.decoder import decode
|
|
||||||
from gajim.common.helpers import prep, InvalidFormat
|
|
||||||
|
|
||||||
MAX = 64
|
from pyasn1.type import univ, constraint, char, namedtype, tag
|
||||||
oid_xmppaddr = '1.3.6.1.5.5.7.8.5'
|
from pyasn1.codec.der.decoder import decode
|
||||||
oid_dnssrv = '1.3.6.1.5.5.7.8.7'
|
from gajim.common.helpers import prep, InvalidFormat
|
||||||
|
|
||||||
|
MAX = 64
|
||||||
|
oid_xmppaddr = '1.3.6.1.5.5.7.8.5'
|
||||||
|
oid_dnssrv = '1.3.6.1.5.5.7.8.7'
|
||||||
|
|
||||||
|
|
||||||
|
class DirectoryString(univ.Choice):
|
||||||
|
componentType = namedtype.NamedTypes(
|
||||||
|
namedtype.NamedType(
|
||||||
|
'teletexString', char.TeletexString().subtype(
|
||||||
|
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||||
|
namedtype.NamedType(
|
||||||
|
'printableString', char.PrintableString().subtype(
|
||||||
|
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||||
|
namedtype.NamedType(
|
||||||
|
'universalString', char.UniversalString().subtype(
|
||||||
|
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||||
|
namedtype.NamedType(
|
||||||
|
'utf8String', char.UTF8String().subtype(
|
||||||
|
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||||
|
namedtype.NamedType(
|
||||||
|
'bmpString', char.BMPString().subtype(
|
||||||
|
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||||
|
namedtype.NamedType(
|
||||||
|
'ia5String', char.IA5String().subtype(
|
||||||
|
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||||
|
namedtype.NamedType(
|
||||||
|
'gString', univ.OctetString().subtype(
|
||||||
|
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
||||||
|
)
|
||||||
|
|
||||||
class DirectoryString(univ.Choice):
|
class AttributeValue(DirectoryString):
|
||||||
componentType = namedtype.NamedTypes(
|
pass
|
||||||
namedtype.NamedType(
|
|
||||||
'teletexString', char.TeletexString().subtype(
|
|
||||||
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
|
||||||
namedtype.NamedType(
|
|
||||||
'printableString', char.PrintableString().subtype(
|
|
||||||
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
|
||||||
namedtype.NamedType(
|
|
||||||
'universalString', char.UniversalString().subtype(
|
|
||||||
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
|
||||||
namedtype.NamedType(
|
|
||||||
'utf8String', char.UTF8String().subtype(
|
|
||||||
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
|
||||||
namedtype.NamedType(
|
|
||||||
'bmpString', char.BMPString().subtype(
|
|
||||||
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
|
||||||
namedtype.NamedType(
|
|
||||||
'ia5String', char.IA5String().subtype(
|
|
||||||
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
|
||||||
namedtype.NamedType(
|
|
||||||
'gString', univ.OctetString().subtype(
|
|
||||||
subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
|
|
||||||
)
|
|
||||||
|
|
||||||
class AttributeValue(DirectoryString):
|
class AttributeType(univ.ObjectIdentifier):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class AttributeType(univ.ObjectIdentifier):
|
class AttributeTypeAndValue(univ.Sequence):
|
||||||
pass
|
componentType = namedtype.NamedTypes(
|
||||||
|
namedtype.NamedType('type', AttributeType()),
|
||||||
|
namedtype.NamedType('value', AttributeValue()),
|
||||||
|
)
|
||||||
|
|
||||||
class AttributeTypeAndValue(univ.Sequence):
|
class RelativeDistinguishedName(univ.SetOf):
|
||||||
componentType = namedtype.NamedTypes(
|
componentType = AttributeTypeAndValue()
|
||||||
namedtype.NamedType('type', AttributeType()),
|
|
||||||
namedtype.NamedType('value', AttributeValue()),
|
|
||||||
)
|
|
||||||
|
|
||||||
class RelativeDistinguishedName(univ.SetOf):
|
class RDNSequence(univ.SequenceOf):
|
||||||
componentType = AttributeTypeAndValue()
|
componentType = RelativeDistinguishedName()
|
||||||
|
|
||||||
class RDNSequence(univ.SequenceOf):
|
class Name(univ.Choice):
|
||||||
componentType = RelativeDistinguishedName()
|
componentType = namedtype.NamedTypes(
|
||||||
|
namedtype.NamedType('', RDNSequence()),
|
||||||
|
)
|
||||||
|
|
||||||
class Name(univ.Choice):
|
class GeneralName(univ.Choice):
|
||||||
componentType = namedtype.NamedTypes(
|
componentType = namedtype.NamedTypes(
|
||||||
namedtype.NamedType('', RDNSequence()),
|
namedtype.NamedType('otherName', univ.Sequence().subtype(
|
||||||
)
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatConstructed, 0x0))),
|
||||||
|
namedtype.NamedType('rfc822Name', char.IA5String().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatSimple, 1))),
|
||||||
|
namedtype.NamedType('dNSName', char.IA5String().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatSimple, 2))),
|
||||||
|
namedtype.NamedType('x400Address', univ.Sequence().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatConstructed, 0x3))),
|
||||||
|
namedtype.NamedType('directoryName', Name().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatConstructed, 0x4))),
|
||||||
|
namedtype.NamedType('ediPartyName', univ.Sequence().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatConstructed, 0x5))),
|
||||||
|
namedtype.NamedType('uniformResourceIdentifier',
|
||||||
|
char.IA5String().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatSimple, 6))),
|
||||||
|
namedtype.NamedType('iPAddress', univ.OctetString().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatSimple, 7))),
|
||||||
|
namedtype.NamedType('registeredID', univ.ObjectIdentifier().subtype(
|
||||||
|
implicitTag=tag.Tag(tag.tagClassContext,
|
||||||
|
tag.tagFormatSimple, 8))),
|
||||||
|
)
|
||||||
|
|
||||||
class GeneralName(univ.Choice):
|
class GeneralNames(univ.SequenceOf):
|
||||||
componentType = namedtype.NamedTypes(
|
componentType = GeneralName()
|
||||||
namedtype.NamedType('otherName', univ.Sequence().subtype(
|
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatConstructed, 0x0))),
|
|
||||||
namedtype.NamedType('rfc822Name', char.IA5String().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatSimple, 1))),
|
|
||||||
namedtype.NamedType('dNSName', char.IA5String().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatSimple, 2))),
|
|
||||||
namedtype.NamedType('x400Address', univ.Sequence().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatConstructed, 0x3))),
|
|
||||||
namedtype.NamedType('directoryName', Name().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatConstructed, 0x4))),
|
|
||||||
namedtype.NamedType('ediPartyName', univ.Sequence().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatConstructed, 0x5))),
|
|
||||||
namedtype.NamedType('uniformResourceIdentifier',
|
|
||||||
char.IA5String().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatSimple, 6))),
|
|
||||||
namedtype.NamedType('iPAddress', univ.OctetString().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatSimple, 7))),
|
|
||||||
namedtype.NamedType('registeredID', univ.ObjectIdentifier().subtype(
|
|
||||||
implicitTag=tag.Tag(tag.tagClassContext,
|
|
||||||
tag.tagFormatSimple, 8))),
|
|
||||||
)
|
|
||||||
|
|
||||||
class GeneralNames(univ.SequenceOf):
|
def _parse_asn1(asn1):
|
||||||
componentType = GeneralName()
|
obj = decode(asn1, asn1Spec=GeneralNames())[0]
|
||||||
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
|
r = {}
|
||||||
|
for o in obj:
|
||||||
|
name = o.getName()
|
||||||
|
if name == 'dNSName':
|
||||||
|
if name not in r:
|
||||||
|
r[name] = []
|
||||||
|
r[name].append(str(o.getComponent()))
|
||||||
|
if name == 'otherName':
|
||||||
|
if name not in r:
|
||||||
|
r[name] = {}
|
||||||
|
tag = str(tuple(o.getComponent())[0])
|
||||||
|
val = str(tuple(o.getComponent())[1])
|
||||||
|
if tag not in r[name]:
|
||||||
|
r[name][tag] = []
|
||||||
|
r[name][tag].append(val)
|
||||||
|
if name == 'uniformResourceIdentifier':
|
||||||
|
r['uniformResourceIdentifier'] = True
|
||||||
|
return r
|
||||||
|
|
||||||
def _parse_asn1(asn1):
|
def check_certificate(cert, domain):
|
||||||
obj = decode(asn1, asn1Spec=GeneralNames())[0]
|
cnt = cert.get_extension_count()
|
||||||
r = {}
|
if '.' in domain:
|
||||||
for o in obj:
|
compared_domain = domain.split('.', 1)[1]
|
||||||
name = o.getName()
|
else:
|
||||||
if name == 'dNSName':
|
compared_domain = ''
|
||||||
if name not in r:
|
srv_domain = '_xmpp-client.' + domain
|
||||||
r[name] = []
|
compared_srv_domain = '_xmpp-client.' + compared_domain
|
||||||
r[name].append(str(o.getComponent()))
|
for i in range(0, cnt):
|
||||||
if name == 'otherName':
|
ext = cert.get_extension(i)
|
||||||
if name not in r:
|
if ext.get_short_name() == b'subjectAltName':
|
||||||
r[name] = {}
|
try:
|
||||||
tag = str(tuple(o.getComponent())[0])
|
r = _parse_asn1(ext.get_data())
|
||||||
val = str(tuple(o.getComponent())[1])
|
except:
|
||||||
if tag not in r[name]:
|
log.error('Wrong data in certificate: subjectAltName=%s' % \
|
||||||
r[name][tag] = []
|
ext.get_data())
|
||||||
r[name][tag].append(val)
|
continue
|
||||||
if name == 'uniformResourceIdentifier':
|
if 'otherName' in r:
|
||||||
r['uniformResourceIdentifier'] = True
|
if oid_xmppaddr in r['otherName']:
|
||||||
return r
|
for host in r['otherName'][oid_xmppaddr]:
|
||||||
|
try:
|
||||||
def check_certificate(cert, domain):
|
host = prep(None, host, None)
|
||||||
cnt = cert.get_extension_count()
|
except InvalidFormat:
|
||||||
if '.' in domain:
|
|
||||||
compared_domain = domain.split('.', 1)[1]
|
|
||||||
else:
|
|
||||||
compared_domain = ''
|
|
||||||
srv_domain = '_xmpp-client.' + domain
|
|
||||||
compared_srv_domain = '_xmpp-client.' + compared_domain
|
|
||||||
for i in range(0, cnt):
|
|
||||||
ext = cert.get_extension(i)
|
|
||||||
if ext.get_short_name() == b'subjectAltName':
|
|
||||||
try:
|
|
||||||
r = _parse_asn1(ext.get_data())
|
|
||||||
except:
|
|
||||||
log.error('Wrong data in certificate: subjectAltName=%s' % \
|
|
||||||
ext.get_data())
|
|
||||||
continue
|
|
||||||
if 'otherName' in r:
|
|
||||||
if oid_xmppaddr in r['otherName']:
|
|
||||||
for host in r['otherName'][oid_xmppaddr]:
|
|
||||||
try:
|
|
||||||
host = prep(None, host, None)
|
|
||||||
except InvalidFormat:
|
|
||||||
continue
|
|
||||||
if host == domain:
|
|
||||||
return True
|
|
||||||
if oid_dnssrv in r['otherName']:
|
|
||||||
for host in r['otherName'][oid_dnssrv]:
|
|
||||||
if host.startswith('_xmpp-client.*.'):
|
|
||||||
if host.replace('*.', '', 1) == compared_srv_domain:
|
|
||||||
return True
|
|
||||||
continue
|
|
||||||
if host == srv_domain:
|
|
||||||
return True
|
|
||||||
if 'dNSName' in r:
|
|
||||||
for host in r['dNSName']:
|
|
||||||
if host.startswith('*.'):
|
|
||||||
if host[2:] == compared_domain:
|
|
||||||
return True
|
|
||||||
continue
|
continue
|
||||||
if host == domain:
|
if host == domain:
|
||||||
return True
|
return True
|
||||||
if r:
|
if oid_dnssrv in r['otherName']:
|
||||||
return False
|
for host in r['otherName'][oid_dnssrv]:
|
||||||
break
|
if host.startswith('_xmpp-client.*.'):
|
||||||
|
if host.replace('*.', '', 1) == compared_srv_domain:
|
||||||
|
return True
|
||||||
|
continue
|
||||||
|
if host == srv_domain:
|
||||||
|
return True
|
||||||
|
if 'dNSName' in r:
|
||||||
|
for host in r['dNSName']:
|
||||||
|
if host.startswith('*.'):
|
||||||
|
if host[2:] == compared_domain:
|
||||||
|
return True
|
||||||
|
continue
|
||||||
|
if host == domain:
|
||||||
|
return True
|
||||||
|
if r:
|
||||||
|
return False
|
||||||
|
break
|
||||||
|
|
||||||
subject = cert.get_subject()
|
subject = cert.get_subject()
|
||||||
if subject.commonName == domain:
|
if subject.commonName == domain:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
except ImportError:
|
|
||||||
log.warning('Import of PyOpenSSL or pyasn1 failed. Cannot correctly check '
|
|
||||||
'SSL certificate')
|
|
||||||
|
|
||||||
def check_certificate(cert, domain):
|
|
||||||
subject = cert.get_subject()
|
|
||||||
if subject.commonName == domain:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
|
@ -45,10 +45,6 @@ class FeaturesWindow:
|
||||||
|
|
||||||
# {name: (available_function, unix_text, windows_text)}
|
# {name: (available_function, unix_text, windows_text)}
|
||||||
self.features = {
|
self.features = {
|
||||||
_('SSL certificate validation'): (self.pyopenssl_available,
|
|
||||||
_('A library used to validate server certificates to ensure a secure connection.'),
|
|
||||||
_('Requires python-pyopenssl > 0.12 and pyasn1.'),
|
|
||||||
_('Requires python-pyopenssl > 0.12 and pyasn1.')),
|
|
||||||
_('Bonjour / Zeroconf'): (self.zeroconf_available,
|
_('Bonjour / Zeroconf'): (self.zeroconf_available,
|
||||||
_('Serverless chatting with autodetected clients in a local network.'),
|
_('Serverless chatting with autodetected clients in a local network.'),
|
||||||
_('Requires python-avahi.'),
|
_('Requires python-avahi.'),
|
||||||
|
@ -151,19 +147,6 @@ class FeaturesWindow:
|
||||||
text = text + self.features[feature][2]
|
text = text + self.features[feature][2]
|
||||||
self.desc_label.set_text(text)
|
self.desc_label.set_text(text)
|
||||||
|
|
||||||
def pyopenssl_available(self):
|
|
||||||
try:
|
|
||||||
import OpenSSL.SSL
|
|
||||||
import OpenSSL.crypto
|
|
||||||
ver = OpenSSL.__version__
|
|
||||||
ver_l = [int(i) for i in ver.split('.')]
|
|
||||||
if ver_l < [0, 12]:
|
|
||||||
raise ImportError
|
|
||||||
import pyasn1
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def zeroconf_available(self):
|
def zeroconf_available(self):
|
||||||
return app.HAVE_ZEROCONF
|
return app.HAVE_ZEROCONF
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue