## xmlstream.py ## ## Copyright (C) 2001 Matthew Allum ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU Lesser General Public License as published ## by the Free Software Foundation; either version 2, or (at your option) ## any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU Lesser General Public License for more details. """\ xmlstream.py provides simple functionality for implementing XML stream based network protocols. It is used as a base for jabber.py. xmlstream.py manages the network connectivity and xml parsing of the stream. When a complete 'protocol element' ( meaning a complete child of the xmlstreams root ) is parsed the dipatch method is called with a 'Node' instance of this structure. The Node class is a very simple XML DOM like class for manipulating XML documents or 'protocol elements' in this case. """ # $Id: xmlstream.py,v 1.42 2004/01/08 15:47:40 snakeru Exp $ import time, sys, re, socket from select import select from base64 import encodestring import xml.parsers.expat import debug _debug=debug VERSION = "0.5-rc1" False = 0 True = 1 TCP = 1 STDIO = 0 TCP_SSL = 2 ENCODING = 'utf-8' # Though it is uncommon, this is the only right setting. 
ustr = str

BLOCK_SIZE = 1024  ## Number of bytes to get at a time via socket transactions

DBG_INIT, DBG_ALWAYS = debug.DBG_INIT, debug.DBG_ALWAYS
DBG_CONN_ERROR = 'conn-error'
debug.debug_flags.append(DBG_CONN_ERROR)
DBG_XML_PARSE = 'xml-parse'
debug.debug_flags.append(DBG_XML_PARSE)
DBG_XML_RAW = 'xml-raw'
debug.debug_flags.append(DBG_XML_RAW)
DBG_XML = [DBG_XML_PARSE, DBG_XML_RAW]  # sample multiflag


def XMLescape(txt):
    """Escape the XML special characters '&', '<' and '>' in txt."""
    # '&' goes first so the '&' of the entities added below is not re-escaped.
    txt = txt.replace("&", "&amp;")
    txt = txt.replace("<", "&lt;")
    txt = txt.replace(">", "&gt;")
    return txt


def XMLunescape(txt):
    """Undo XMLescape: turn '&gt;', '&lt;' and '&amp;' back into characters."""
    # '&amp;' goes last so that e.g. '&amp;lt;' becomes '&lt;', not '<'.
    txt = txt.replace("&gt;", ">")
    txt = txt.replace("&lt;", "<")
    txt = txt.replace("&amp;", "&")
    return txt


class error(Exception):
    """Exception raised by this module; str(exc) is the message."""

    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = str(value)

    def __str__(self):
        return self.value


