def run(self): import sys v = [0 for i in range(1500) ] # Rxsocket 只有一个 # txSocket 是一个数组, 每个输出码流一个 socket rxSocket = self.rxSocket txSocket = self.txSocket # 这是死循环,在线程的生命周期里面一直收包,处理,发包,收下一包 # Processing net package while(self.__running.isSet()): # --1-- 收一个RTP包(MUX只开一个接收socket,所有Node的数据都在这个socket接收) try: buf, addr = rxSocket.recvfrom(1500) except socket.error, e : pass else : # --2-- 判断当前包来自哪个码流 # 从接收到的包的源地址(addr),判断这个包来自哪个码流 # from which source ip = addr[0] indexOfInStream = -1 if ip in self.sourceIP : ## sourceIP是一个数组,数组中每一项存的内容是一个IP地址,例如sourceIP[1] = '192.168.1.80',意思 ## 是码流1 的源地址是 192.168.1.80 indexOfInStream = self.sourceIP.index(ip) if indexOfInStream == 1 : pass # --3-- 判断当前包需要转发给哪个码流 # #在这之前,已经在上面的代码知道这个数据包是来源于哪个码流了, #接下来要判断,这个码流要送给谁 # # who need this source toWhichOutStream = [] if indexOfInStream >= 0 and indexOfInStream < self.maxStreamNum : toWhichOutStream = self.toWhichOutStream[indexOfInStream] # toWhichOutStream 是一个不定长数组(链表?), 里面存着当前数据包需要转发给哪些目的地(码流编号) #计算包中有效数据长度, 样点数 = (包长 - 12字节RTP头) / 2(声道数) / 2 (每样点2字节,16bit音频) # Send to who need it n = (len(buf)-12)/2 # ..Mark 1.., 提取RTP包中的音频数据,,str2value()将buf中的数据,提取到一个全局的数组中,以数值方式 # 存储, 方便对数值处理 self.str2value(buf[12:]) nSamples = n/2 # --4-- 转发出去 ## ## 发给每一个目的码流 ## for indexOfOutStream in toWhichOutStream : # Mark 5, 重新生成RTP 头,替换里 包序号(2Byte)/ 时间戳(4byte)/ SRS段(4Byte) hdr = self.hdrProcess( hdr = buf[:12], sn = self.outputSn[indexOfOutStream], ts = self.outputTs[indexOfOutStream] ) #print( '%d %s' %(nSamples, hdr[2:8] ) ) # 包头和数据拼接 , , 见上文 .. Mark 1.., value2str()从全局数组中,计算(音量调整)并生成RTP包需要的格式。 s = hdr + self.value2str(n, self.outVolume[indexOfOutStream]) # 发出去 self.txSocket[indexOfOutStream].send(s) # 包序号递增,留给( Mark 5)下一包使用 if self.outputSn[indexOfOutStream] == 65535 : self.outputSn[indexOfOutStream] = 0 else : self.outputSn[indexOfOutStream] += 1 # 时间戳递增,留给( Mark 5)下一包使用 self.outputTs[indexOfOutStream] += nSamples if self.outputTs[indexOfOutStream] >= (2**32) : self.outputTs[indexOfOutStream] -= 2**32 # receive a package # while #线程退出,,, 收到线程终止信号才会跑到这里,正常运行中不会到达这里 # Service exit rxSocket.close() for i in range(self.maxStreamNum) : txSocket[i].close()