diff --git a/src/common/jingle.py b/src/common/jingle.py index 2f52c0885..10b9ce3ee 100644 --- a/src/common/jingle.py +++ b/src/common/jingle.py @@ -61,20 +61,23 @@ class JingleSession(object): # use .prepend() to add new callbacks, especially when you're going # to send error instead of ack self.callbacks={ - 'content-accept': [self.__contentAcceptCB, self.__defaultCB], + 'content-accept': [self.__contentAcceptCB, self.__broadcastCB, self.__defaultCB], 'content-add': [self.__defaultCB], 'content-modify': [self.__defaultCB], 'content-remove': [self.__defaultCB], - 'session-accept': [self.__contentAcceptCB, self.__defaultCB], + 'session-accept': [self.__contentAcceptCB, self.__broadcastCB, self.__defaultCB], 'session-info': [self.__defaultCB], - 'session-initiate': [self.__sessionInitiateCB, self.__defaultCB], + 'session-initiate': [self.__sessionInitiateCB, self.__broadcastCB, self.__defaultCB], 'session-terminate': [self.__defaultCB], - 'transport-info': [self.__defaultCB], + 'transport-info': [self.__broadcastCB, self.__defaultCB], 'iq-result': [], 'iq-error': [], } # for making streams using farsight + import gc + gc.disable() + print self.weinitiate, "#farsight_session_factory_make" self.p2psession = farsight.farsight_session_factory_make('rtp') self.p2psession.connect('error', self.on_p2psession_error) @@ -123,39 +126,37 @@ class JingleSession(object): error = stanza.getTag('error') if error: # it's an iq-error stanza - callables = 'iq-error' + action = 'iq-error' elif jingle: # it's a jingle action action = jingle.getAttr('action') - callables = action else: # it's an iq-result (ack) stanza - callables = 'iq-result' + action = 'iq-result' - callables = self.callbacks[callables] + callables = self.callbacks[action] try: for callable in callables: - callable(stanza=stanza, jingle=jingle, error=error) + callable(stanza=stanza, jingle=jingle, error=error, action=action) except xmpp.NodeProcessed: pass - def __defaultCB(self, stanza, jingle, error): + def __defaultCB(self, stanza, jingle, error, action): ''' Default callback for action stanzas -- simple ack and stop processing. ''' response = stanza.buildReply('result') self.connection.connection.send(response) - def __contentAcceptCB(self, stanza, jingle, error): + def __contentAcceptCB(self, stanza, jingle, error, action): ''' Called when we get content-accept stanza or equivalent one (like session-accept).''' - # check which contents are accepted, call their callbacks + # check which contents are accepted for content in jingle.iterTags('content'): creator = content['creator'] name = content['name'] - - def __sessionInitiateCB(self, stanza, jingle, error): + def __sessionInitiateCB(self, stanza, jingle, error, action): ''' We got a jingle session request from other entity, therefore we are the receiver... Unpack the data. ''' self.initiator = jingle['initiator'] @@ -169,7 +170,7 @@ class JingleSession(object): tran_ns = element.getTag('transport').getNamespace() if desc_ns==xmpp.NS_JINGLE_AUDIO and tran_ns==xmpp.NS_JINGLE_ICE_UDP: # we've got voip content - self.addContent(element['name'], JingleVoiP(self, node=element), 'peer') + self.addContent(element['name'], JingleVoiP(self), 'peer') fail = False if fail: @@ -181,8 +182,16 @@ class JingleSession(object): self.state = JingleStates.pending + def __broadcastCB(self, stanza, jingle, error, action): + ''' Broadcast the stanza contents to proper content handlers. ''' + for content in jingle.iterTags('content'): + name = content['name'] + creator = content['creator'] + cn = self.contents[(creator, name)] + cn.stanzaCB(stanza, content, error, action) + def on_p2psession_error(self, *anything): - print "Farsight session error!" + print self.weinitiate, "Farsight session error!" ''' Methods that make/send proper pieces of XML. They check if the session is in appropriate state. ''' @@ -280,12 +289,12 @@ class Codec(object): return True def toXML(self): - return xmpp.Node('payload', + return xmpp.Node('payload-type', attrs=self.attrs, payload=(xmpp.Node('parameter', {'name': k, 'value': v}) for k,v in self.params)) class JingleAudioSession(object): - __metaclass__=meta.VerboseClassType +# __metaclass__=meta.VerboseClassType def __init__(self, content, fromNode): self.content = content @@ -353,19 +362,6 @@ class JingleAudioSession(object): self.responder_codecs = self.getOurCodecs(self.initiator_codecs) return self.__codecsList(self.responder_codecs) -class JingleICEUDPSession(object): - __metaclass__=meta.VerboseClassType - def __init__(self, content): - self.content = content - - def _sessionInitiateCB(self): - ''' Called when we initiate the session. ''' - pass - - def toXML(self): - ''' ICE-UDP doesn't send much in its transport stanza... ''' - return xmpp.Node(xmpp.NS_JINGLE_ICE_UDP+' transport') - class JingleContent(object): ''' An abstraction of content in Jingle sessions. ''' def __init__(self, session, node=None): @@ -379,10 +375,10 @@ class JingleContent(object): class JingleVoiP(JingleContent): ''' Jingle VoiP sessions consist of audio content transported over an ICE UDP protocol. ''' - __metaclass__=meta.VerboseClassType +# __metaclass__=meta.VerboseClassType def __init__(self, session, node=None): JingleContent.__init__(self, session, node) - self.codecs = None + self.got_codecs = False #if node is None: # self.audio = JingleAudioSession(self) @@ -391,12 +387,73 @@ class JingleVoiP(JingleContent): #self.transport = JingleICEUDPSession(self) self.setupStream() + def stanzaCB(self, stanza, content, error, action): + ''' Called when something related to our content was sent by peer. ''' + callbacks = { + 'content-accept': [self.__getRemoteCodecsCB], + 'content-add': [], + 'content-modify': [], + 'content-remove': [], + 'session-accept': [self.__getRemoteCodecsCB], + 'session-info': [], + 'session-initiate': [self.__getRemoteCodecsCB], + 'session-terminate': [], + 'transport-info': [self.__transportInfoCB], + 'iq-result': [], + 'iq-error': [], + }[action] + for callback in callbacks: + callback(stanza, content, error, action) + + def __getRemoteCodecsCB(self, stanza, content, error, action): + if self.got_codecs: return + + codecs = [] + for codec in content.getTag('description').iterTags('payload-type'): + c = {'id': int(codec['id']), + 'encoding_name': codec['name'], + 'media_type': farsight.MEDIA_TYPE_AUDIO, + 'channels': 1, + 'params': dict((p['name'], p['value']) for p in codec.iterTags('parameter'))} + if 'channels' in codec: c['channels']=codec['channels'] + codecs.append(c) + if len(codecs)==0: return + + print self.session.weinitiate, "#farsight_stream_set_remote_codecs" + self.p2pstream.set_remote_codecs(codecs) + self.got_codecs=True + + def __transportInfoCB(self, stanza, content, error, action): + ''' Got a new transport candidate. ''' + candidates = [] + for candidate in content.getTag('transport').iterTags('candidate'): + cand={ + # 'candidate_id': str(self.session.connection.connection.getAnID()), + 'candidate_id': candidate['cid'], + 'component': int(candidate['component']), + 'ip': candidate['ip'], + 'port': int(candidate['port']), + 'proto': candidate['protocol']=='udp' and farsight.NETWORK_PROTOCOL_UDP \ + or farsight.NETWORK_PROTOCOL_TCP, + 'proto_subtype':'RTP', + 'proto_profile':'AVP', + 'preference': float(candidate['priority'])/100000, + # 'type': farsight.CANDIDATE_TYPE_LOCAL, + 'type': int(candidate['type']), + } + if 'ufrag' in candidate: cand['username']=candidate['ufrag'] + if 'pwd' in candidate: cand['password']=candidate['pwd'] + + candidates.append(cand) + print self.session.weinitiate, "#add_remote_candidate" + self.p2pstream.add_remote_candidate(candidates) + def toXML(self): ''' Return proper XML for element. ''' return xmpp.Node('content', attrs={'name': self.name, 'creator': self.creator, 'profile': 'RTP/AVP'}, payload=[ - xmpp.Node(xmpp.NS_JINGLE_AUDIO+' description', payload=self.getCodecs()), + xmpp.Node(xmpp.NS_JINGLE_AUDIO+' description', payload=self.iterCodecs()), xmpp.Node(xmpp.NS_JINGLE_ICE_UDP+' transport') ]) @@ -407,6 +464,7 @@ class JingleVoiP(JingleContent): payload=payload) def setupStream(self): + print self.session.weinitiate, "#farsight_session_create_stream" self.p2pstream = self.session.p2psession.create_stream( farsight.MEDIA_TYPE_AUDIO, farsight.STREAM_DIRECTION_BOTH) self.p2pstream.set_property('transmitter', 'libjingle') @@ -416,37 +474,91 @@ class JingleVoiP(JingleContent): self.p2pstream.connect('native-candidates-prepared', self.on_p2pstream_native_candidates_prepared) self.p2pstream.connect('state-changed', self.on_p2pstream_state_changed) self.p2pstream.connect('new-native-candidate', self.on_p2pstream_new_native_candidate) + + self.p2pstream.set_remote_codecs(self.p2pstream.get_local_codecs()) + + print self.session.weinitiate, "#farsight_stream_prepare_transports" self.p2pstream.prepare_transports() + print self.session.weinitiate, "#farsight_stream_set_active_codec" + self.p2pstream.set_active_codec(8) #??? + + sink = gst.element_factory_make('alsasink') + sink.set_property('sync', False) + sink.set_property('latency-time', 20000) + sink.set_property('buffer-time', 80000) + + src = gst.element_factory_make('audiotestsrc') + src.set_property('blocksize', 320) + #src.set_property('latency-time', 20000) + src.set_property('is-live', True) + + print self.session.weinitiate, "#farsight_stream_set_sink" + self.p2pstream.set_sink(sink) + print self.session.weinitiate, "#farsight_stream_set_source" + self.p2pstream.set_source(src) + def on_p2pstream_error(self, *whatever): pass - def on_p2pstream_new_active_candidate_pair(self, *whatever): pass - def on_p2pstream_codec_changed(self, *whatever): pass - def on_p2pstream_native_candidates_prepared(self, *whatever): pass - def on_p2pstream_state_changed(self, *whatever): pass + def on_p2pstream_new_active_candidate_pair(self, stream, native, remote): + print self.session.weinitiate, "##new_active_candidate_pair" + #print "New native candidate pair: %s, %s" % (native, remote) + def on_p2pstream_codec_changed(self, stream, codecid): + print self.session.weinitiate, "##codec_changed" + #print "Codec changed: %d" % codecid + def on_p2pstream_native_candidates_prepared(self, *whatever): + print self.session.weinitiate, "##native_candidates_prepared" + #print "Native candidates prepared: %r" % whatever + for candidate in self.p2pstream.get_native_candidate_list(): + self.send_candidate(candidate) + def on_p2pstream_state_changed(self, stream, state, dir): + print self.session.weinitiate, "##state_changed" + #print "State: %d, Dir: %d" % (state, dir) + if state==farsight.STREAM_STATE_CONNECTED: + print self.session.weinitiate, "#farsight_stream_signal_native_candidates_prepared" + stream.signal_native_candidates_prepared() + print self.session.weinitiate, "#farsight_stream_start" + stream.start() def on_p2pstream_new_native_candidate(self, p2pstream, candidate_id): + print self.session.weinitiate, "##new_native_candidate" + print self.session.weinitiate, "#get_native_candidate" candidates = p2pstream.get_native_candidate(candidate_id) + print self.session.weinitiate, "#!", repr(candidates) for candidate in candidates: - attrs={ - 'component': candidate['component'], - 'foundation': '1', # hack - 'generation': '0', - 'ip': candidate['ip'], - 'network': '0', - 'port': candidate['port'], - 'priority': int(100000*candidate['preference']), # hack - 'protocol': candidate['proto']==farsight.NETWORK_PROTOCOL_UDP and 'udp' or 'tcp', - } - if 'username' in candidate: attrs['ufrag']=candidate['username'] - if 'password' in candidate: attrs['pwd']=candidate['password'] - c=self.__content() - t=c.addChild(xmpp.NS_JINGLE_ICE_UDP+' transport') - t.addChild('candidate', attrs=attrs) - self.session.sendTransportInfo(c) + self.send_candidate(candidate) + def send_candidate(self, candidate): + attrs={ + 'cid': candidate['candidate_id'], + 'component': candidate['component'], + 'foundation': '1', # hack + 'generation': '0', + 'type': candidate['type'], + 'ip': candidate['ip'], + 'network': '0', + 'port': candidate['port'], + 'priority': int(100000*candidate['preference']), # hack + 'protocol': candidate['proto']==farsight.NETWORK_PROTOCOL_UDP and 'udp' or 'tcp', + } + if 'username' in candidate: attrs['ufrag']=candidate['username'] + if 'password' in candidate: attrs['pwd']=candidate['password'] + c=self.__content() + t=c.addChild(xmpp.NS_JINGLE_ICE_UDP+' transport') + t.addChild('candidate', attrs=attrs) + self.session.sendTransportInfo(c) - def getCodecs(self): + def iterCodecs(self): + print self.session.weinitiate, "#farsight_stream_get_local_codecs" codecs=self.p2pstream.get_local_codecs() - return (xmpp.Node('payload', attrs=a) for a in codecs) + for codec in codecs: + a = {'name': codec['encoding_name'], + 'id': codec['id'], + 'channels': 1} + if 'clock_rate' in codec: a['clockrate']=codec['clock_rate'] + if 'optional_params' in codec: + p = (xmpp.Node('parameter', {'name': name, 'value': value}) + for name, value in codec['optional_params'].iteritems()) + else: p = () + yield xmpp.Node('payload-type', a, p) class ConnectionJingle(object): ''' This object depends on that it is a part of Connection class. '''