class Node:
    """A simple XML DOM-like class.

    Text and child elements are interleaved: self.data[i] is the text written
    before self.kids[i]; data entries past len(self.kids) are trailing text.
    """

    def __init__(self, tag=None, parent=None, attrs=None, payload=None, node=None):
        """Create a node.

        tag     -- tag name, either "name" or "namespace name" (space
                   separated, the form NodeBuilder's expat parser produces).
        parent  -- parent Node, if any.
        attrs   -- dict of attributes, copied into the node's attributes.
        payload -- list of children: Node instances are inserted as-is, any
                   other item is parsed as raw XML via insertXML().
        node    -- Node (or XML string, parsed first) whose name, namespace,
                   attrs, data, kids and parent are taken over.  The attrs
                   dict and data/kids lists are shared with it, not copied.
        """
        if attrs is None:
            attrs = {}
        if payload is None:
            payload = []
        if node:
            if type(node) != type(self):
                node = NodeBuilder(node).getDom()
            self.name, self.namespace, self.attrs, self.data, self.kids, self.parent = \
                node.name, node.namespace, node.attrs, node.data, node.kids, node.parent
        else:
            self.name, self.namespace, self.attrs, self.data, self.kids, self.parent = \
                'tag', '', {}, [], [], None

        if tag:
            # Take the last two space-separated words; a missing namespace becomes ''.
            self.namespace, self.name = ([''] + tag.split())[-2:]
        if parent:
            self.parent = parent
        for attr in attrs.keys():
            self.attrs[attr] = attrs[attr]
        for item in payload:
            if type(item) == type(self):
                self.insertNode(item)
            else:
                self.insertXML(item)

    def setParent(self, node):
        "Set the node's parent node."
        self.parent = node

    def getParent(self):
        "Return the node's parent node."
        return self.parent

    def getName(self):
        "Return the node's tag name."
        return self.name

    def setName(self, val):
        "Set the node's tag name."
        self.name = val

    def putAttr(self, key, val):
        "Add a name/value attribute to the node."
        self.attrs[key] = val

    def getAttr(self, key):
        "Return the value of the named attribute, or None if it is not set."
        return self.attrs.get(key)

    def putData(self, data):
        "Append a piece of textual data to the node."
        self.data.append(data)

    def insertData(self, data):
        "Append a piece of textual data to the node (same as putData)."
        self.data.append(data)

    def getData(self):
        "Return all of the node's textual data concatenated."
        return ''.join(self.data)

    def getDataAsParts(self):
        "Return the node's data as the underlying list of text pieces."
        return self.data

    def getNamespace(self):
        "Return the node's namespace."
        return self.namespace

    def setNamespace(self, namespace):
        "Set the node's namespace."
        self.namespace = namespace

    def insertTag(self, name=None, attrs=None, payload=None, node=None):
        """Add a child tag named 'name' to the node.

        Returns the newly created node."""
        newnode = Node(tag=name, parent=self, attrs=attrs, payload=payload, node=node)
        self.kids.append(newnode)
        return newnode

    def insertNode(self, node):
        "Add an existing node as a child of this node; returns it."
        self.kids.append(node)
        return node

    def insertXML(self, xml_str):
        "Parse raw XML and add the resulting node as a child; returns it."
        newnode = NodeBuilder(xml_str).getDom()
        self.kids.append(newnode)
        return newnode

    def __str__(self):
        return self._xmlnode2str()

    def _xmlnode2str(self, parent=None):
        """Return the XML (string) representation of the node and its children.

        parent is the enclosing node being serialized; an xmlns attribute is
        written only when this node's namespace differs from the parent's.
        """
        head = ["<" + self.name]
        if self.namespace:
            if parent and parent.namespace != self.namespace:
                head.append(" xmlns = '%s' " % self.namespace)
        for key in self.attrs.keys():
            val = ustr(self.attrs[key])
            # Values are single-quoted, so a literal quote must be escaped too.
            head.append(" %s='%s'" % (key, XMLescape(val).replace("'", "&apos;")))
        head.append(">")
        head = ''.join(head)

        kids = self.kids or []
        body = []
        for cnt, kid in enumerate(kids):
            if cnt < len(self.data):
                body.append(XMLescape(self.data[cnt]))
            body.append(kid._xmlnode2str(parent=self))
        # Every data piece past the last child is trailing text; none is dropped.
        for chunk in self.data[len(kids):]:
            body.append(XMLescape(chunk))
        body = ''.join(body)

        if not kids and not body:
            return head[:-1] + ' />'
        return head + body + "</" + self.name + ">"

    def getTag(self, name):
        """Return the first child node with tag name 'name'.

        Returns None if not found."""
        for node in self.kids:
            if node.getName() == name:
                return node
        return None

    def getTags(self, name):
        """Like getTag but returns a list of all matching child nodes."""
        return [node for node in self.kids if node.getName() == name]

    def getChildren(self):
        """Return the node's children."""
        return self.kids

    def removeTag(self, tag):
        """Remove a child and return it, or return None if there is no match.

        tag is either a Node instance or a tag name (the first child with
        that name is removed)."""
        if type(tag) == type(self):
            try:
                self.kids.remove(tag)
            except ValueError:
                return None
            return tag
        for node in self.kids:
            if node.getName() == tag:
                # Safe: we stop iterating right after mutating the list.
                self.kids.remove(node)
                return node
        return None


