User Tools

Site Tools


Sidebar

Go Back

Refresh

You are not allowed to add pages

Direct Link

library:misc:temp

mux.py中 run是运行的线程, while 中循环运行

py
  def run(self):
		import sys
		v = [0 for i in range(1500) ]
 
                # Rxsocket 只有一个
                # txSocket 是一个数组, 每个输出码流一个 socket
		rxSocket = self.rxSocket
		txSocket = self.txSocket
 
                # 这是死循环,在线程的生命周期里面一直收包,处理,发包,收下一包
		# Processing net package
		while(self.__running.isSet()):
 
  # --1-- 收一个RTP包(MUX只开一个接收socket,所有Node的数据都在这个socket接收)
			try:
				buf, addr = rxSocket.recvfrom(1500)
			except socket.error, e :
				pass
			else :
 
  # --2-- 判断当前包来自哪个码流 			
 
				# 从接收到的包的源地址(addr),判断这个包来自哪个码流
				# from which source
				ip = addr[0]
				indexOfInStream = -1
				if ip in self.sourceIP :  
				## sourceIP是一个数组,数组中每一项存的内容是一个IP地址,例如sourceIP[1] = '192.168.1.80',意思
				##	是码流1 的源地址是 192.168.1.80
					indexOfInStream = self.sourceIP.index(ip)
 
				if indexOfInStream == 1 :
					pass
 
   # --3-- 判断当前包需要转发给哪个码流		
				#	
				#在这之前,已经在上面的代码知道这个数据包是来源于哪个码流了,
				#接下来要判断,这个码流要送给谁
				#
				# who need this source
				toWhichOutStream = []
				if indexOfInStream >= 0 and indexOfInStream < self.maxStreamNum :
					toWhichOutStream = self.toWhichOutStream[indexOfInStream]
 
				# toWhichOutStream 是一个不定长数组(链表?), 里面存着当前数据包需要转发给哪些目的地(码流编号)
 
				#计算包中有效数据长度, 样点数 = (包长 - 12字节RTP头) / 2(声道数) / 2 (每样点2字节,16bit音频)
				# Send to who need it
				n = (len(buf)-12)/2
 
				# ..Mark 1..,  提取RTP包中的音频数据,,str2value()将buf中的数据,提取到一个全局的数组中,以数值方式
				# 				存储, 方便对数值处理
				self.str2value(buf[12:])
				nSamples = n/2
 
  # --4-- 转发出去
				##
				## 发给每一个目的码流
				##
				for indexOfOutStream in toWhichOutStream :
 
					# Mark 5,  重新生成RTP 头,替换里 包序号(2Byte)/ 时间戳(4byte)/ SRS段(4Byte)
					hdr = self.hdrProcess( hdr = buf[:12], sn = self.outputSn[indexOfOutStream], ts = self.outputTs[indexOfOutStream] )
 
 
					#print( '%d %s' %(nSamples, hdr[2:8] ) )
 
					# 包头和数据拼接  , , 见上文 .. Mark 1..,  value2str()从全局数组中,计算(音量调整)并生成RTP包需要的格式。
					s = hdr + self.value2str(n, self.outVolume[indexOfOutStream])
 
					# 发出去
					self.txSocket[indexOfOutStream].send(s)
 
 
					# 包序号递增,留给( Mark 5)下一包使用
					if self.outputSn[indexOfOutStream] == 65535 :
						self.outputSn[indexOfOutStream] = 0
					else :
						self.outputSn[indexOfOutStream] += 1
 
					# 时间戳递增,留给( Mark 5)下一包使用
					self.outputTs[indexOfOutStream] += nSamples
 
					if self.outputTs[indexOfOutStream] >= (2**32) :
						self.outputTs[indexOfOutStream] -= 2**32
 
 
 
			# receive a package
		# while
 
		#线程退出,,, 收到线程终止信号才会跑到这里,正常运行中不会到达这里
		# Service exit
		rxSocket.close()
		for i in range(self.maxStreamNum) :
			txSocket[i].close()
library/misc/temp.txt · Last modified: 2022/05/02 00:32 (external edit)