- # -*- coding:utf-8 -*-
- import os, sys, time
- from audio_recorder import READ_BUF_TIME,ARecorder
- from ssat_sdk.sat_environment import getSoundList
- import numpy as np
- import matplotlib.pyplot as plt
- import wave
- import thread
- import Queue
- def getSoundLevel():
- hasSoundLevel, noSoundLevel = getSoundList()
- return round(hasSoundLevel, 2)
- TH_POWER = getSoundLevel() # threshold on average short-time energy for deciding sound vs. silence
- TH_PLOSIVE_TIME = 0.1 # sound shorter than 0.1 s is judged a plosive; longer than 0.1 s is judged normal sound
- TH_BLOCK_TIME = 0.1 # gap between sounds (in seconds) that counts as an interruption
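- # Illustrative note (added): assuming READ_BUF_TIME from audio_recorder is 0.05 s
- # per frame (an assumption, its real value lives in that module), a burst filling
- # only a single frame (0.05 s < TH_PLOSIVE_TIME) is flagged as a plosive, two or
- # more consecutive voiced frames (>= 0.1 s) set hasSound, and two or more
- # consecutive silent frames (>= 0.1 s) set hasBlock.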
- class AAnalysis():
- TH_POWER = getSoundLevel() # threshold on average short-time energy for deciding sound vs. silence
- def __init__(self):
- self.varList = [] # records sound/silence changes per channel
- self.nChannels = 2
- self.initStatus()
- def initStatus(self):
- self.power = 0
- self.hasSound = False
- self.hasBlock = False
- self.hasPlosive = False
- self.soundPowerCHS = []
- self.allPowerCHS = []
- self.fftPowerCHS = []
- def showLanguage(self, time=5, waveFile=None):
- if waveFile is None:
- recorder = ARecorder()
- waveFile = "test.wav"
- recorder.recordWave(waveFile,time)
- # Open the audio file with wave.open.
- f = wave.open(waveFile, 'rb')
- # Read the wave parameters.
- params = f.getparams()
- nchannels, sampwidth, framerate, nframes = params[:4]
- # readframes() returns a byte string; convert it to int16 samples.
- strData = f.readframes(nframes)
- waveData = np.fromstring(strData, dtype=np.int16)
- # Normalize to the range [-1, 1].
- waveData = waveData * 1.0 / max(abs(waveData))
- # .T transposes the array so that each row holds one channel.
- waveData = np.reshape(waveData, [nframes, nchannels]).T
- f.close()
- # Plot the spectrogram.
- plt.specgram(waveData[0], Fs=framerate, scale_by_freq=True, sides='default')
- plt.ylabel('Frequency')
- plt.xlabel('Time(s)')
- plt.show()
- def showFregForm(self,waveFile):
- wf = wave.open(waveFile, "rb")
- nframes = wf.getnframes()
- framerate = wf.getframerate()
- frame_data = wf.readframes(nframes)
- wf.close()
- time_data = np.fromstring(frame_data, dtype=np.int16)
- time_data.shape = -1, 2
- time_dataT = time_data.T
- freq = [n for n in range(0, framerate)]
- start = 0
- end = framerate
- time_dataT2 = time_dataT[0][start:start + end]
- # self.time2Frequence(time_dataT2[2000:2000+1000])
- c = self.time2Frequence(time_dataT2, 1)
- print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c))
- for i in range(1,5):
- start = framerate*i
- end = framerate*(i+1)
- print "showFregForm:start,end:",start,end
- time_dataT2 = time_dataT[0][start:end]
- c = self.time2Frequence(time_dataT2, 1)
- print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c)),time_dataT2.shape
- plt.plot(freq[:], abs(c[:]), 'r')
- plt.show()
- def showWaveForm(self, waveFile):
- # Open the WAV file.
- f = wave.open(waveFile, "rb")
- # Read the format info:
- # (nchannels, sampwidth, framerate, nframes, comptype, compname)
- params = f.getparams()
- nchannels, sampwidth, framerate, nframes = params[:4]
- # Read the waveform data.
- print "nchannels, sampwidth, framerate, nframes:",nchannels, sampwidth, framerate, nframes
- str_data = f.readframes(nframes)
- f.close()
- # Convert the byte string of samples to a numpy array.
- print "str_data:",str_data.__len__()
- wave_data = np.fromstring(str_data, dtype=np.int16)
- print "wave_data.shape:",wave_data.shape
- # Per-channel processing
- wave_data.shape = -1, nchannels
- wave_data = wave_data.T
- start = 0
- per_frames = int(framerate * READ_BUF_TIME)
- chunks = int(nframes/framerate/READ_BUF_TIME)
- for i in range(2,chunks):
- start = per_frames*i
- end = per_frames * (i + 1)
- self.getFrameAVGPower(wave_data[0][start:end])
- self.getFrameFFTPower(wave_data[0][start:end],READ_BUF_TIME)
- # self.STE(wave_data[0][start:end])
- # self.ZCR(wave_data[0][start:end])
- self.STE(wave_data[0])
- # Per-channel processing end
- print "channel 0:",wave_data[0].shape, len(wave_data[0])
- print "channel 1:", wave_data[1].shape, len(wave_data[1])
- time = np.arange(0, nframes) * (1.0 / framerate)
- print "time:", time.shape
- # Plot the waveforms of both channels.
- plt.subplot(211)
- plt.plot(time, wave_data[0], "b")
- plt.subplot(212)
- plt.plot(time, wave_data[1], c="g")
- plt.xlabel("time (seconds)")
- plt.ylabel("amplitude")
- plt.show()
- '''
- Compute the short-time energy (STE) of a frame.
- '''
- def STE(self, frameL):
- amp = np.sum(np.abs(frameL))
- print "STE amp:",amp
- return amp
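- # Worked example (added for illustration): STE(np.array([3, -4, 5])) = 3 + 4 + 5 = 12.
- # It is a plain sum of absolute values, not an average, so longer frames give larger values.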
- '''
- Convert a list to a numpy array.
- '''
- def list2NP(self, srcList):
- if srcList.__len__() > 0:
- return np.array(srcList)
- else:
- return np.array([-1])
- '''
- Compute the average power of voiced frames: only frames whose power exceeds the sound threshold are counted.
- '''
- def getSoundAVGPower(self, LR=False):
- # print "getSoundAVGPower,self.soundPowerCHS:",self.soundPowerCHS
- if LR is False:
- if self.soundPowerCHS.__len__() < 1:
- return 0
- soundPowersL = self.list2NP(self.soundPowerCHS[0])
- if self.soundPowerCHS.__len__() == 2:
- soundPowersR = self.list2NP(self.soundPowerCHS[1])
- soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
- return np.average(soundPowers)
- return np.average(soundPowersL)
- else:
- if self.soundPowerCHS.__len__() < 1:
- return 0,0
- soundPowersL = self.list2NP(self.soundPowerCHS[0])
- soundPowersR = self.list2NP(self.soundPowerCHS[1])
- return np.average(soundPowersL),np.average(soundPowersR)
- '''
- :return result, valueList
- result: -1 means sound detection is abnormal; 1 means mono; 2 means stereo
- valueList: -1 means no sound, 0 left channel only, 1 right channel only, 2 both channels have sound
- '''
- def getLRVariation(self):
- varList = []
- if self.soundPowerCHS.__len__() < 1:
- return -1, self.varList
- elif self.soundPowerCHS.__len__() == 1:
- return 1, self.varList
- else:
- return 2, self.varList
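- # Example of reading the return value (illustrative): (2, [-1, 2, 0]) means the
- # recording is stereo and went from silence, to sound on both channels, to sound
- # on the left channel only.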
- varLRCount = 0
- def recLRVariation(self, channelPowers):
- ret = self.calLRVar(channelPowers)
- if self.varList.__len__() == 0:
- self.varList.append(ret)
- else:
- if self.varList[self.varList.__len__()-1] != ret:
- self.varLRCount += 1
- if self.varLRCount * self.frameTime > 0.5: # only count the change if the sound lasts longer than 0.5 s
- self.varList.append(ret)
- else:
- self.varLRCount = 0
- def calLRVar(self, channelPowers):
- if channelPowers.__len__() == 1:
- # mono: the left channel either has sound or is silent
- return 0 if channelPowers[0] >= TH_POWER else -1
- else:
- if channelPowers[0] >= TH_POWER and channelPowers[1] >= TH_POWER:
- return 2
- elif channelPowers[0] >= TH_POWER and channelPowers[1] < TH_POWER:
- return 0
- elif channelPowers[0] < TH_POWER and channelPowers[1] >= TH_POWER:
- return 1
- else:
- return -1
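- # Worked example (illustrative; TH_POWER is really read from getSoundLevel(), 100
- # here is only an assumed value): with TH_POWER = 100 and channelPowers = [1200, 30],
- # only the left channel clears the threshold, so calLRVar returns 0.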
- '''
- Compute the average power over the whole recording. Of little practical use unless the audio has a fixed amplitude.
- '''
- def getTotalAVGPower(self, LR=False):
- if LR is False:
- if self.allPowerCHS.__len__() < 1:
- return 0
- soundPowersL = self.list2NP(self.allPowerCHS[0])
- if self.allPowerCHS.__len__() == 2:
- soundPowersR = self.list2NP(self.allPowerCHS[1])
- soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
- return np.average(soundPowers)
- else:
- return np.average(soundPowersL)
- else:
- if self.allPowerCHS.__len__() < 1:
- return 0,0
- soundPowersL = self.list2NP(self.allPowerCHS[0])
- soundPowersR = self.list2NP(self.allPowerCHS[1])
- return np.average(soundPowersL),np.average(soundPowersR)
- '''
- Compute the average power of a frame.
- '''
- def getFrameAVGPower(self, frameL):
- avgPower = np.average(np.abs(frameL))
- # print "getFrameAVGPower:",avgPower
- return avgPower
- '''
- Compute the maximum power of a frame.
- '''
- def getFrameMaxPower(self, frameL):
- maxPower = np.max(np.abs(frameL))
- # print "getFrameMaxPower:",maxPower
- return maxPower
- '''
- Compute the sound power of a frame from its Fourier transform.
- '''
- def getFrameFFTPower(self, frameL, timeL):
- fftFreq = self.time2Frequence(frameL, timeL)
- fftPower = np.max(np.abs(fftFreq))
- # print "getFrameFFTPower:",fftPower
- return fftPower
- '''
- Compute the FFT-based sound power over the whole recording.
- '''
- def getFFTPower(self, LR=False):
- if LR is False:
- if self.fftPowerCHS.__len__() < 1:
- return 0
- soundPowersL = self.list2NP(self.fftPowerCHS[0])
- if self.fftPowerCHS.__len__() == 2:
- soundPowersR = self.list2NP(self.fftPowerCHS[1])
- soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
- return np.average(soundPowers)
- return np.average(soundPowersL)
- else:
- if self.fftPowerCHS.__len__() < 1:
- return 0,0
- soundPowersL = self.list2NP(self.fftPowerCHS[0])
- soundPowersR = self.list2NP(self.fftPowerCHS[1])
- return np.average(soundPowersL),np.average(soundPowersR)
- '''
- Compute the zero-crossing rate (ZCR) of a frame.
- '''
- def ZCR(self,curFrame):
- # zero-crossing rate
- tmp1 = curFrame[:-1]
- tmp2 = curFrame[1:]
- sings = tmp1 * tmp2 <= 0 # shift the frame by one sample; adjacent samples with opposite signs give a product <= 0, counted as one zero crossing
- zcr = float(np.sum(sings)) / len(sings)
- print "ZCR:",zcr
- return zcr
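- # Worked example (added for illustration): for curFrame = np.array([1, -2, 3, -4]),
- # tmp1 * tmp2 = [-2, -6, -12], all <= 0, so ZCR = 3 / 3 = 1.0 (a sign change at
- # every step, the highest possible rate).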
- '''
- Convert time-domain audio samples to frequency-domain data.
- Note: if the frame duration is not 1 second, a spectrum plot cannot be built directly (the bin index no longer equals the frequency in Hz).
- '''
- def time2Frequence(self, frameL, timeL=1.0):
- fftFreq = np.fft.fft(frameL) * 2 / len(frameL)
- # freq = np.arange(0,len(frameL),1)
- # plt.plot(freq[:], abs(fftFreq[:]), 'r')
- # plt.show()
- return fftFreq
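- # Illustrative note (added): FFT bin k corresponds to k / timeL Hz, so only when
- # timeL == 1 s does the bin index equal the frequency in Hz, which is why the
- # spectrum helpers above work on 1-second frames. With the 2 / len(frameL) scaling
- # a pure tone of amplitude A peaks at roughly A. Minimal sketch (assumed numbers):
- #   fs = 8000
- #   t = np.arange(fs) / float(fs)
- #   tone = 1000 * np.sin(2 * np.pi * 440 * t)          # 440 Hz, amplitude 1000
- #   spec = np.abs(np.fft.fft(tone) * 2 / len(tone))    # spec[440] is ~1000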
- '''
- Display the time-domain waveform of one audio frame.
- '''
- def showFrameTime(self, frameL, timeL, width):
- print "showFrameTime,frameL:", frameL.shape, np.ndim(frameL)
- ndim = np.ndim(frameL)
- if ndim == 1:
- frameCount = len(frameL)
- time = np.arange(0, frameCount) * (timeL / frameCount)
- print "showFrameTime,time:",time.shape
- plt.plot(time, frameL, "g")
- plt.xlabel("time (seconds)")
- plt.ylabel("amplitude")
- plt.show()
- elif ndim == 2:
- rows, cols = frameL.shape
- time = np.arange(0, cols) * (timeL / cols)
- for i in range(rows):
- plt.subplot(rows,1,i+1)
- plt.plot(time, frameL[i])
- plt.xlabel("time (seconds)")
- plt.ylabel("amplitude")
- plt.show()
- def getNPDtype(self, byteNum):
- if byteNum == 1:
- return np.int8
- elif byteNum == 2:
- return np.int16
- else:
- return np.int16
- '''
- Frame-by-frame analysis during audio recording. When the analysis has finished, use the get-style interfaces to fetch the results, e.g. getSoundAVGPower.
- For FFT spectrum analysis, the frame time (frameTime) must be 1 second for the result to be meaningful and for a spectrum plot to be built.
- Sound/silence decision: for stereo audio, a frame counts as having sound if either channel has sound.
- '''
- def anaysisFrames(self, frameQue, channels, width, frameTime):
- self.frameTime = frameTime
- self.nChannels = channels
- self.varList = []
- soundCount = 0
- noSoundCount = 0
- self.soundPowerCHS = []
- self.allPowerCHS = []
- self.fftPowerCHS = []
- for i in range(channels):
- self.soundPowerCHS.append([])
- self.allPowerCHS.append([])
- self.fftPowerCHS.append([])
- while self.isAF is True or frameQue.qsize() > 0:
- frames = frameQue.get()
- dtype = self.getNPDtype(width)
- # print "anaysisFrames,dtype size:", len(frames), channels, dtype
- frameL = np.fromstring(frames, dtype=dtype)
- frameL.shape = -1,channels
- frameL = frameL.T
- channelPowers = []
- for i in range(channels):
- avgPower = self.getFrameMaxPower(frameL[i])
- self.allPowerCHS[i].append(avgPower)
- fftPower = self.getFrameFFTPower(frameL[i], frameTime)
- self.fftPowerCHS[i].append(fftPower)
- channelPowers.append(avgPower)
- # all channel values for this frame are computed; decide whether the frame has sound
- if i == channels - 1:
- # take the strongest channel power of the current frame
- framePower = self.getCHMaxPower(channelPowers)
- # record whether the left/right channels have sound
- self.recLRVariation(channelPowers)
- del channelPowers
- if framePower >= TH_POWER:
- noSoundCount = 0
- soundCount += 1
- if soundCount * frameTime >= TH_PLOSIVE_TIME:
- self.hasSound = True
- else:
- # plosive detection. Still missing: detecting plosives while sound is already present.
- if soundCount * frameTime < TH_PLOSIVE_TIME and soundCount > 0:
- self.hasPlosive = True
- noSoundCount += 1
- soundCount = 0
- if noSoundCount * frameTime >= TH_BLOCK_TIME:
- self.hasBlock = True
- # record the power of voiced frames separately for the left and right channels
- if avgPower >= TH_POWER:
- self.soundPowerCHS[i].append(avgPower)
- '''
- Return the maximum volume across all channels of the current frame.
- '''
- def getCHMaxPower(self, channelPowers):
- channelPowers_np = self.list2NP(channelPowers)
- return np.max(channelPowers_np)
- '''Start the background audio analysis thread.'''
- def startFramesAnalysis(self, frameQue, channels, width, buf_time):
- self.initStatus()
- self.isAF = True
- thread.start_new_thread(self.anaysisFrames, (frameQue,channels,width, buf_time))
- '''Stop the background analysis; the thread only exits after all queued data has been processed.'''
- def endFramesAnalysis(self):
- self.isAF = False
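- # Minimal usage sketch for live analysis (added; it assumes the caller is the one
- # pushing raw interleaved PCM byte strings into the queue -- the streaming side of
- # ARecorder is not shown in this file, so that part is only an assumption):
- #   frameQue = Queue.Queue()
- #   analysis = AAnalysis()
- #   analysis.startFramesAnalysis(frameQue, channels=2, width=2, buf_time=READ_BUF_TIME)
- #   # ... while recording, put each READ_BUF_TIME-sized frame buffer into frameQue ...
- #   analysis.endFramesAnalysis()
- #   # results are stable once the background thread has drained the queue
- #   print analysis.hasSound, analysis.getSoundAVGPower(LR=True)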
- def getWavReader(self, waveFile):
- wfReader = None
- try:
- wfReader = wave.open(waveFile,"rb")
- except Exception as e:
- print "Wave file does not exist:", e
- return wfReader
- '''
- After analysing a wav audio file, use the get-style interfaces to fetch the results, e.g. getTotalAVGPower.
- For FFT spectrum analysis, buf_time must be 1 second for the result to be meaningful and for a spectrum plot to be built.
- '''
- def analyWav(self, waveFile, buf_time=READ_BUF_TIME):
- self.frameTime = buf_time
- self.initStatus()
- frameQue = Queue.Queue()
- wfReader = self.getWavReader(waveFile)
- if wfReader is None:
- return
- nChannels, width, frameRate, nframes = wfReader.getparams()[0:4]
- stepnFrames = int(frameRate*buf_time)
- times = int(nframes/stepnFrames)
- print "analyWav:",buf_time,stepnFrames,times
- for i in range(times):
- frameL = wfReader.readframes(stepnFrames)
- frameQue.put_nowait(frameL)
- # self.startFramesAnalysis(frameQue, nChannels, width,timeL)
- # self.endFramesAnalysis()
- self.isAF = False
- self.anaysisFrames(frameQue, nChannels, width,buf_time)
- wfReader.close()
- if __name__ == "__main__":
- analysis = AAnalysis()
- # waveFile = "test1.wav"
- # waveFile = "eq_10khz_v0.wav"
- # waveFile = "tv/DVB_DTV_automatic_search.wav"
- waveFile = "wav_balance_v15.wav"
- # waveFile = r"D:\sound\sound_preset\sound_preset_mode_music.wav"
- # waveFile = r"D:\sound\5k\eq_5khz_v100.wav"
- # waveFile = r"D:\sound\monitorSound_balance.wav"
- # analysis.showLanguage(waveFile=waveFile)
- # analysis.showFregForm(waveFile)
- # analysis.showWaveForm(waveFile)
- analysis.analyWav(waveFile)
- print "sound,status hasSound,hasBlock,hasPlosive:",analysis.hasSound,analysis.hasBlock,analysis.hasPlosive
- print "0,getSoundAVGPower:", analysis.getSoundAVGPower(LR=True)
- print "0,getTotalAVGPower:", analysis.getTotalAVGPower(LR=True)
- print "0,getFFTPower:",analysis.getFFTPower(LR=True)
- #
- # waveFile = "eq_10khz_v100.wav"
- # analysis.analyWav(waveFile)
- # print "100,avgPower:", analysis.getSoundAVGPower()
- # print "100,fftPower:",analysis.getFFTPower()