class NodeBuilder:
    """Build a 'minidom' of Node objects from XML fed to an expat parser.

    Primarily used by Node.insertXML().  An element opened at depth
    _dispatch_depth becomes a new root (_mini_dom) and is passed to
    dispatch() when it closes; deeper elements become its descendants.
    """

    def __init__(self, data=None):
        self._parser = xml.parsers.expat.ParserCreate(namespace_separator=' ')
        self._parser.StartElementHandler = self.unknown_starttag
        self._parser.EndElementHandler = self.unknown_endtag
        self._parser.CharacterDataHandler = self.handle_data
        self.__depth = 0
        self._dispatch_depth = 1
        # True when the previous parser event was character data, so the
        # next chunk of text is merged into the same data piece.
        self.last_is_data = False
        if data:
            self._parser.Parse(data, 1)

    def unknown_starttag(self, tag, attrs):
        """XML parser callback: an element was opened."""
        self.__depth = self.__depth + 1
        self.DEBUG("DEPTH -> %i , tag -> %s, attrs -> %s" %
                   (self.__depth, tag, str(attrs)), DBG_XML_PARSE)
        if self.__depth == self._dispatch_depth:
            self._mini_dom = Node(tag=tag, attrs=attrs)
            self._ptr = self._mini_dom
        elif self.__depth > self._dispatch_depth:
            # Attach as a child of the current element and descend into it.
            self._ptr.kids.append(Node(tag=tag, parent=self._ptr, attrs=attrs))
            self._ptr = self._ptr.kids[-1]
        else:
            ## it is the stream tag: remember the id the peer assigned
            if 'id' in attrs:
                self._incomingID = attrs['id']
        self.last_is_data = False

    def unknown_endtag(self, tag):
        """XML parser callback: an element was closed."""
        self.DEBUG("DEPTH -> %i" % self.__depth, DBG_XML_PARSE)
        if self.__depth == self._dispatch_depth:
            self.dispatch(self._mini_dom)
        elif self.__depth > self._dispatch_depth:
            self._ptr = self._ptr.parent
        else:
            self.DEBUG("*** Stream terminated ? ****", DBG_CONN_ERROR)
        self.__depth = self.__depth - 1
        self.last_is_data = False

    def handle_data(self, data):
        """XML parser callback: character data for the current element."""
        self.DEBUG("data-> " + data, DBG_XML_PARSE)
        # Text above the dispatch depth (e.g. whitespace directly under a
        # stream root) belongs to no element being built: _ptr may be unset,
        # or still point at an element that was already dispatched.
        if self.__depth < self._dispatch_depth:
            return
        if self.last_is_data:
            self._ptr.data[-1] += data
        else:
            self._ptr.data.append(data)
        self.last_is_data = True

    def dispatch(self, dom):
        """Called with each completed element; override in subclasses."""
        pass

    def DEBUG(self, dup1, dup2=None):
        """Debug hook; a no-op here (Stream replaces it per instance)."""
        pass

    def getDom(self):
        """Return the root Node built from the parsed data."""
        return self._mini_dom


