add listening server for incomming messages

This commit is contained in:
Dimitur Kirov 2006-09-17 22:57:41 +00:00
parent 7707c5e824
commit 1a325833c3
5 changed files with 425 additions and 15 deletions

View File

@ -400,7 +400,7 @@ class Dispatcher(PlugIn):
''' Serialise stanza and put it on the wire. Assign an unique ID to it before send.
Returns assigned ID.'''
if type(stanza) in [type(''), type(u'')]:
return self._owner.Connection.send(stanza)
return self._owner.send_stanza(stanza)
if not isinstance(stanza, Protocol):
_ID=None
elif not stanza.getID():
@ -423,7 +423,7 @@ class Dispatcher(PlugIn):
stanza=route
stanza.setNamespace(self._owner.Namespace)
stanza.setParent(self._metastream)
self._owner.Connection.send(stanza)
self._owner.send_stanza(stanza)
return _ID
def disconnect(self):

View File

@ -1,6 +1,7 @@
## common/zeroconf/client_zeroconf.py
##
## Copyright (C) 2006 Stefan Bethge <stefan@lanpartei.de>
## 2006 Dimitur Kirov <dkirov@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
@ -11,14 +12,314 @@
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
from common import gajim
from common.xmpp.idlequeue import IdleObject
from common.xmpp import dispatcher_nb, debug
from common.xmpp.client import *
from common.xmpp.simplexml import ustr
from dialogs import BindPortError
import socket
import errno
from common.zeroconf import roster_zeroconf
class ClientZeroconf:
def __init__(self, zeroconf):
self.roster = roster_zeroconf.Roster(zeroconf)
MAX_BUFF_LEN = 65536
DATA_RECEIVED='DATA RECEIVED'
DATA_SENT='DATA SENT'
class ZeroconfListener(IdleObject):
def __init__(self, port, caller = None):
''' handle all incomming connections on ('0.0.0.0', port)'''
self.port = port
self.queue_idx = -1
#~ self.queue = None
self.started = False
self._sock = None
self.fd = -1
self.caller = caller
def bind(self):
self._serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._serv.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self._serv.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# will fail when port as busy, or we don't have rights to bind
try:
self._serv.bind(('0.0.0.0', self.port))
except Exception, e:
# unable to bind, show error dialog
return None
self._serv.listen(socket.SOMAXCONN)
self._serv.setblocking(False)
self.fd = self._serv.fileno()
gajim.idlequeue.plug_idle(self, False, True)
self.started = True
def pollend(self):
''' called when we stop listening on (host, port) '''
self.disconnect2()
def pollin(self):
''' accept a new incomming connection and notify queue'''
sock = self.accept_conn()
P2PConnection('', sock[0], sock[1][0], sock[1][1], self.caller)
def disconnect(self):
''' free all resources, we are not listening anymore '''
gajim.idlequeue.remove_timeout(self.fd)
gajim.idlequeue.unplug_idle(self.fd)
self.fd = -1
self.started = False
try:
self._serv.close()
except:
pass
def accept_conn(self):
''' accepts a new incomming connection '''
_sock = self._serv.accept()
_sock[0].setblocking(False)
return _sock
class P2PConnection(IdleObject, PlugIn):
''' class for sending file to socket over socks5 '''
def __init__(self, sock_hash, _sock, host = None, port = None, caller = None):
PlugIn.__init__(self)
self.sendqueue = []
self.sendbuff = None
self._sock = _sock
self._sock.setblocking(False)
self.fd = _sock.fileno()
self._recv = _sock.recv
self._send = _sock.send
self.connected = True
self.state = 1
self.writable = False
self.readable = False
# waiting for first bytes
# start waiting for data
self.Namespace = 'jabber:client'
self.defaultNamespace = self.Namespace
self._component=0
self._caller = caller
self.Server = host
self.Connection = self
self._registered_name = None
self.DBG = 'client'
debug = ['always', 'nodebuilder']
self._DEBUG = Debug.Debug(debug)
self.DEBUG = self._DEBUG.Show
self.debug_flags = self._DEBUG.debug_flags
self.debug_flags.append(self.DBG)
self._owner = self
self._exported_methods=[self.send_stanza, self.disconnect2, self.pollend]
self.on_receive = None
gajim.idlequeue.plug_idle(self, False, True)
self.onreceive(self._on_receive_document_attrs)
dispatcher_nb.Dispatcher().PlugIn(self)
self.RegisterHandler('message', self._messageCB)
def _messageCB(self, conn, data):
self._caller._messageCB(self.Server, conn, data)
def onreceive(self, recv_handler):
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
else:
self.on_receive = None
return
_tmp = self.on_receive
# make sure this cb is not overriden by recursive calls
if not recv_handler(None) and _tmp == self.on_receive:
self.on_receive = recv_handler
def _on_receive_document_attrs(self, data):
if data:
self.Dispatcher.ProcessNonBlocking(data)
if not hasattr(self, 'Dispatcher') or \
self.Dispatcher.Stream._document_attrs is None:
return
self.onreceive(None)
if self.Dispatcher.Stream._document_attrs.has_key('version') and \
self.Dispatcher.Stream._document_attrs['version'] == '1.0':
#~ self.onreceive(self._on_receive_stream_features)
#XXX continue with TLS
return
self.onreceive(None)
return True
def send_stanza(self, stanza):
'''Append stanza to the queue of messages to be send.
If supplied data is unicode string, encode it to utf-8.
'''
if self.state <= 0:
return
r = stanza
if isinstance(r, unicode):
r = r.encode('utf-8')
elif not isinstance(r, str):
r = ustr(r).encode('utf-8')
self.sendqueue.append(r)
self._plug_idle()
def read_timeout(self):
gajim.idlequeue.remove_timeout(self.fd)
# no activity for foo seconds
# self.pollend()
def pollout(self):
if not self.connected:
self.disconnect2()
return
gajim.idlequeue.remove_timeout(self.fd)
self._do_send()
# self.idlequeue.plug_idle(self, False, True)
def pollend(self):
self.state = -1
self.disconnect2()
def pollin(self):
''' Reads all pending incoming data. Calls owner's disconnected() method if appropriate.'''
received = ''
errnum = 0
try:
# get as many bites, as possible, but not more than RECV_BUFSIZE
received = self._recv(MAX_BUFF_LEN)
except Exception, e:
if len(e.args) > 0 and isinstance(e.args[0], int):
errnum = e[0]
sys.exc_clear()
# "received" will be empty anyhow
if errnum == socket.SSL_ERROR_WANT_READ:
pass
elif errnum in [errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN]:
self.pollend()
# don't proccess result, cas it will raise error
return
elif not received :
if errnum != socket.SSL_ERROR_EOF:
# 8 EOF occurred in violation of protocol
self.pollend()
if self.state >= 0:
self.disconnect2()
return
if self.state < 0:
return
if self.on_receive:
if hasattr(self._owner, 'Dispatcher'):
self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
self.on_receive(received)
else:
# This should never happed, so we need the debug
self.DEBUG('Unhandled data received: %s' % received,'got')
self.disconnect2()
if self.on_connect_failure:
self.on_connect_failure()
return True
def onreceive(self, recv_handler):
if not recv_handler:
if hasattr(self._owner, 'Dispatcher'):
self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
else:
self.on_receive = None
return
_tmp = self.on_receive
# make sure this cb is not overriden by recursive calls
if not recv_handler(None) and _tmp == self.on_receive:
self.on_receive = recv_handler
def disconnect2(self):
''' Closes the socket. '''
gajim.idlequeue.remove_timeout(self.fd)
gajim.idlequeue.unplug_idle(self.fd)
try:
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
except:
# socket is already closed
pass
self.connected = False
self.fd = -1
self.state = -1
def _do_send(self):
if not self.sendbuff:
if not self.sendqueue:
return None # nothing to send
self.sendbuff = self.sendqueue.pop(0)
self.sent_data = self.sendbuff
try:
send_count = self._send(self.sendbuff)
if send_count:
self.sendbuff = self.sendbuff[send_count:]
if not self.sendbuff and not self.sendqueue:
if self.state < 0:
gajim.idlequeue.unplug_idle(self.fd)
self._on_send()
self.disconnect2()
return
# we are not waiting for write
self._plug_idle()
self._on_send()
except socket.error, e:
sys.exc_clear()
if e[0] == socket.SSL_ERROR_WANT_WRITE:
return True
if self.state < 0:
self.disconnect2()
return
if self._on_send_failure:
self._on_send_failure()
return
return True
def _plug_idle(self):
readable = self.state != 0
if self.sendqueue or self.sendbuff:
writable = True
else:
writable = False
if self.writable != writable or self.readable != readable:
gajim.idlequeue.plug_idle(self, writable, readable)
def _on_send(self):
if self.sent_data and self.sent_data.strip():
#~ self.DEBUG(self.sent_data,'sent')
if hasattr(self._owner, 'Dispatcher'):
self._owner.Dispatcher.Event('', DATA_SENT, self.sent_data)
self.sent_data = None
def _on_send_failure(self):
self.DEBUG("Socket error while sending data",'error')
self._owner.disconnected()
self.sent_data = None
class ClientZeroconf:
def __init__(self, zeroconf, caller):
self.roster = roster_zeroconf.Roster(zeroconf)
self.caller = caller
self.start_listener(zeroconf.port)
def start_listener(self, port):
self.listener = ZeroconfListener(port, self.caller)
self.listener.bind()
if self.listener.started is False:
self.listener = None
# We cannot bind port, call error
# dialog from dialogs.py and fail
BindPortError(port)
return None
#~ self.connected += 1
def getRoster(self):
return self.roster.getRoster()

