audio_analysis.py 17 KB


  1. # -*- coding:utf-8 -*-
  2. import os, sys, time
  3. from audio_recorder import READ_BUF_TIME,ARecorder
  4. from ssat_sdk.sat_environment import getSoundList
  5. import numpy as np
  6. import matplotlib.pyplot as plt
  7. import wave
  8. import thread
  9. import Queue
  10. def getSoundLevel():
  11. hasSoundLevel, noSoundLevel = getSoundList()
  12. return round(hasSoundLevel, 2)
# Threshold on the average short-time energy that separates sound from silence.
TH_POWER = getSoundLevel()
# Sound lasting less than 0.1 s is judged to be a plosive (pop); sound lasting
# longer than 0.1 s is judged to be normal sound.
TH_PLOSIVE_TIME = 0.1
# Length of a silent gap that counts as an interruption of the sound, in seconds.
TH_BLOCK_TIME = 0.1
  16. class AAnalysis():
  17. TH_POWER = getSoundLevel() # 平均短时能有无声的阈值
  18. def __init__(self):
  19. self.varList = [] #记录各声道有无声变化
  20. self.nChannels = 2
  21. self.initStatus()
  22. def initStatus(self):
  23. self.power = 0
  24. self.hasSound = False
  25. self.hasBlock = False
  26. self.hasPlosive = False
  27. self.soundPowerCHS = []
  28. self.allPowerCHS = []
  29. self.fftPowerCHS = []
  30. def showLanguage(self, time=5, waveFile=None):
  31. if waveFile is None:
  32. recorder = ARecorder()
  33. waveFile = "test.wav"
  34. recorder.recordWave(waveFile,time)
  35. # 调用wave模块中的open函数,打开语音文件。
  36. f = wave.open(waveFile, 'rb')
  37. # 得到语音参数
  38. params = f.getparams()
  39. nchannels, sampwidth, framerate, nframes = params[:4]
  40. # 得到的数据是字符串,需要将其转成int型
  41. strData = f.readframes(nframes)
  42. wavaData = np.fromstring(strData, dtype=np.int16)
  43. # 归一化
  44. wavaData = wavaData * 1.0 / max(abs(wavaData))
  45. # .T 表示转置
  46. wavaData = np.reshape(wavaData, [nframes, nchannels]).T
  47. f.close()
  48. # 绘制频谱
  49. plt.specgram(wavaData[0], Fs=framerate, scale_by_freq=True, sides='default')
  50. plt.ylabel('Frequency')
  51. plt.xlabel('Time(s)')
  52. plt.show()
  53. def showFregForm(self,waveFile):
  54. wf = wave.open(waveFile, "rb")
  55. nframes = wf.getnframes()
  56. framerate = wf.getframerate()
  57. frame_data = wf.readframes(nframes)
  58. wf.close()
  59. time_data = np.fromstring(frame_data, dtype=np.int16)
  60. time_data.shape = -1, 2
  61. time_dataT = time_data.T
  62. freq = [n for n in range(0, framerate)]
  63. start = 0
  64. end = framerate
  65. time_dataT2 = time_dataT[0][start:start + end]
  66. # self.time2Frequence(time_dataT2[2000:2000+1000])
  67. c = self.time2Frequence(time_dataT2, 1)
  68. print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c))
  69. for i in range(1,5):
  70. start = framerate*i
  71. end = framerate*(i+1)
  72. print "showFregForm:start,end:",start,end
  73. time_dataT2 = time_dataT[0][start:end]
  74. c = self.time2Frequence(time_dataT2, 1)
  75. print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c)),time_dataT2.shape
  76. plt.plot(freq[:], abs(c[:]), 'r')
  77. plt.show()
  78. def showWaveForm(self, waveFile):
  79. # -*- coding: utf-8 -*-
  80. # 打开WAV文档
  81. f = wave.open(waveFile, "rb")
  82. # 读取格式信息
  83. # (nchannels, sampwidth, framerate, nframes, comptype, compname)
  84. params = f.getparams()
  85. nchannels, sampwidth, framerate, nframes = params[:4]
  86. # 读取波形数据
  87. print "nchannels, sampwidth, framerate, nframes:",nchannels, sampwidth, framerate, nframes
  88. str_data = f.readframes(nframes)
  89. f.close()
  90. # 将波形数据转换为数组
  91. print "str_data:",str_data.__len__()
  92. wave_data = np.fromstring(str_data, dtype=np.int16)
  93. print "wave_data.shape:",wave_data.shape
  94. # 声道处理
  95. wave_data.shape = -1, nchannels
  96. wave_data = wave_data.T
  97. start = 0
  98. per_frames = int(framerate * READ_BUF_TIME)
  99. chunks = int(nframes/framerate/READ_BUF_TIME)
  100. for i in range(2,chunks):
  101. start = per_frames*i
  102. end = per_frames * (i + 1)
  103. self.getFrameAVGPower(wave_data[0][start:end])
  104. self.getFrameFFTPower(wave_data[0][start:end],READ_BUF_TIME)
  105. # self.STE(wave_data[0][start:end])
  106. # self.ZCR(wave_data[0][start:end])
  107. self.STE(wave_data[0])
  108. # 声道处理 End
  109. print "channel 0:",wave_data[0].shape, len(wave_data[0])
  110. print "channel 1:", wave_data[1].shape, len(wave_data[1])
  111. # time = np.arange(0, nframes) * (1.0 / framerate)
  112. time = np.arange(0, nframes) * (1.0 / framerate)
  113. print "time:", time.shape
  114. # 绘制波形
  115. plt.subplot(211)
  116. plt.plot(time, wave_data[0], "b")
  117. plt.subplot(212)
  118. plt.plot(time, wave_data[1], c="g")
  119. plt.xlabel("time (seconds)")
  120. plt.ylabel("power (hz)")
  121. plt.show()
  122. '''
  123. 计算帧短时能
  124. '''
  125. def STE(self, frameL):
  126. amp = np.sum(np.abs(frameL))
  127. print "STE amp:",amp
  128. return amp
  129. '''
  130. 将列表,转换成numpy的数组。
  131. '''
  132. def list2NP(self, srcList):
  133. if srcList.__len__() > 0:
  134. return np.array(srcList)
  135. else:
  136. return np.array([-1])
  137. '''
  138. 计算有声的平均强度,即帧的强度超过有声阈值,才被统计
  139. '''
  140. def getSoundAVGPower(self, LR=False):
  141. # print "getSoundAVGPower,self.soundPowerCHS:",self.soundPowerCHS
  142. if LR is False:
  143. if self.soundPowerCHS.__len__() < 1:
  144. return 0
  145. soundPowersL = self.list2NP(self.soundPowerCHS[0])
  146. if self.soundPowerCHS.__len__() == 2:
  147. soundPowersR = self.list2NP(self.soundPowerCHS[1])
  148. soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
  149. return np.average(soundPowers)
  150. return np.average(soundPowersL)
  151. else:
  152. if self.soundPowerCHS.__len__() < 1:
  153. return 0,0
  154. soundPowersL = self.list2NP(self.soundPowerCHS[0])
  155. soundPowersR = self.list2NP(self.soundPowerCHS[1])
  156. return np.average(soundPowersL),np.average(soundPowersR)
  157. '''
  158. :return result,valueList。
  159. result:-1代表声音检测异常;1表示单声道;2 表示双声道
  160. valueList:-1代表没有声音,0代表仅左声道,1代表仅右声道,2代表左右声道有声
  161. '''
  162. def getLRVariation(self):
  163. varList = []
  164. if self.soundPowerCHS.__len__() < 1:
  165. return -1, self.varList
  166. elif self.soundPowerCHS.__len__() == 1:
  167. return 1, self.varList
  168. else:
  169. return 2, self.varList
  170. varLRCount = 0
  171. def recLRVariation(self, channelPowers):
  172. ret = self.calLRVar(channelPowers)
  173. if self.varList.__len__() == 0:
  174. self.varList.append(ret)
  175. else:
  176. if self.varList[self.varList.__len__()-1] <> ret:
  177. self.varLRCount += 1
  178. if self.varLRCount * self.frameTime > 0.5:#大于0.5秒钟声音,才算入变化
  179. self.varList.append(ret)
  180. else:
  181. self.varLRCount = 0
  182. def calLRVar(self, channelPowers):
  183. if channelPowers.__len__() == 1:
  184. #单声道,左声道有声,或者无声
  185. return 0 if channelPowers[0] >= TH_POWER else -1
  186. else:
  187. if channelPowers[0] >= TH_POWER and channelPowers[1] >= TH_POWER:
  188. return 2
  189. elif channelPowers[0] >= TH_POWER and channelPowers[1] < TH_POWER:
  190. return 0
  191. elif channelPowers[0] < TH_POWER and channelPowers[1] >= TH_POWER:
  192. return 1
  193. else:
  194. return -1
  195. '''
  196. 计算整个录音的声音平均强度,实际意义不太大,除非是固定幅度音频检测。
  197. '''
  198. def getTotalAVGPower(self, LR=False):
  199. if LR is False:
  200. if self.allPowerCHS.__len__() < 1:
  201. return 0
  202. soundPowersL = self.list2NP(self.allPowerCHS[0])
  203. if self.allPowerCHS.__len__() == 2:
  204. soundPowersR = self.list2NP(self.allPowerCHS[1])
  205. soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
  206. return np.average(soundPowers)
  207. else:
  208. return np.average(soundPowersL)
  209. else:
  210. if self.allPowerCHS.__len__() < 1:
  211. return 0,0
  212. soundPowersL = self.list2NP(self.allPowerCHS[0])
  213. soundPowersR = self.list2NP(self.allPowerCHS[1])
  214. return np.average(soundPowersL),np.average(soundPowersR)
  215. '''
  216. 计算帧的平均声音强度
  217. '''
  218. def getFrameAVGPower(self, frameL):
  219. avgPower = np.average(np.abs(frameL))
  220. # print "getFrameAVGPower:",avgPower
  221. return avgPower
  222. '''
  223. 计算帧的平均声音强度
  224. '''
  225. def getFrameMaxPower(self, frameL):
  226. maxPower = np.max(np.abs(frameL))
  227. # print "getFrameMaxPower:",maxPower
  228. return maxPower
  229. '''
  230. 根据傅里叶变化,计算声音强度
  231. '''
  232. def getFrameFFTPower(self, frameL, timeL):
  233. fftFreq = self.time2Frequence(frameL, timeL)
  234. fftPower = np.max(np.abs(fftFreq))
  235. # print "getFrameFFTPower:",fftPower
  236. return fftPower
  237. '''
  238. 计算整个录音过程的fft转换后得到的声音强度
  239. '''
  240. def getFFTPower(self, LR=False):
  241. if LR is False:
  242. if self.fftPowerCHS.__len__() < 1:
  243. return 0
  244. soundPowersL = self.list2NP(self.fftPowerCHS[0])
  245. if self.fftPowerCHS.__len__() == 2:
  246. soundPowersR = self.list2NP(self.fftPowerCHS[1])
  247. soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
  248. return np.average(soundPowers)
  249. return np.average(soundPowersL)
  250. else:
  251. if self.fftPowerCHS.__len__() < 1:
  252. return 0,0
  253. soundPowersL = self.list2NP(self.fftPowerCHS[0])
  254. soundPowersR = self.list2NP(self.fftPowerCHS[1])
  255. return np.average(soundPowersL),np.average(soundPowersR)
  256. '''
  257. 计算帧过零率
  258. '''
  259. def ZCR(self,curFrame):
  260. # 过零率
  261. tmp1 = curFrame[:-1] #
  262. tmp2 = curFrame[1:]
  263. sings = tmp1 * tmp2 <= 0 #帧左右错位1,如果相邻2个数一正一负,则相乘后小于<0,表示1次过零。
  264. zcr = float(np.sum(sings)) / len(sings)
  265. print "ZCR:",zcr
  266. return zcr
  267. '''
  268. 将声音时域数据,转换成频谱数据。
  269. 注意:如果帧时长,不是1秒,无法建立频谱图。
  270. '''
  271. def time2Frequence(self, frameL, timeL=1.0):
  272. fftFreq = np.fft.fft(frameL) * 2 / len(frameL)
  273. # freq = np.arange(0,len(frameL),1)
  274. # plt.plot(freq[:], abs(fftFreq[:]), 'r')
  275. # plt.show()
  276. return fftFreq
  277. '''
  278. 展现一帧音频的时域波形
  279. '''
  280. def showFrameTime(self, frameL, timeL, width):
  281. print "showFrameTime,frameL:", frameL.shape, np.ndim(frameL)
  282. ndim = np.ndim(frameL)
  283. if ndim == 1:
  284. frameCount = len(frameL)
  285. time = np.arange(0, frameCount) * (timeL / frameCount)
  286. print "showFrameTime,time:",time.shape
  287. plt.plot(time, frameL, "g")
  288. plt.xlabel("time (seconds)")
  289. plt.ylabel("power (hz)")
  290. plt.show()
  291. elif ndim == 2:
  292. rows, cols = frameL.shape
  293. time = np.arange(0, cols) * (timeL / cols)
  294. for i in range(rows):
  295. plt.subplot(rows,1,i+1)
  296. plt.plot(time, frameL[i])
  297. plt.xlabel("time (seconds)")
  298. plt.ylabel("power (hz)")
  299. plt.show()
  300. def getNPDtype(self, byteNum):
  301. if byteNum == 1:
  302. return np.int8
  303. elif byteNum == 2:
  304. return np.int16
  305. else:
  306. return np.int16
  307. '''
  308. 用于音频录制过程中的帧分析。分析结束后,可以使用get类型接口,获取结果,例如:getSoundAVGPower
  309. 分析fft声音频谱时,timeL=1秒才有意义,才能建立频谱图。
  310. 声音有无判断:双声道,只要一个声道有声,判断为有声。
  311. '''
  312. def anaysisFrames(self, frameQue, channels, width, frameTime):
  313. self.frameTime = frameTime
  314. self.nChannels = channels
  315. self.varList = []
  316. soundCount = 0
  317. noSoundCount = 0
  318. self.soundPowerCHS = []
  319. self.allPowerCHS = []
  320. self.fftPowerCHS = []
  321. for i in range(channels):
  322. self.soundPowerCHS.append([])
  323. self.allPowerCHS.append([])
  324. self.fftPowerCHS.append([])
  325. while self.isAF is True or frameQue.qsize() > 0:
  326. frames = frameQue.get()
  327. dtype = self.getNPDtype(width)
  328. # print "anaysisFrames,dtype size:", len(frames), channels, dtype
  329. frameL = np.fromstring(frames, dtype=dtype)
  330. frameL.shape = -1,channels
  331. frameL = frameL.T
  332. channelPowers = []
  333. for i in range(channels):
  334. avgPower = self.getFrameMaxPower(frameL[i])
  335. self.allPowerCHS[i].append(avgPower)
  336. fftPower = self.getFrameFFTPower(frameL[i], frameTime)
  337. self.fftPowerCHS[i].append(fftPower)
  338. channelPowers.append(avgPower)
  339. #每一帧各通道数值计算完毕,开始进行帧的有无声判断
  340. if i == channels - 1:
  341. #获取当前帧的各通道最强声音强度。
  342. framePower = self.getCHMaxPower(channelPowers)
  343. #左右声道有无声的记录
  344. self.recLRVariation(channelPowers)
  345. del channelPowers
  346. if framePower >= TH_POWER:
  347. noSoundCount = 0
  348. soundCount += 1
  349. if soundCount * frameTime >= TH_PLOSIVE_TIME:
  350. self.hasSound = True
  351. else:
  352. # 爆破音检测。还缺失:有声音的情况下,爆破音检测。
  353. if soundCount * frameTime < TH_PLOSIVE_TIME and soundCount > 0:
  354. self.hasPlosive = True
  355. noSoundCount += 1
  356. soundCount = 0
  357. if noSoundCount * frameTime >= TH_BLOCK_TIME:
  358. self.hasBlock = True
  359. #左右声道分开记录有声的帧的声音强度
  360. if avgPower >= TH_POWER:
  361. self.soundPowerCHS[i].append(avgPower)
  362. '''
  363. 返回当前帧的各通道的最大音量
  364. '''
  365. def getCHMaxPower(self, channelPowers):
  366. channelPowers_np = self.list2NP(channelPowers)
  367. return np.max(channelPowers_np)
  368. '''开启音频后台分析线程'''
  369. def startFramesAnalysis(self, frameQue, channels, width, buf_time):
  370. self.initStatus()
  371. self.isAF = True
  372. thread.start_new_thread(self.anaysisFrames, (frameQue,channels,width, buf_time))
  373. '''关闭音频后台分析,需要数据全部处理完后,线程才会停止'''
  374. def endFramesAnalysis(self):
  375. self.isAF = False
  376. def getWavReader(self, waveFile):
  377. wfReader = None
  378. try:
  379. wfReader = wave.open(waveFile,"rb")
  380. except Exception as e:
  381. print "Wave不存在",e
  382. return wfReader
  383. '''
  384. 分析wav音频文件后,可以使用get类型接口,获取结果,例如:getTotalAVGPower
  385. 分析fft声音频谱时,timeL=1秒才有意义,才能建立频谱图。
  386. '''
  387. def analyWav(self, waveFile, buf_time=READ_BUF_TIME):
  388. self.frameTime = buf_time
  389. self.initStatus()
  390. frameQue = Queue.Queue()
  391. wfReader = self.getWavReader(waveFile)
  392. if wfReader is None:
  393. return
  394. nChannels, width, frameRate, nframes = wfReader.getparams()[0:4]
  395. stepnFrames = int(frameRate*buf_time)
  396. times = int(nframes/stepnFrames)
  397. print "analyWav:",buf_time,stepnFrames,times
  398. for i in range(times):
  399. frameL = wfReader.readframes(stepnFrames)
  400. frameQue.put_nowait(frameL)
  401. # self.startFramesAnalysis(frameQue, nChannels, width,timeL)
  402. # self.endFramesAnalysis()
  403. self.isAF = False
  404. self.anaysisFrames(frameQue, nChannels, width,buf_time)
  405. wfReader.close()
  406. if __name__ == "__main__":
  407. analysis = AAnalysis()
  408. # waveFile = "test1.wav"
  409. # waveFile = "eq_10khz_v0.wav"
  410. # waveFile = "tv/DVB_DTV_automatic_search.wav"
  411. waveFile = "wav_balance_v15.wav"
  412. # waveFile = r"D:\sound\sound_preset\sound_preset_mode_music.wav"
  413. # waveFile = r"D:\sound\5k\eq_5khz_v100.wav"
  414. # waveFile = r"D:\sound\monitorSound_balance.wav"
  415. # analysis.showLanguage(waveFile=waveFile)
  416. # analysis.showFregForm(waveFile)
  417. # analysis.showWaveForm(waveFile)
  418. analysis.analyWav(waveFile)
  419. print "sound,status hasSound,hasBlock,hasPlosive:",analysis.hasSound,analysis.hasBlock,analysis.hasPlosive
  420. print "0,getSoundAVGPower:", analysis.getSoundAVGPower(LR=True)
  421. print "0,getTotalAVGPower:", analysis.getTotalAVGPower(LR=True)
  422. print "0,getFFTPower:",analysis.getFFTPower(LR=True)
  423. #
  424. # waveFile = "eq_10khz_v100.wav"
  425. # analysis.analyWav(waveFile)
  426. # print "100,avgPower:", analysis.getSoundAVGPower()
  427. # print "100,fftPower:",analysis.getFFTPower()