class Stream(NodeBuilder):
    """Extension of NodeBuilder that handles a stream of XML stanzas.

    Calls dispatch() for every child of the root node (stream:stream for a
    jabber stream).  The attributes _read, _write and _reader must be set by
    an external entity (Client._setupComms does this).
    """

    def __init__(self, namespace, debug=[DBG_ALWAYS], log=None, id=None, timestampLog=True):
        """namespace -- stream namespace; debug -- debug flags for the debug
        module; log -- file name or writable stream object for traffic
        logging; id -- outgoing stream id; timestampLog -- prefix log lines
        with time.asctime()."""
        self._namespace = namespace
        self._read, self._reader, self._write = None, None, None
        self._incomingID = None
        self._outgoingID = id
        self._debug = _debug.Debug(debug, encoding=ENCODING)
        self.DEBUG = self._debug.show  # makes it backwards compatible with v0.4 code
        self.DEBUG("stream init called", DBG_INIT)

        if log:
            if type(log) is type(""):
                try:
                    self._logFH = open(log, 'w')
                except IOError:
                    print("ERROR: cannot open %s for writing" % log)
                    sys.exit(1)
            else:  ## assume it's a stream type object
                self._logFH = log
        else:
            self._logFH = None
        self._timestampLog = timestampLog

    def connect(self):
        """(Re)create the parser; elements at depth 2 are dispatched."""
        NodeBuilder.__init__(self)
        self._dispatch_depth = 2

    def timestampLog(self, timestamp):
        """Enable or disable the showing of a timestamp in the log.

        By default, timestamping is enabled."""
        self._timestampLog = timestamp

    def read(self):
        """Read incoming data, blocking until the first read completes.

        Keeps reading while select() reports more data immediately ready.
        If nothing was read (a zero-length read means the peer closed, and
        read errors are treated the same way), calls self.disconnected(self).
        Returns the data read ('' on disconnect)."""
        try:
            received = self._read(BLOCK_SIZE)
        except Exception:
            received = ''

        while select([self._reader], [], [], 0)[0]:
            add = self._read(BLOCK_SIZE)
            received += add
            if not add:
                break

        if len(received):  # length of 0 means disconnect
            self.DEBUG("got data " + received, DBG_XML_RAW)
            self.log(received, 'RECV:')
        else:
            self.disconnected(self)
        return received

    def write(self, raw_data):
        """Write raw outgoing data, blocking until done.

        Data is sent UTF-8 encoded.  A plain (non-unicode) string is first
        decoded using ENCODING.  Avoid this!  Always send your data as a
        unicode string.  Any write error triggers self.disconnected(self)."""
        if isinstance(raw_data, str):
            self.DEBUG('Non-utf-8 string "%s" passed to Stream.write! Treating it as %s encoded.' % (raw_data, ENCODING))
            raw_data = unicode(raw_data, ENCODING)
        data_out = raw_data.encode('utf-8')
        try:
            self._write(data_out)
            self.log(data_out, 'SENT:')
            self.DEBUG("sent %s" % data_out, DBG_XML_RAW)
        except Exception:
            self.DEBUG("xmlstream write threw error", DBG_CONN_ERROR)
            self.disconnected(self)

    def process(self, timeout=0):
        """Receive incoming data (if any) and feed it to the parser.

        Waits for data no more than timeout seconds.  Returns the number of
        bytes read (0 when the read came back empty, i.e. disconnected), or
        the string '0' when nothing arrived in time.  '0' is deliberately
        truthy so loops like 'while self.process(): pass' only stop on
        disconnect."""
        if select([self._reader], [], [], timeout)[0]:
            data = self.read()
            self._parser.Parse(data)
            return len(data)
        return '0'  # Zero means that nothing received but link is alive.

    def disconnect(self):
        """Close the stream and socket.

        Sends the closing stream tag, processes input until the peer closes
        the connection, then closes self._sock (which a subclass such as
        Client provides)."""
        self.write(u"</stream:stream>")
        while self.process():
            pass
        self._sock.close()
        self._sock = None

    def disconnected(self, conn):
        """Called when a network error or disconnection occurs.

        Handlers taking no argument are supported as well."""
        try:
            self.disconnectHandler(conn)
        except TypeError:
            self.disconnectHandler()

    def disconnectHandler(self, conn):  ## To be overridden ##
        """Called when a network error or disconnection occurs.

        Designed to be overridden; this default raises error."""
        raise error("Standart disconnectionHandler called. Replace it with appropriate for your client.")

    def log(self, data, inout=''):
        """Write data to the log file handle, if one was configured.

        Each entry is prefixed with inout and, when enabled, a timestamp."""
        if self._logFH is not None:
            if self._timestampLog:
                self._logFH.write("%s - %s - %s\n" % (time.asctime(), inout, data))
            else:
                self._logFH.write("%s - %s\n" % (inout, data))
            self._logFH.flush()

    def getIncomingID(self):
        """Return the id attribute received on the peer's stream root tag."""
        return self._incomingID

    def getOutgoingID(self):
        """Return this side's stream id (the 'id' given to the constructor)."""
        return self._outgoingID