View File

@ -33,7 +33,7 @@ import common.xmpp
from common import GnuPG
from common import helpers
from common import gajim
from common.zeroconf import zeroconf
STATUS_LIST = ['offline', 'connecting', 'online', 'chat', 'away', 'xa', 'dnd',
'invisible']
# kind of events we can wait for an answer
@ -227,6 +227,108 @@ class ConnectionHandlersZeroconf(ConnectionVcard):
idle.init()
except:
HAS_IDLE = False
def _messageCB(self, ip, con, msg):
'''Called when we receive a message'''
msgtxt = msg.getBody()
mtype = msg.getType()
subject = msg.getSubject() # if not there, it's None
tim = msg.getTimestamp()
tim = time.strptime(tim, '%Y%m%dT%H:%M:%S')
tim = time.localtime(timegm(tim))
frm = helpers.get_full_jid_from_iq(msg)
if frm == 'none':
for key in self.zeroconf.contacts:
if ip == self.zeroconf.contacts[key][zeroconf.C_ADDRESS]:
frm = key
jid = helpers.get_jid_from_iq(msg)
print 'jid', jid
no_log_for = gajim.config.get_per('accounts', self.name,
'no_log_for').split()
encrypted = False
chatstate = None
encTag = msg.getTag('x', namespace = common.xmpp.NS_ENCRYPTED)
decmsg = ''
# invitations
invite = None
if not encTag:
invite = msg.getTag('x', namespace = common.xmpp.NS_MUC_USER)
if invite and not invite.getTag('invite'):
invite = None
delayed = msg.getTag('x', namespace = common.xmpp.NS_DELAY) != None
msg_id = None
composing_jep = None
# FIXME: Msn transport (CMSN1.2.1 and PyMSN0.10) do NOT RECOMMENDED
# invitation
# stanza (MUC JEP) remove in 2007, as we do not do NOT RECOMMENDED
xtags = msg.getTags('x')
# chatstates - look for chatstate tags in a message if not delayed
if not delayed:
composing_jep = False
children = msg.getChildren()
for child in children:
if child.getNamespace() == 'http://jabber.org/protocol/chatstates':
chatstate = child.getName()
composing_jep = 'JEP-0085'
break
# No JEP-0085 support, fallback to JEP-0022
if not chatstate:
chatstate_child = msg.getTag('x', namespace = common.xmpp.NS_EVENT)
if chatstate_child:
chatstate = 'active'
composing_jep = 'JEP-0022'
if not msgtxt and chatstate_child.getTag('composing'):
chatstate = 'composing'
# JEP-0172 User Nickname
user_nick = msg.getTagData('nick')
if not user_nick:
user_nick = ''
if encTag and GnuPG.USE_GPG:
#decrypt
encmsg = encTag.getData()
keyID = gajim.config.get_per('accounts', self.name, 'keyid')
if keyID:
decmsg = self.gpg.decrypt(encmsg, keyID)
if decmsg:
msgtxt = decmsg
encrypted = True
if mtype == 'error':
error_msg = msg.getError()
if not error_msg:
error_msg = msgtxt
msgtxt = None
if self.name not in no_log_for:
gajim.logger.write('error', frm, error_msg, tim = tim,
subject = subject)
self.dispatch('MSGERROR', (frm, msg.getErrorCode(), error_msg, msgtxt,
tim))
elif mtype == 'chat': # it's type 'chat'
if not msg.getTag('body') and chatstate is None: #no <body>
return
if msg.getTag('body') and self.name not in no_log_for and jid not in\
no_log_for and msgtxt:
msg_id = gajim.logger.write('chat_msg_recv', frm, msgtxt, tim = tim,
subject = subject)
self.dispatch('MSG', (frm, msgtxt, tim, encrypted, mtype, subject,
chatstate, msg_id, composing_jep, user_nick))
else: # it's single message
if self.name not in no_log_for and jid not in no_log_for and msgtxt:
gajim.logger.write('single_msg_recv', frm, msgtxt, tim = tim,
subject = subject)
if invite is not None:
item = invite.getTag('invite')
jid_from = item.getAttr('from')
if jid_from == None:
jid_from = frm
reason = item.getTagData('reason')
item = invite.getTag('password')
password = invite.getTagData('password')
self.dispatch('GC_INVITATION',(frm, jid_from, reason, password))
else:
self.dispatch('MSG', (frm, msgtxt, tim, encrypted, 'normal',
subject, chatstate, msg_id, composing_jep, user_nick))
# END messageCB
'''
def build_http_auth_answer(self, iq_obj, answer):
if answer == 'yes':

View File

@ -184,7 +184,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
return self.connection, ''
if self.zeroconf.connect():
self.connection = client_zeroconf.ClientZeroconf(self.zeroconf)
self.connection = client_zeroconf.ClientZeroconf(self.zeroconf, self)
self.roster = self.connection.getRoster()
self.dispatch('ROSTER', self.roster)
@ -197,7 +197,7 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
# refresh all contacts data every second
self.call_resolve_timeout = True
gobject.timeout_add(1000, self._on_resolve_timeout)
gobject.timeout_add(10000, self._on_resolve_timeout)
else:
pass
#TODO: display visual notification that we could not connect to avahi
@ -487,7 +487,13 @@ class ConnectionZeroconf(ConnectionHandlersZeroconf):
def send_keepalive(self):
# nothing received for the last foo seconds (60 secs by default)
if self.connection:
self.connection.send(' ')
pass
def _event_dispatcher(self, realm, event, data):
if realm == '':
if event == common.xmpp.transports.DATA_RECEIVED:
self.dispatch('STANZA_ARRIVED', unicode(data, errors = 'ignore'))
elif event == common.xmpp.transports.DATA_SENT:
self.dispatch('STANZA_SENT', unicode(data))
# END ConnectionZeroconf

View File

@ -300,7 +300,7 @@ class Zeroconf:
return False
def send (self, msg, sock):
print 'send:'+msg
print 'send:', msg
totalsent = 0
while totalsent < len(msg):
sent = sock.send(msg[totalsent:])
@ -309,13 +309,14 @@ class Zeroconf:
totalsent = totalsent + sent
def send_message(self, jid, msg, type = 'chat'):
print 'zeroconf.py: send_message:'+ msg
print 'zeroconf.py: send_message:', msg
if not msg :
return
sock = socket.socket ( socket.AF_INET, socket.SOCK_STREAM )
#sock.setblocking(False)
sock.connect ( ( self.contacts[jid][C_ADDRESS], self.contacts[jid][C_PORT] ) )
#print (self.txt_array_to_dict(self.contacts[jid][C_TXT]))['port.p2pj']
print (self.txt_array_to_dict(self.contacts[jid][C_TXT]))['port.p2pj']
#was for adium which uses the txt record
#sock.connect ( ( self.contacts[jid][5], int((self.txt_array_to_dict(self.contacts[jid][7]))['port.p2pj']) ) )