# Copyright (C) 2006 Stefan Bethge # 2006 Dimitur Kirov # # This file is part of Gajim. # # Gajim is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published # by the Free Software Foundation; version 3 only. # Gajim is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Gajim. If not, see . import socket import platform import ssl import errno import sys import os import string import logging from random import Random from unittest.mock import Mock import nbxmpp from nbxmpp import dispatcher_nb from nbxmpp import simplexml from nbxmpp.plugin import PlugIn from nbxmpp.idlequeue import IdleObject from nbxmpp.transports_nb import DATA_RECEIVED from nbxmpp.transports_nb import DATA_SENT from nbxmpp.transports_nb import DATA_ERROR from gajim.common import app from gajim.common.zeroconf import zeroconf from gajim.common.zeroconf import roster_zeroconf log = logging.getLogger('gajim.c.z.client_zeroconf') MAX_BUFF_LEN = 65536 TYPE_SERVER, TYPE_CLIENT = range(2) # wait XX sec to establish a connection CONNECT_TIMEOUT_SECONDS = 10 # after XX sec with no activity, close the stream ACTIVITY_TIMEOUT_SECONDS = 30 class ZeroconfListener(IdleObject): def __init__(self, port, conn_holder): """ Handle all incomming connections on ('0.0.0.0', port) """ self.port = port self.queue_idx = -1 #~ self.queue = None self.started = False self._sock = None self.fd = -1 self.caller = conn_holder.caller self.conn_holder = conn_holder def bind(self): flags = socket.AI_PASSIVE if hasattr(socket, 'AI_ADDRCONFIG'): flags |= socket.AI_ADDRCONFIG ai = socket.getaddrinfo(None, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, flags)[0] self._serv = socket.socket(ai[0], ai[1]) self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if os.name == 'nt': if int(platform.win32_ver()[0]) >= 6: # Win Vista + # 47 is socket.IPPROTO_IPV6 # 27 is socket.IPV6_V6ONLY under windows, but not defined ... self._serv.setsockopt(41, 27, 0) # will fail when port is busy, or we don't have rights to bind try: self._serv.bind((ai[4][0], self.port)) except Exception: # unable to bind, show error dialog return None self._serv.listen(socket.SOMAXCONN) self._serv.setblocking(False) self.fd = self._serv.fileno() app.idlequeue.plug_idle(self, False, True) self.started = True def pollend(self): """ Called when we stop listening on (host, port) """ self.disconnect() def pollin(self): """ Accept a new incomming connection and notify queue """ sock = self.accept_conn() # loop through roster to find who has connected to us from_jid = None ipaddr = sock[1][0] for jid in self.conn_holder.getRoster().keys(): entry = self.conn_holder.getRoster().getItem(jid) for address in entry['addresses']: if address['address'] == ipaddr: from_jid = jid break P2PClient(sock[0], [{'host': ipaddr, 'address': ipaddr, 'port': sock[1][1]}], self.conn_holder, [], from_jid) def disconnect(self, message=''): """ Free all resources, we are not listening anymore """ log.info('Disconnecting ZeroconfListener: %s', message) app.idlequeue.remove_timeout(self.fd) app.idlequeue.unplug_idle(self.fd) self.fd = -1 self.started = False try: self._serv.close() except socket.error: pass self.conn_holder.kill_all_connections() def accept_conn(self): """ Accept a new incoming connection """ _sock = self._serv.accept() _sock[0].setblocking(False) return _sock class P2PClient(IdleObject): def __init__(self, _sock, addresses, conn_holder, stanzaqueue, to=None, on_ok=None, on_not_ok=None): self._owner = self self.Namespace = 'jabber:client' self.protocol_type = 'XMPP' self.defaultNamespace = self.Namespace self.Smacks = Mock() self._component = 0 self._registered_name = None self._caller = conn_holder.caller self.conn_holder = conn_holder self.stanzaqueue = stanzaqueue self.to = to #self.Server = addresses[0]['host'] self.on_ok = on_ok self.on_not_ok = on_not_ok self.Connection = None self.sock_hash = None if _sock: self.sock_type = TYPE_SERVER else: self.sock_type = TYPE_CLIENT self.fd = -1 conn = P2PConnection('', _sock, addresses, self._caller, self.on_connect, self) self.Server = conn.host # set Server to the last host name / address tried if not self.conn_holder: # An error occured, disconnect() has been called if on_not_ok: on_not_ok('Connection to host could not be established.') return self.sock_hash = conn._sock.__hash__ self.fd = conn.fd self.conn_holder.add_connection(self, self.Server, conn.port, self.to) # count messages in queue for val in self.stanzaqueue: stanza, is_message = val if is_message: if self.fd == -1: if on_not_ok: on_not_ok( 'Connection to host could not be established.') return thread_id = stanza.getThread() id_ = stanza.getID() if not id_: id_ = self.Dispatcher.getAnID() if self.fd in self.conn_holder.ids_of_awaiting_messages: self.conn_holder.ids_of_awaiting_messages[self.fd].append(( id_, thread_id)) else: self.conn_holder.ids_of_awaiting_messages[self.fd] = [(id_, thread_id)] self.on_responses = {} def get_bound_jid(self): return self._caller.get_own_jid() def add_stanza(self, stanza, is_message=False): if self.Connection: if self.Connection.state == -1: return False self.send(stanza, is_message) else: self.stanzaqueue.append((stanza, is_message)) if is_message: thread_id = stanza.getThread() id_ = stanza.getID() if not id_: id_ = self.Dispatcher.getAnID() if self.fd in self.conn_holder.ids_of_awaiting_messages: self.conn_holder.ids_of_awaiting_messages[self.fd].append((id_, thread_id)) else: self.conn_holder.ids_of_awaiting_messages[self.fd] = [(id_, thread_id)] return True def on_message_sent(self, connection_id): id_, _thread_id = \ self.conn_holder.ids_of_awaiting_messages[connection_id].pop(0) if self.on_ok: self.on_ok(id_) # use on_ok only on first message. For others it's called in # ClientZeroconf self.on_ok = None def on_connect(self, conn): self.Connection = conn self.Connection.PlugIn(self) dispatcher_nb.Dispatcher().PlugIn(self) self._register_handlers() def StreamInit(self): """ Send an initial stream header """ self.Dispatcher.Stream = simplexml.NodeBuilder() self.Dispatcher.Stream._dispatch_depth = 2 self.Dispatcher.Stream.dispatch = self.Dispatcher.dispatch self.Dispatcher.Stream.stream_header_received = self._check_stream_start self.Dispatcher.Stream.features = None if self.sock_type == TYPE_CLIENT: self.send_stream_header() def send_stream_header(self): self.Dispatcher._metastream = nbxmpp.Node('stream:stream') self.Dispatcher._metastream.setNamespace(self.Namespace) self.Dispatcher._metastream.setAttr('version', '1.0') self.Dispatcher._metastream.setAttr('xmlns:stream', nbxmpp.NS_STREAMS) self.Dispatcher._metastream.setAttr('from', self.conn_holder.zeroconf.name) if self.to: self.Dispatcher._metastream.setAttr('to', self.to) self.Dispatcher.send("%s>" % str( self.Dispatcher._metastream)[:-2]) def _check_stream_start(self, ns, tag, attrs): if ns != nbxmpp.NS_STREAMS or tag != 'stream': log.error('Incorrect stream start: (%s,%s).Terminating!', tag, ns) self.Connection.disconnect() if self.on_not_ok: self.on_not_ok('Connection to host could not be established: ' 'Incorrect answer from server.') return if self.sock_type == TYPE_SERVER: if 'from' in attrs: self.to = attrs['from'] self.send_stream_header() if 'version' in attrs and attrs['version'] == '1.0': # other part supports stream features features = nbxmpp.Node('stream:features') self.Dispatcher.send(features) while self.stanzaqueue: stanza, is_message = self.stanzaqueue.pop(0) self.send(stanza, is_message) elif self.sock_type == TYPE_CLIENT: while self.stanzaqueue: stanza, is_message = self.stanzaqueue.pop(0) self.send(stanza, is_message) def on_disconnect(self): if self.conn_holder: if self.fd in self.conn_holder.ids_of_awaiting_messages: del self.conn_holder.ids_of_awaiting_messages[self.fd] self.conn_holder.remove_connection(self.sock_hash) if 'Dispatcher' in self.__dict__: self._caller._unregister_new_handlers(self) self.Dispatcher.PlugOut() if 'P2PConnection' in self.__dict__: self.P2PConnection.PlugOut() self.Connection = None self._caller = None self.conn_holder = None def force_disconnect(self): if self.Connection: self.disconnect() else: self.on_disconnect() def _on_receive_document_attrs(self, data): if data: self.Dispatcher.ProcessNonBlocking(data) if not hasattr(self, 'Dispatcher') or \ self.Dispatcher.Stream._document_attrs is None: return self.onreceive(None) if 'version' in self.Dispatcher.Stream._document_attrs and \ self.Dispatcher.Stream._document_attrs['version'] == '1.0': #~ self.onreceive(self._on_receive_stream_features) #XXX continue with TLS return self.onreceive(None) return True def remove_timeout(self): pass def _register_handlers(self): self._caller.peerhost = self.Connection._sock.getsockname() self.RegisterHandler('message', lambda conn, data: self._caller._messageCB(self.Server, conn, data)) self.RegisterHandler('iq', self._caller._siSetCB, 'set', nbxmpp.NS_SI) self.RegisterHandler('iq', self._caller._siErrorCB, 'error', nbxmpp.NS_SI) self.RegisterHandler('iq', self._caller._siResultCB, 'result', nbxmpp.NS_SI) self.RegisterHandler('iq', self._caller._bytestreamSetCB, 'set', nbxmpp.NS_BYTESTREAM) self.RegisterHandler('iq', self._caller._bytestreamResultCB, 'result', nbxmpp.NS_BYTESTREAM) self.RegisterHandler('iq', self._caller._bytestreamErrorCB, 'error', nbxmpp.NS_BYTESTREAM) self.RegisterHandler('iq', self._caller._JingleCB, 'result') self.RegisterHandler('iq', self._caller._JingleCB, 'error') self.RegisterHandler('iq', self._caller._JingleCB, 'set', nbxmpp.NS_JINGLE) self._caller._register_new_handlers(self) class P2PConnection(IdleObject, PlugIn): def __init__(self, sock_hash, _sock, addresses=None, caller=None, on_connect=None, client=None): IdleObject.__init__(self) self._owner = client PlugIn.__init__(self) self.sendqueue = [] self.sendbuff = None self.buff_is_message = False self._sock = _sock self.sock_hash = None self.addresses = addresses self.on_connect = on_connect self.client = client self.writable = False self.readable = False self._exported_methods = [self.send, self.disconnect, self.onreceive] self.on_receive = None if _sock: self.host = addresses[0]['host'] self.port = addresses[0]['port'] self._sock = _sock self.state = 1 self._sock.setblocking(False) self.fd = self._sock.fileno() self.on_connect(self) else: self.state = 0 self.addresses_ = self.addresses self.get_next_addrinfo() def get_next_addrinfo(self): address = self.addresses_.pop(0) self.host = address['host'] self.port = address['port'] try: self.ais = socket.getaddrinfo(address['host'], address['port'], socket.AF_UNSPEC, socket.SOCK_STREAM) except socket.gaierror as e: log.info('Lookup failure for %s: %s[%s]', self.host, e[1], repr(e[0]), exc_info=True) if self.addresses_: return self.get_next_addrinfo() else: self.connect_to_next_ip() def connect_to_next_ip(self): if not self.ais: log.error('Connection failure to %s', str(self.host), exc_info=True) if self.addresses_: return self.get_next_addrinfo() self.disconnect() return ai = self.ais.pop(0) log.info('Trying to connect to %s through %s:%s', str(self.host), ai[4][0], ai[4][1], exc_info=True) try: self._sock = socket.socket(*ai[:3]) self._sock.setblocking(False) self._server = ai[4] except socket.error: if sys.exc_value[0] != errno.EINPROGRESS: # for all errors, we try other addresses self.connect_to_next_ip() return self.fd = self._sock.fileno() app.idlequeue.plug_idle(self, True, False) self.set_timeout(CONNECT_TIMEOUT_SECONDS) self.do_connect() def set_timeout(self, timeout): app.idlequeue.remove_timeout(self.fd) if self.state >= 0: app.idlequeue.set_read_timeout(self.fd, timeout) def plugin(self, owner): self.onreceive(owner._on_receive_document_attrs) self._plug_idle() return True def plugout(self): """ Disconnect from the remote server and unregister self.disconnected method from the owner's dispatcher """ self.disconnect() self._owner = None def onreceive(self, recv_handler): if not recv_handler: if hasattr(self._owner, 'Dispatcher'): self.on_receive = self._owner.Dispatcher.ProcessNonBlocking else: self.on_receive = None return _tmp = self.on_receive # make sure this cb is not overriden by recursive calls if not recv_handler(None) and _tmp == self.on_receive: self.on_receive = recv_handler def send(self, packet, is_message=False, now=False): """ Append stanza to the queue of messages to be send if now is False, else send it instantly """ if self.state <= 0: return r = str(packet).encode('utf-8') if now: self.sendqueue.insert(0, (r, is_message)) self._do_send() else: self.sendqueue.append((r, is_message)) self._plug_idle() def read_timeout(self): ids = self.client.conn_holder.ids_of_awaiting_messages if self.fd in ids and ids[self.fd]: for (_id, thread_id) in ids[self.fd]: if hasattr(self._owner, 'Dispatcher'): self._owner.Dispatcher.Event('', DATA_ERROR, ( self.client.to, thread_id)) else: self._owner.on_not_ok('connection timeout') ids[self.fd] = [] self.pollend() def do_connect(self): errnum = 0 try: self._sock.connect(self._server[:2]) self._sock.setblocking(False) except Exception as ee: errnum = ee.errno errstr = ee.strerror errors = (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK) if 'WSAEINVAL' in errno.__dict__: errors += (errno.WSAEINVAL,) if errnum in errors: return # win32 needs this if errnum not in (0, 10056, errno.EISCONN) or self.state != 0: log.error('Could not connect to %s: %s [%s]', str(self.host), errnum, errstr) self.connect_to_next_ip() return # socket is already connected self._sock.setblocking(False) self.state = 1 # connected # we are connected self.on_connect(self) def pollout(self): if self.state == 0: self.do_connect() return app.idlequeue.remove_timeout(self.fd) self._do_send() def pollend(self): if self.state == 0: # error in connect()? #self.disconnect() self.connect_to_next_ip() else: self.state = -1 self.disconnect() def pollin(self): """ Reads all pending incoming data. Call owner's disconnected() method if appropriate """ received = '' errnum = 0 try: # get as many bites, as possible, but not more than RECV_BUFSIZE received = self._sock.recv(MAX_BUFF_LEN) except Exception as e: errnum = e.errno # "received" will be empty anyhow if errnum == ssl.SSL_ERROR_WANT_READ: pass elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]: self.pollend() # don't proccess result, cas it will raise error return elif not received: if errnum != ssl.SSL_ERROR_EOF: # 8 EOF occurred in violation of protocol self.pollend() if self.state >= 0: self.disconnect() return if self.state < 0: return received = received.decode('utf-8') if self.on_receive: if self._owner.sock_type == TYPE_CLIENT: self.set_timeout(ACTIVITY_TIMEOUT_SECONDS) if received.strip(): log.debug('received: %s', received) if hasattr(self._owner, 'Dispatcher'): self._owner.Dispatcher.Event('', DATA_RECEIVED, received) self.on_receive(received) else: # This should never happed, so we need the debug log.error('Unhandled data received: %s', received) self.disconnect() return True def disconnect(self, message=''): """ Close the socket """ app.idlequeue.remove_timeout(self.fd) app.idlequeue.unplug_idle(self.fd) try: self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error: # socket is already closed pass self.fd = -1 self.state = -1 if self._owner: self._owner.on_disconnect() def _do_send(self): if not self.sendbuff: if not self.sendqueue: return None # nothing to send self.sendbuff, self.buff_is_message = self.sendqueue.pop(0) self.sent_data = self.sendbuff try: send_count = self._sock.send(self.sendbuff) if send_count: self.sendbuff = self.sendbuff[send_count:] if not self.sendbuff and not self.sendqueue: if self.state < 0: app.idlequeue.unplug_idle(self.fd) self._on_send() self.disconnect() return # we are not waiting for write self._plug_idle() self._on_send() except socket.error as e: if e.errno == ssl.SSL_ERROR_WANT_WRITE: return True if self.state < 0: self.disconnect() return self._on_send_failure() return if self._owner.sock_type == TYPE_CLIENT: self.set_timeout(ACTIVITY_TIMEOUT_SECONDS) return True def _plug_idle(self): readable = self.state != 0 writable = self.sendqueue or self.sendbuff if self.writable != writable or self.readable != readable: app.idlequeue.plug_idle(self, writable, readable) def _on_send(self): if self.sent_data and self.sent_data.strip(): log.debug('sent: %s', self.sent_data) if hasattr(self._owner, 'Dispatcher'): self._owner.Dispatcher.Event( '', DATA_SENT, self.sent_data.decode('utf-8')) self.sent_data = None if self.buff_is_message: self._owner.on_message_sent(self.fd) self.buff_is_message = False def _on_send_failure(self): log.error('Socket error while sending data') self._owner.on_disconnect() self.sent_data = None class ClientZeroconf: def __init__(self, caller): self.caller = caller self.zeroconf = None self.roster = None self.last_msg = '' self.connections = {} self.recipient_to_hash = {} self.ip_to_hash = {} self.hash_to_port = {} self.listener = None self.ids_of_awaiting_messages = {} self.disconnect_handlers = [] self.disconnecting = False def connect(self, show, msg): self.port = self.start_listener(self.caller.port) if not self.port: return False self.zeroconf_init(show, msg) if not self.zeroconf.connect(): self.disconnect() return None self.roster = roster_zeroconf.Roster(self.zeroconf) return True def remove_announce(self): if self.zeroconf: return self.zeroconf.remove_announce() def announce(self): if self.zeroconf: return self.zeroconf.announce() def set_show_msg(self, show, msg): if self.zeroconf: self.zeroconf.txt['msg'] = msg self.last_msg = msg return self.zeroconf.update_txt(show) def resolve_all(self): if self.zeroconf: return self.zeroconf.resolve_all() def reannounce(self, txt): self.remove_announce() self.zeroconf.txt = txt self.zeroconf.port = self.port self.zeroconf.username = self.caller.username return self.announce() def zeroconf_init(self, show, msg): self.zeroconf = zeroconf.Zeroconf(self.caller._on_new_service, self.caller._on_remove_service, self.caller._on_name_conflictCB, self.caller._on_disconnect, self.caller._on_error, self.caller.username, self.caller.host, self.port) self.zeroconf.txt['msg'] = msg self.zeroconf.txt['status'] = show self.zeroconf.txt['1st'] = self.caller.first self.zeroconf.txt['last'] = self.caller.last self.zeroconf.txt['jid'] = self.caller.jabber_id self.zeroconf.txt['email'] = self.caller.email self.zeroconf.username = self.caller.username self.zeroconf.host = self.caller.host self.zeroconf.port = self.port self.last_msg = msg def disconnect(self): # to avoid recursive calls if self.disconnecting: return if self.listener: self.listener.disconnect() self.listener = None if self.zeroconf: self.zeroconf.disconnect() self.zeroconf = None if self.roster: self.roster.zeroconf = None self.roster._data = None self.roster = None self.disconnecting = True for i in reversed(self.disconnect_handlers): log.debug('Calling disconnect handler %s', i) i() self.disconnecting = False def start_disconnect(self): self.disconnect() def kill_all_connections(self): for connection in list(self.connections.values()): connection.force_disconnect() def add_connection(self, connection, ip, port, recipient): sock_hash = connection.sock_hash if sock_hash not in self.connections: self.connections[sock_hash] = connection self.ip_to_hash[ip] = sock_hash self.hash_to_port[sock_hash] = port if recipient: self.recipient_to_hash[recipient] = sock_hash def remove_connection(self, sock_hash): if sock_hash in self.connections: del self.connections[sock_hash] for i in self.recipient_to_hash: if self.recipient_to_hash[i] == sock_hash: del self.recipient_to_hash[i] break for i in self.ip_to_hash: if self.ip_to_hash[i] == sock_hash: del self.ip_to_hash[i] break if sock_hash in self.hash_to_port: del self.hash_to_port[sock_hash] def start_listener(self, port): for p in range(port, port + 5): self.listener = ZeroconfListener(p, self) self.listener.bind() if self.listener.started: return p self.listener = None return False def getRoster(self): if self.roster: return self.roster.getRoster() return {} def send(self, stanza, is_message=False, now=False, on_ok=None, on_not_ok=None): to = stanza.getTo() if to is None: # Can’t send undirected stanza over Zeroconf. return -1 to = to.getStripped() stanza.setFrom(self.roster.zeroconf.name) try: item = self.roster[to] except KeyError: # Contact offline return -1 # look for hashed connections if to in self.recipient_to_hash: conn = self.connections[self.recipient_to_hash[to]] id_ = stanza.getID() or '' if conn.add_stanza(stanza, is_message): if on_ok: on_ok(id_) return the_address = None for address in item['addresses']: if address['address'] in self.ip_to_hash: the_address = address if the_address and the_address['address'] in self.ip_to_hash: hash_ = self.ip_to_hash[the_address['address']] if self.hash_to_port[hash_] == the_address['port']: conn = self.connections[hash_] id_ = stanza.getID() or '' if conn.add_stanza(stanza, is_message): if on_ok: on_ok(id_) return # otherwise open new connection if not stanza.getID(): stanza.setID('zero') addresses_ = [] for address in item['addresses']: addresses_ += [{'host': address['address'], 'address': address['address'], 'port': address['port']}] P2PClient(None, addresses_, self, [(stanza, is_message)], to, on_ok=on_ok, on_not_ok=on_not_ok) def getAnID(self): """ Generate a random id """ return ''.join(Random().sample(string.ascii_letters + string.digits, 6)) def RegisterDisconnectHandler(self, handler): """ Register handler that will be called on disconnect """ self.disconnect_handlers.append(handler) def UnregisterDisconnectHandler(self, handler): """ Unregister handler that is called on disconnect """ self.disconnect_handlers.remove(handler) def SendAndWaitForResponse(self, stanza, timeout=None, func=None, args=None): """ Send stanza and wait for recipient's response to it. Will call transports on_timeout callback if response is not retrieved in time Be aware: Only timeout of latest call of SendAndWait is active. """ # if timeout is None: # timeout = DEFAULT_TIMEOUT_SECONDS def on_ok(_waitid): # if timeout: # self._owner.set_timeout(timeout) to = stanza.getTo() to = app.get_jid_without_resource(to) try: item = self.roster[to] except KeyError: # Contact offline item = None conn = None if to in self.recipient_to_hash: conn = self.connections[self.recipient_to_hash[to]] elif item: the_address = None for address in item['addresses']: if address['address'] in self.ip_to_hash: the_address = address if the_address and the_address['address'] in self.ip_to_hash: hash_ = self.ip_to_hash[the_address['address']] if self.hash_to_port[hash_] == the_address['port']: conn = self.connections[hash_] if func: conn.Dispatcher.on_responses[_waitid] = (func, args) conn.onreceive(conn.Dispatcher._WaitForData) conn.Dispatcher._expected[_waitid] = None self.send(stanza, on_ok=on_ok) def SendAndCallForResponse(self, stanza, func=None, args=None): """ Put stanza on the wire and call back when recipient replies. Additional callback arguments can be specified in args. """ self.SendAndWaitForResponse(stanza, 0, func, args)