class Client(Stream):
    """Stream that connects out over TCP, TCP+SSL or stdin/stdout,
    optionally through an HTTP proxy (CONNECT or PUT)."""

    def __init__(self, host, port, namespace, debug=[DBG_ALWAYS], log=None, sock=None, id=None, connection=TCP, hostIP=None, proxy=None):
        """host/port -- server to reach; hostIP -- address to connect to
        (defaults to host); connection -- TCP, TCP_SSL or STDIO; proxy --
        dict with 'host', 'port' and optional 'type', 'user', 'password'."""
        Stream.__init__(self, namespace, debug, log, id)

        self._host = host
        self._port = port
        self._sock = sock
        self._connection = connection
        if hostIP:
            self._hostIP = hostIP
        else:
            self._hostIP = host
        self._proxy = proxy

        self._sslObj = None
        self._sslIssuer = None
        self._sslServer = None

    def getSocket(self):
        return self._sock

    def connect(self):
        """Attempt to connect to the specified host, then send the header."""
        self.DEBUG("client connect called to %s %s type %i" % (self._host, self._port, self._connection), DBG_INIT)
        Stream.connect(self)

        ## TODO: check below that stdin/stdout are actually open
        if self._connection == STDIO:
            self._setupComms()
            return

        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if self._proxy:
                self._sock.connect((self._proxy['host'], self._proxy['port']))
            else:
                self._sock.connect((self._hostIP, self._port))
        except socket.error:
            e = sys.exc_info()[1]
            self.DEBUG("socket error: " + str(e), DBG_CONN_ERROR)
            raise

        # NOTE(review): with TCP_SSL and a proxy, SSL is negotiated with the
        # proxy before the CONNECT request is sent - confirm this is intended.
        if self._connection == TCP_SSL:
            try:
                self.DEBUG("Attempting to create ssl socket", DBG_INIT)
                self._sslObj = socket.ssl(self._sock, None, None)
                self._sslIssuer = self._sslObj.issuer()
                self._sslServer = self._sslObj.server()
            except:
                self.DEBUG("Socket Error: No SSL Support", DBG_CONN_ERROR)
                raise

        self._setupComms()

        if self._proxy:
            self._proxyHandshake()

        self.DEBUG("Jabber server connected", DBG_INIT)
        self.header()

    def _proxyHandshake(self):
        """Ask the HTTP proxy to tunnel to the server; raise error on failure."""
        self.DEBUG("Proxy connected", DBG_INIT)
        if 'type' in self._proxy:
            proxy_type = self._proxy['type'].upper()
        else:
            proxy_type = 'CONNECT'

        connector = []
        if proxy_type == 'CONNECT':
            connector.append(u'CONNECT %s:%s HTTP/1.0' % (self._hostIP, self._port))
        elif proxy_type == 'PUT':
            connector.append(u'PUT http://%s:%s/ HTTP/1.0' % (self._hostIP, self._port))
        else:
            self.DEBUG("Proxy Error: unknown proxy type", DBG_CONN_ERROR)
            raise error('Unknown proxy type: ' + proxy_type)
        connector.append('Proxy-Connection: Keep-Alive')
        connector.append('Pragma: no-cache')
        connector.append('Host: %s:%s' % (self._hostIP, self._port))
        connector.append('User-Agent: Jabberpy/' + VERSION)
        if 'user' in self._proxy and 'password' in self._proxy:
            credentials = '%s:%s' % (self._proxy['user'], self._proxy['password'])
            # encodestring() wraps its output every 76 characters, but a
            # header value must be one line: remove every newline.
            credentials = encodestring(credentials).replace('\n', '')
            connector.append('Proxy-Authorization: Basic ' + credentials)
        # Joining with this last element ends the request with '\r\n\r\n',
        # the blank line that terminates the headers.
        connector.append('\r\n')

        # Save and restore the I/O callables around the handshake.
        bak = self._read, self._write
        self.write('\r\n'.join(connector))
        reply = self.read().replace('\r', '')
        self._read, self._write = bak

        try:
            proto, code, desc = reply.split('\n')[0].split(' ', 2)
        except ValueError:
            raise error('Invalid proxy reply')
        if code != '200':
            raise error('Invalid proxy reply: %s %s %s' % (proto, code, desc))
        # Consume the rest of the proxy's response headers (up to the blank line).
        while reply.find('\n\n') == -1:
            chunk = self.read()
            if not chunk:
                # read() returns '' once the proxy has closed; without this
                # check the loop would never end.
                raise error('Proxy closed the connection during handshake')
            reply += chunk.replace('\r', '')

    def _setupComms(self):
        """Bind _read/_write/_reader to the transport for self._connection."""
        if self._connection == TCP:
            self._read = self._sock.recv
            self._write = self._sock.sendall
            self._reader = self._sock
        elif self._connection == TCP_SSL:
            self._read = self._sslObj.read
            self._write = self._sslObj.write
            self._reader = self._sock
        elif self._connection == STDIO:
            # Client defines no stdin/stdout attributes itself; fall back to
            # the process streams unless an instance or subclass supplies them.
            self._read = getattr(self, 'stdin', sys.stdin).read
            self._write = getattr(self, 'stdout', sys.stdout).write
            self._reader = sys.stdin
        else:
            self.DEBUG('unknown connection type', DBG_CONN_ERROR)
            raise IOError('unknown connection type')


