# -*- coding:utf-8 -*-
import os, sys, time
from audio_recorder import READ_BUF_TIME, ARecorder
from ssat_sdk.sat_environment import getSoundList
import numpy as np
import matplotlib.pyplot as plt
import wave
import thread
import Queue


def getSoundLevel():
    hasSoundLevel, noSoundLevel = getSoundList()
    return round(hasSoundLevel, 2)

TH_POWER = getSoundLevel()  # average short-time energy threshold separating sound from silence
TH_PLOSIVE_TIME = 0.1  # sounds shorter than 0.1 s are classed as plosives (pops); longer sounds as normal
TH_BLOCK_TIME = 0.1  # minimum silent gap, in seconds, counted as an interruption


class AAnalysis():
    TH_POWER = getSoundLevel()  # average short-time energy threshold separating sound from silence

    def __init__(self):
        self.varList = []  # records the sound/silence state changes across channels
        self.nChannels = 2
        self.initStatus()

    def initStatus(self):
        self.power = 0
        self.hasSound = False
        self.hasBlock = False
        self.hasPlosive = False
        self.soundPowerCHS = []
        self.allPowerCHS = []
        self.fftPowerCHS = []
        self.fftFrqCHS = []

    def showLanguage(self, time=5, waveFile=None):
        if waveFile is None:
            recorder = ARecorder()
            waveFile = "test.wav"
            recorder.recordWave(waveFile, time)
        # Open the audio file with the wave module.
        f = wave.open(waveFile, 'rb')
        # Read the audio parameters.
        params = f.getparams()
        nchannels, sampwidth, framerate, nframes = params[:4]
        # readframes returns a byte string; convert it to int16 samples.
        strData = f.readframes(nframes)
        waveData = np.fromstring(strData, dtype=np.int16)
        # Normalize to [-1, 1].
        waveData = waveData * 1.0 / np.max(np.abs(waveData))
        # .T transposes the array to shape (nchannels, nframes).
        waveData = np.reshape(waveData, [nframes, nchannels]).T
        f.close()
        # Plot the spectrogram.
        plt.specgram(waveData[0], Fs=framerate, scale_by_freq=True, sides='default')
        plt.ylabel('Frequency')
        plt.xlabel('Time(s)')
        plt.show()

    def showFregForm(self, waveFile):
        wf = wave.open(waveFile, "rb")
        nframes = wf.getnframes()
        framerate = wf.getframerate()
        frame_data = wf.readframes(nframes)
        wf.close()
        time_data = np.fromstring(frame_data, dtype=np.int16)
        time_data.shape = -1, 2
        time_dataT = time_data.T
        start = 0
        end = framerate
        time_dataT2 = time_dataT[0][start:start + end]
        # self.time2Frequence(time_dataT2[2000:2000+1000])
        c = self.time2Frequence(time_dataT2, 1)
        print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c))
        for i in range(1, 5):
            start = framerate * i
            end = framerate * (i + 1)
            print "showFregForm:start,end:", start, end
            time_dataT2 = time_dataT[0][start:end]
            c = self.time2Frequence(time_dataT2, 1)
            print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c)), time_dataT2.shape
        # time2Frequence caps the spectrum at 12 kHz, so size the frequency
        # axis from the spectrum itself rather than from the sample rate.
        freq = np.arange(np.size(c))
        plt.plot(freq, np.abs(c), 'r')
        plt.show()

    def showWaveForm(self, waveFile):
        # Open the WAV file.
        f = wave.open(waveFile, "rb")
        # Read the format info:
        # (nchannels, sampwidth, framerate, nframes, comptype, compname)
        params = f.getparams()
        nchannels, sampwidth, framerate, nframes = params[:4]
        # Read the waveform data.
        print "nchannels, sampwidth, framerate, nframes:", nchannels, sampwidth, framerate, nframes
        str_data = f.readframes(nframes)
        f.close()
        # Convert the waveform bytes to an array.
        print "str_data:", str_data.__len__()
        wave_data = np.fromstring(str_data, dtype=np.int16)
        print "wave_data.shape:", wave_data.shape
        # Channel handling: reshape to (nchannels, nframes).
        wave_data.shape = -1, nchannels
        wave_data = wave_data.T
        start = 0
        per_frames = int(framerate * READ_BUF_TIME)
        chunks = int(nframes / framerate / READ_BUF_TIME)
        for i in range(2, chunks):
            start = per_frames * i
            end = per_frames * (i + 1)
            self.getFrameAVGPower(wave_data[0][start:end])
            self.getFrameFFTPower(wave_data[0][start:end], READ_BUF_TIME)
            # self.STE(wave_data[0][start:end])
            # self.ZCR(wave_data[0][start:end])
        self.STE(wave_data[0])
        # Channel handling end.
        print "channel 0:", wave_data[0].shape, len(wave_data[0])
        print "channel 1:", wave_data[1].shape, len(wave_data[1])
        time = np.arange(0, nframes) * (1.0 / framerate)
        print "time:", time.shape
        # Plot both channels' waveforms.
        plt.subplot(211)
        plt.plot(time, wave_data[0], "b")
        plt.subplot(212)
        plt.plot(time, wave_data[1], c="g")
        plt.xlabel("time (seconds)")
        plt.ylabel("amplitude")
        plt.show()
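    # --- Illustrative sketch, not part of the original module flow ---
    # A minimal demo of the chunking used by showWaveForm/analyWav: split a
    # synthetic 1 kHz tone into READ_BUF_TIME-sized frames and print each
    # frame's average power. The 8 kHz sample rate and the amplitude are
    # assumptions chosen for the demo, and READ_BUF_TIME is assumed to be a
    # fraction of a second, as its use above suggests.
    def demoFramePowers(self):
        framerate = 8000  # assumed sample rate, demo only
        t = np.arange(0, framerate) * (1.0 / framerate)  # 1 second of samples
        signal = (10000 * np.sin(2 * np.pi * 1000 * t)).astype(np.int16)
        per_frames = int(framerate * READ_BUF_TIME)
        chunks = int(len(signal) / per_frames)
        for i in range(chunks):
            frame = signal[i * per_frames:(i + 1) * per_frames]
            print "demoFramePowers: frame", i, "avg power:", self.getFrameAVGPower(frame)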
    '''
    Compute the short-time energy (STE) of a frame.
    '''
    def STE(self, frameL):
        amp = np.sum(np.abs(frameL))
        print "STE amp:", amp
        return amp

    '''
    Convert a list into a numpy array.
    '''
    def list2NP(self, srcList):
        if srcList.__len__() > 0:
            return np.array(srcList)
        else:
            return np.array([-1])

    '''
    Compute the average power of voiced frames only, i.e. frames whose power
    exceeds the sound threshold.
    '''
    def getSoundAVGPower(self, LR=False):
        # print "getSoundAVGPower,self.soundPowerCHS:", self.soundPowerCHS
        if LR is False:
            if self.soundPowerCHS.__len__() < 1:
                return 0
            soundPowersL = self.list2NP(self.soundPowerCHS[0])
            if self.soundPowerCHS.__len__() == 2:
                soundPowersR = self.list2NP(self.soundPowerCHS[1])
                soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
                return np.average(soundPowers)
            return np.average(soundPowersL)
        else:
            if self.soundPowerCHS.__len__() < 1:
                return 0, 0
            soundPowersL = self.list2NP(self.soundPowerCHS[0])
            soundPowersR = self.list2NP(self.soundPowerCHS[1])
            return np.average(soundPowersL), np.average(soundPowersR)

    '''
    :return result, valueList.
    result: -1 means sound detection failed; 1 means mono; 2 means stereo.
    valueList: -1 means no sound, 0 means left channel only, 1 means right
    channel only, 2 means both channels voiced.
    '''
    def getLRVariation(self):
        if self.soundPowerCHS.__len__() < 1:
            return -1, self.varList
        elif self.soundPowerCHS.__len__() == 1:
            return 1, self.varList
        else:
            return 2, self.varList

    varLRCount = 0

    def recLRVariation(self, channelPowers):
        ret = self.calLRVar(channelPowers)
        if self.varList.__len__() == 0:
            self.varList.append(ret)
        else:
            if self.varList[self.varList.__len__() - 1] != ret:
                self.varLRCount += 1
                if self.varLRCount * self.frameTime > 0.5:  # a new state must last more than 0.5 s to count as a change
                    self.varList.append(ret)
            else:
                self.varLRCount = 0

    def calLRVar(self, channelPowers):
        if channelPowers.__len__() == 1:
            # Mono: left channel voiced, or silent.
            return 0 if channelPowers[0] >= TH_POWER else -1
        else:
            if channelPowers[0] >= TH_POWER and channelPowers[1] >= TH_POWER:
                return 2
            elif channelPowers[0] >= TH_POWER and channelPowers[1] < TH_POWER:
                return 0
            elif channelPowers[0] < TH_POWER and channelPowers[1] >= TH_POWER:
                return 1
            else:
                return -1

    '''
    Compute the average power over the whole recording. Of limited practical
    use unless the audio has a fixed amplitude.
    '''
    def getTotalAVGPower(self, LR=False):
        if LR is False:
            if self.allPowerCHS.__len__() < 1:
                return 0
            soundPowersL = self.list2NP(self.allPowerCHS[0])
            if self.allPowerCHS.__len__() == 2:
                soundPowersR = self.list2NP(self.allPowerCHS[1])
                soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
                return np.average(soundPowers)
            else:
                return np.average(soundPowersL)
        else:
            if self.allPowerCHS.__len__() < 1:
                return 0, 0
            soundPowersL = self.list2NP(self.allPowerCHS[0])
            soundPowersR = self.list2NP(self.allPowerCHS[1])
            return np.average(soundPowersL), np.average(soundPowersR)

    '''
    Compute the peak power over the whole recording. Of limited practical use
    unless the audio has a fixed amplitude.
    '''
    def getTotalMaxPower(self, LR=False):
        if LR is False:
            if self.allPowerCHS.__len__() < 1:
                return 0
            soundPowersL = self.list2NP(self.allPowerCHS[0])
            if self.allPowerCHS.__len__() == 2:
                soundPowersR = self.list2NP(self.allPowerCHS[1])
                soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
                return int(np.max(soundPowers))
            else:
                return int(np.max(soundPowersL))
        else:
            if self.allPowerCHS.__len__() < 1:
                return 0, 0
            soundPowersL = self.list2NP(self.allPowerCHS[0])
            soundPowersR = self.list2NP(self.allPowerCHS[1])
            return int(np.max(soundPowersL)), int(np.max(soundPowersR))

    '''
    Compute a frame's average sound power.
    '''
    def getFrameAVGPower(self, frameL):
        avgPower = np.average(np.abs(frameL))
        # print "getFrameAVGPower:", avgPower
        return avgPower
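    # --- Illustrative sketch, not part of the original module flow ---
    # A small demo of the channel-state encoding documented above (-1 silence,
    # 0 left only, 1 right only, 2 both). The power values are made-up numbers
    # placed just above/below TH_POWER, not real measurements.
    def demoLRStates(self):
        loud = TH_POWER + 1   # just above the sound threshold
        quiet = TH_POWER - 1  # just below it (only the comparison matters)
        print "demoLRStates: both loud  ->", self.calLRVar([loud, loud])    # expect 2
        print "demoLRStates: left only  ->", self.calLRVar([loud, quiet])   # expect 0
        print "demoLRStates: right only ->", self.calLRVar([quiet, loud])   # expect 1
        print "demoLRStates: both quiet ->", self.calLRVar([quiet, quiet])  # expect -1
        print "demoLRStates: mono loud  ->", self.calLRVar([loud])          # expect 0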
    '''
    Compute a frame's peak sound power.
    '''
    def getFrameMaxPower(self, frameL):
        maxPower = np.max(np.abs(frameL))
        # print "getFrameMaxPower:", maxPower
        return maxPower

    '''
    Compute the sound power from the Fourier transform.
    '''
    def getFrameFFTPower(self, frameL, timeL):
        fftFreq = self.time2Frequence(frameL, timeL)
        fftPower = np.max(np.abs(fftFreq))
        powerFreq = np.argmax(np.abs(fftFreq))
        # print "getFrameFFTPower:", fftPower, powerFreq
        return fftPower, powerFreq

    '''
    Compute the FFT-derived sound power over the whole recording.
    '''
    def getFFTPower(self, LR=False):
        if LR is False:
            if self.fftPowerCHS.__len__() < 1:
                return 0
            soundPowersL = self.list2NP(self.fftPowerCHS[0])
            if self.fftPowerCHS.__len__() == 2:
                soundPowersR = self.list2NP(self.fftPowerCHS[1])
                soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
                return np.average(soundPowers)
            return np.average(soundPowersL)
        else:
            if self.fftPowerCHS.__len__() < 1:
                return 0, 0
            soundPowersL = self.list2NP(self.fftPowerCHS[0])
            soundPowersR = self.list2NP(self.fftPowerCHS[1])
            return np.average(soundPowersL), np.average(soundPowersR)

    '''
    Get the dominant frequency of each 1 s slice of the whole recording.
    LR=False: the channels are assumed identical; return one channel's values.
    LR=True: the channels differ; return both channels' values.
    :param LR: whether to return both channels.
    :return: LR=True, a 2-D list with both channels; LR=False, a 1-D list with
    one channel. None means the data is invalid.
    '''
    def getFFTFreq(self, LR=False):
        # print "getFFTFreq:", self.fftFrqCHS
        if self.fftFrqCHS.__len__() < 1:
            return None
        if LR is True:
            return self.fftFrqCHS
        else:
            return self.fftFrqCHS[0]

    '''
    Compute a frame's zero-crossing rate.
    '''
    def ZCR(self, curFrame):
        tmp1 = curFrame[:-1]
        tmp2 = curFrame[1:]
        # Offset the frame against itself by one sample: two adjacent samples
        # with opposite signs multiply to a value below zero, marking one
        # zero crossing.
        sings = tmp1 * tmp2 <= 0
        zcr = float(np.sum(sings)) / len(sings)
        print "ZCR:", zcr
        return zcr

    '''
    Convert time-domain samples into spectrum data, keeping only the band the
    human ear can hear (100 Hz to 12 kHz).
    Note: unless the frame is exactly 1 second long, the bins do not map to
    whole hertz and no spectrogram can be built.
    '''
    def time2Frequence(self, frameL, timeL=1.0):
        fftFreq = np.fft.fft(frameL) * 2 / len(frameL)
        if np.size(fftFreq) > 12000:
            fftFreq = fftFreq[0:12000]
        # freq = np.arange(0, np.size(fftFreq), 1)
        # plt.plot(freq[:], abs(fftFreq[:]), 'r')
        # plt.show()
        return fftFreq

    '''
    Display one audio frame's time-domain waveform.
    '''
    def showFrameTime(self, frameL, timeL, width):
        print "showFrameTime,frameL:", frameL.shape, np.ndim(frameL)
        ndim = np.ndim(frameL)
        if ndim == 1:
            frameCount = len(frameL)
            time = np.arange(0, frameCount) * (timeL / frameCount)
            print "showFrameTime,time:", time.shape
            plt.plot(time, frameL, "g")
            plt.xlabel("time (seconds)")
            plt.ylabel("amplitude")
            plt.show()
        elif ndim == 2:
            rows, cols = frameL.shape
            time = np.arange(0, cols) * (timeL / cols)
            for i in range(rows):
                plt.subplot(rows, 1, i + 1)
                plt.plot(time, frameL[i])
                plt.xlabel("time (seconds)")
                plt.ylabel("amplitude")
            plt.show()

    def getNPDtype(self, byteNum):
        if byteNum == 1:
            return np.int8
        elif byteNum == 2:
            return np.int16
        else:
            return np.int16
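    # --- Illustrative sketch, not part of the original module flow ---
    # A quick sanity check for ZCR and time2Frequence: a pure f0 Hz sine
    # sampled at fs for exactly 1 second crosses zero about 2*f0 times, so the
    # ZCR should be near 2.0 * f0 / fs, and the spectrum peak should sit at
    # bin f0. The fs and f0 values are assumptions chosen for the demo.
    def demoZCRAndFFT(self):
        fs = 8000  # assumed sample rate
        f0 = 1000  # assumed test-tone frequency
        t = np.arange(0, fs) * (1.0 / fs)  # exactly 1 second, as time2Frequence requires
        tone = np.sin(2 * np.pi * f0 * t)
        zcr = self.ZCR(tone)  # expect about 2.0 * 1000 / 8000 = 0.25
        c = self.time2Frequence(tone, 1)
        print "demoZCRAndFFT: zcr:", zcr, "peak bin:", np.argmax(np.abs(c))  # expect 1000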
    '''
    Frame analysis used while audio is being recorded. Once analysis has
    finished, use the getter interfaces to read the results, e.g.
    getSoundAVGPower.
    For FFT spectrum analysis, the bins are only meaningful when frameTime is
    1 second.
    Sound presence: for stereo, a frame counts as voiced if either channel is
    voiced.
    '''
    def anaysisFrames(self, frameQue, channels, width, frameTime):
        self.frameTime = frameTime
        self.nChannels = channels
        self.varList = []
        soundCount = 0
        noSoundCount = 0
        self.soundPowerCHS = []
        self.allPowerCHS = []
        self.fftPowerCHS = []
        self.fftFrqCHS = []
        for i in range(channels):
            self.soundPowerCHS.append([])
            self.allPowerCHS.append([])
            self.fftPowerCHS.append([])
            self.fftFrqCHS.append([])
        while self.isAF is True or frameQue.qsize() > 0:
            # print "anaysisFrames,frameQue.qsize:", frameQue.qsize()
            frames = frameQue.get()
            dtype = self.getNPDtype(width)
            # print "anaysisFrames,dtype size:", len(frames), channels, dtype
            frameL = np.fromstring(frames, dtype=dtype)
            frameL.shape = -1, channels
            frameL = frameL.T
            channelPowers = []
            for i in range(channels):
                # Note: despite the name, this records the frame's peak power.
                avgPower = self.getFrameMaxPower(frameL[i])
                self.allPowerCHS[i].append(avgPower)
                fftPower, powerFreq = self.getFrameFFTPower(frameL[i], frameTime)
                self.fftPowerCHS[i].append(fftPower)
                self.fftFrqCHS[i].append(powerFreq)
                channelPowers.append(avgPower)
                # All channel values of this frame are ready: decide whether
                # the frame is voiced.
                if i == channels - 1:
                    # Strongest channel power of the current frame.
                    framePower = self.getCHMaxPower(channelPowers)
                    # Record which channels are voiced.
                    self.recLRVariation(channelPowers)
                    del channelPowers
                    if framePower >= TH_POWER:
                        noSoundCount = 0
                        soundCount += 1
                        if soundCount * frameTime >= TH_PLOSIVE_TIME:
                            self.hasSound = True
                    else:
                        # Plosive detection. Still missing: detecting plosives
                        # while normal sound is present.
                        if soundCount * frameTime < TH_PLOSIVE_TIME and soundCount > 0:
                            self.hasPlosive = True
                        noSoundCount += 1
                        soundCount = 0
                        if noSoundCount * frameTime >= TH_BLOCK_TIME:
                            self.hasBlock = True
                # Record the voiced-frame power per channel.
                if avgPower >= TH_POWER:
                    self.soundPowerCHS[i].append(avgPower)

    '''
    Return the maximum power across the current frame's channels.
    '''
    def getCHMaxPower(self, channelPowers):
        channelPowers_np = self.list2NP(channelPowers)
        return np.max(channelPowers_np)

    '''Start the background audio-analysis thread.'''
    def startFramesAnalysis(self, frameQue, channels, width, buf_time):
        self.initStatus()
        self.isAF = True
        thread.start_new_thread(self.anaysisFrames, (frameQue, channels, width, buf_time))

    '''Stop the background analysis; the thread exits only after all queued data has been processed.'''
    def endFramesAnalysis(self):
        self.isAF = False

    def getWavReader(self, waveFile):
        wfReader = None
        try:
            wfReader = wave.open(waveFile, "rb")
        except Exception as e:
            print "Wave file does not exist:", e
        return wfReader

    '''
    Analyze a WAV audio file; afterwards use the getter interfaces to read the
    results, e.g. getTotalAVGPower.
    For FFT spectrum analysis, the bins are only meaningful when buf_time is
    1 second.
    '''
    def analyWav(self, waveFile, buf_time=READ_BUF_TIME):
        self.frameTime = buf_time
        self.initStatus()
        frameQue = Queue.Queue()
        wfReader = self.getWavReader(waveFile)
        if wfReader is None:
            return
        nChannels, width, frameRate, nframes = wfReader.getparams()[0:4]
        stepnFrames = int(frameRate * buf_time)
        times = int(nframes / stepnFrames)
        print "analyWav:", buf_time, stepnFrames, times, frameQue.qsize()
        for i in range(times):
            frameL = wfReader.readframes(stepnFrames)
            frameQue.put_nowait(frameL)
        print "analyWav:", buf_time, stepnFrames, times, frameQue.qsize()
        # self.startFramesAnalysis(frameQue, nChannels, width, timeL)
        # self.endFramesAnalysis()
        self.isAF = False
        self.anaysisFrames(frameQue, nChannels, width, buf_time)
        wfReader.close()


if __name__ == "__main__":
    analysis = AAnalysis()
    # waveFile = "test1.wav"
    waveFile = "eq_10khz_v0.wav"
    # waveFile = "tv/DVB_DTV_automatic_search.wav"
    # waveFile = "wav_balance_v15.wav"
    # waveFile = r"D:\sound\sound_preset\sound_preset_mode_music.wav"
    # waveFile = r"D:\sound\5k\eq_5khz_v100.wav"
    # waveFile = r"D:\sound\monitorSound_balance.wav"
    # analysis.showLanguage(waveFile=waveFile)
    # analysis.showFregForm(waveFile)
    # analysis.showWaveForm(waveFile)
    analysis.analyWav(waveFile, buf_time=1)
    analysis.getFFTFreq()
    # print "sound,status hasSound,hasBlock,hasPlosive:", analysis.hasSound, analysis.hasBlock, analysis.hasPlosive
    # print "0,getSoundAVGPower:", analysis.getSoundAVGPower(LR=True)
    # print "0,getTotalAVGPower:", analysis.getTotalAVGPower(LR=True)
    # print "0,getFFTPower:", analysis.getFFTPower(LR=True)
    #
    # waveFile = "eq_10khz_v100.wav"
    # analysis.analyWav(waveFile)
    # print "100,avgPower:", analysis.getSoundAVGPower()
    # print "100,fftPower:", analysis.getFFTPower()
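
# --- Illustrative sketch, not part of the original module flow ---
# A minimal end-to-end example of feeding anaysisFrames by hand: frames go
# into a Queue as interleaved int16 stereo byte strings and are processed
# synchronously, mirroring analyWav (startFramesAnalysis would run the same
# loop on a background thread). The 8 kHz rate, 440 Hz tone, amplitude and
# 0.1 s frame time are all made-up demo values, and hasSound coming out True
# assumes TH_POWER is below the tone's 10000-sample peak. This function is
# defined for reference only and is not called by the block above.
def demoLiveAnalysis():
    analysis = AAnalysis()
    frameQue = Queue.Queue()
    fs = 8000        # assumed sample rate
    frameTime = 0.1  # assumed frame duration in seconds
    t = np.arange(0, int(fs * frameTime)) * (1.0 / fs)
    tone = (10000 * np.sin(2 * np.pi * 440 * t)).astype(np.int16)
    stereo = np.column_stack((tone, tone)).flatten()  # interleave L/R samples
    for i in range(10):  # 1 second of audio in total
        frameQue.put_nowait(stereo.tostring())
    analysis.isAF = False  # queue is pre-filled, so drain it synchronously
    analysis.anaysisFrames(frameQue, 2, 2, frameTime)
    print "demoLiveAnalysis: hasSound:", analysis.hasSound
    print "demoLiveAnalysis: avg L/R:", analysis.getSoundAVGPower(LR=True)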