class Server:
    """Minimal select()-based listener on port 5222 (experimental: it
    echoes whatever clients send)."""

    def now(self):
        return time.ctime(time.time())

    def __init__(self, maxclients=10):
        self.host = ''
        self.port = 5222
        self.streams = []
        # make main sockets for accepting new client requests
        self.mainsocks, self.readsocks, self.writesocks = [], [], []
        # 'socket' is the imported module, so qualify constructor and constants.
        self.portsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.portsock.bind((self.host, self.port))
        self.portsock.listen(maxclients)
        self.mainsocks.append(self.portsock)  # add to main list to identify
        self.readsocks.append(self.portsock)  # add to select inputs list

    def serve(self):
        """Event loop: listen and multiplex until the server process is killed."""
        print('select-server loop starting')
        while 1:
            print("LOOPING")
            readables, writeables, exceptions = select(self.readsocks, self.writesocks, [])
            for sockobj in readables:
                if sockobj in self.mainsocks:
                    # listening socket is ready: accept() will not block
                    newsock, address = sockobj.accept()
                    print('Connect: %s %s' % (address, id(newsock)))
                    self.readsocks.append(newsock)  # add to select list, wait
                    self._makeNewStream(newsock)
                else:
                    # client socket: recv() should not block
                    data = sockobj.recv(1024)
                    print('\tgot %s on %s' % (data, id(sockobj)))
                    if not data:
                        # closed by the client: close here and stop selecting it
                        sockobj.close()
                        self.readsocks.remove(sockobj)
                    else:
                        # this may block: should really select for writes too
                        sockobj.send('Echo=>%s' % data)

    def _makeNewStream(self, sckt):
        """Wrap an accepted socket in a stream object and remember it."""
        # Client is the class in this module whose constructor takes
        # (host, port, namespace, ..., sock=...) and which has getSocket().
        new_stream = Client('localhost', 5222, 'jabber:client', sock=sckt)
        self.streams.append(new_stream)
        ## maybe override for a 'server stream'
        # NOTE(review): header() is not defined in this module; a subclass
        # must provide it.
        new_stream.header()
        return new_stream

    def _getStreamSockets(self):
        """Return the sockets of all known streams."""
        return [s.getSocket() for s in self.streams]

    def _getStreamFromSocket(self, sock):
        """Return the stream owning sock, or None."""
        for s in self.streams:
            if s.getSocket() == sock:
                return s
        return None