audio_analysis.py 19 KB


  1. # -*- coding:utf-8 -*-
  2. import os, sys, time
  3. from audio_recorder import READ_BUF_TIME,ARecorder
  4. from ssat_sdk.sat_environment import getSoundList
  5. import numpy as np
  6. import matplotlib.pyplot as plt
  7. import wave
  8. import thread
  9. import Queue
  10. def getSoundLevel():
  11. hasSoundLevel, noSoundLevel = getSoundList()
  12. return round(hasSoundLevel, 2)
  13. TH_POWER = getSoundLevel() #平均短时能有无声的阈值
  14. TH_PLOSIVE_TIME = 0.1 #短于0.1秒的声音,判断为爆破音,大于0.1秒,判断为正常声音
  15. TH_BLOCK_TIME = 0.1#声音间断间隔时间,单位s
  16. class AAnalysis():
  17. TH_POWER = getSoundLevel() # 平均短时能有无声的阈值
  18. def __init__(self):
  19. self.varList = [] #记录各声道有无声变化
  20. self.nChannels = 2
  21. self.initStatus()
  22. def initStatus(self):
  23. self.power = 0
  24. self.hasSound = False
  25. self.hasBlock = False
  26. self.hasPlosive = False
  27. self.soundPowerCHS = []
  28. self.allPowerCHS = []
  29. self.fftPowerCHS = []
  30. self.fftFrqCHS = []
  31. def showLanguage(self, time=5, waveFile=None):
  32. if waveFile is None:
  33. recorder = ARecorder()
  34. waveFile = "test.wav"
  35. recorder.recordWave(waveFile,time)
  36. # 调用wave模块中的open函数,打开语音文件。
  37. f = wave.open(waveFile, 'rb')
  38. # 得到语音参数
  39. params = f.getparams()
  40. nchannels, sampwidth, framerate, nframes = params[:4]
  41. # 得到的数据是字符串,需要将其转成int型
  42. strData = f.readframes(nframes)
  43. wavaData = np.fromstring(strData, dtype=np.int16)
  44. # 归一化
  45. wavaData = wavaData * 1.0 / max(abs(wavaData))
  46. # .T 表示转置
  47. wavaData = np.reshape(wavaData, [nframes, nchannels]).T
  48. f.close()
  49. # 绘制频谱
  50. plt.specgram(wavaData[0], Fs=framerate, scale_by_freq=True, sides='default')
  51. plt.ylabel('Frequency')
  52. plt.xlabel('Time(s)')
  53. plt.show()
  54. def showFregForm(self,waveFile):
  55. wf = wave.open(waveFile, "rb")
  56. nframes = wf.getnframes()
  57. framerate = wf.getframerate()
  58. frame_data = wf.readframes(nframes)
  59. wf.close()
  60. time_data = np.fromstring(frame_data, dtype=np.int16)
  61. time_data.shape = -1, 2
  62. time_dataT = time_data.T
  63. freq = [n for n in range(0, framerate)]
  64. start = 0
  65. end = framerate
  66. time_dataT2 = time_dataT[0][start:start + end]
  67. # self.time2Frequence(time_dataT2[2000:2000+1000])
  68. c = self.time2Frequence(time_dataT2, 1)
  69. print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c))
  70. for i in range(1,5):
  71. start = framerate*i
  72. end = framerate*(i+1)
  73. print "showFregForm:start,end:",start,end
  74. time_dataT2 = time_dataT[0][start:end]
  75. c = self.time2Frequence(time_dataT2, 1)
  76. print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c)),time_dataT2.shape
  77. plt.plot(freq[:], abs(c[:]), 'r')
  78. plt.show()
  79. def showWaveForm(self, waveFile):
  80. # -*- coding: utf-8 -*-
  81. # 打开WAV文档
  82. f = wave.open(waveFile, "rb")
  83. # 读取格式信息
  84. # (nchannels, sampwidth, framerate, nframes, comptype, compname)
  85. params = f.getparams()
  86. nchannels, sampwidth, framerate, nframes = params[:4]
  87. # 读取波形数据
  88. print "nchannels, sampwidth, framerate, nframes:",nchannels, sampwidth, framerate, nframes
  89. str_data = f.readframes(nframes)
  90. f.close()
  91. # 将波形数据转换为数组
  92. print "str_data:",str_data.__len__()
  93. wave_data = np.fromstring(str_data, dtype=np.int16)
  94. print "wave_data.shape:",wave_data.shape
  95. # 声道处理
  96. wave_data.shape = -1, nchannels
  97. wave_data = wave_data.T
  98. start = 0
  99. per_frames = int(framerate * READ_BUF_TIME)
  100. chunks = int(nframes/framerate/READ_BUF_TIME)
  101. for i in range(2,chunks):
  102. start = per_frames*i
  103. end = per_frames * (i + 1)
  104. self.getFrameAVGPower(wave_data[0][start:end])
  105. self.getFrameFFTPower(wave_data[0][start:end],READ_BUF_TIME)
  106. # self.STE(wave_data[0][start:end])
  107. # self.ZCR(wave_data[0][start:end])
  108. self.STE(wave_data[0])
  109. # 声道处理 End
  110. print "channel 0:",wave_data[0].shape, len(wave_data[0])
  111. print "channel 1:", wave_data[1].shape, len(wave_data[1])
  112. # time = np.arange(0, nframes) * (1.0 / framerate)
  113. time = np.arange(0, nframes) * (1.0 / framerate)
  114. print "time:", time.shape
  115. # 绘制波形
  116. plt.subplot(211)
  117. plt.plot(time, wave_data[0], "b")
  118. plt.subplot(212)
  119. plt.plot(time, wave_data[1], c="g")
  120. plt.xlabel("time (seconds)")
  121. plt.ylabel("power (hz)")
  122. plt.show()
  123. '''
  124. 计算帧短时能
  125. '''
  126. def STE(self, frameL):
  127. amp = np.sum(np.abs(frameL))
  128. print "STE amp:",amp
  129. return amp
  130. '''
  131. 将列表,转换成numpy的数组。
  132. '''
  133. def list2NP(self, srcList):
  134. if srcList.__len__() > 0:
  135. return np.array(srcList)
  136. else:
  137. return np.array([-1])
  138. '''
  139. 计算有声的平均强度,即帧的强度超过有声阈值,才被统计
  140. '''
  141. def getSoundAVGPower(self, LR=False):
  142. # print "getSoundAVGPower,self.soundPowerCHS:",self.soundPowerCHS
  143. if LR is False:
  144. if self.soundPowerCHS.__len__() < 1:
  145. return 0
  146. soundPowersL = self.list2NP(self.soundPowerCHS[0])
  147. if self.soundPowerCHS.__len__() == 2:
  148. soundPowersR = self.list2NP(self.soundPowerCHS[1])
  149. soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
  150. return np.average(soundPowers)
  151. return np.average(soundPowersL)
  152. else:
  153. if self.soundPowerCHS.__len__() < 1:
  154. return 0,0
  155. soundPowersL = self.list2NP(self.soundPowerCHS[0])
  156. soundPowersR = self.list2NP(self.soundPowerCHS[1])
  157. return np.average(soundPowersL),np.average(soundPowersR)
  158. '''
  159. :return result,valueList。
  160. result:-1代表声音检测异常;1表示单声道;2 表示双声道
  161. valueList:-1代表没有声音,0代表仅左声道,1代表仅右声道,2代表左右声道有声
  162. '''
  163. def getLRVariation(self):
  164. varList = []
  165. if self.soundPowerCHS.__len__() < 1:
  166. return -1, self.varList
  167. elif self.soundPowerCHS.__len__() == 1:
  168. return 1, self.varList
  169. else:
  170. return 2, self.varList
  171. varLRCount = 0
  172. def recLRVariation(self, channelPowers):
  173. ret = self.calLRVar(channelPowers)
  174. if self.varList.__len__() == 0:
  175. self.varList.append(ret)
  176. else:
  177. if self.varList[self.varList.__len__()-1] <> ret:
  178. self.varLRCount += 1
  179. if self.varLRCount * self.frameTime > 0.5:#大于0.5秒钟声音,才算入变化
  180. self.varList.append(ret)
  181. else:
  182. self.varLRCount = 0
  183. def calLRVar(self, channelPowers):
  184. if channelPowers.__len__() == 1:
  185. #单声道,左声道有声,或者无声
  186. return 0 if channelPowers[0] >= TH_POWER else -1
  187. else:
  188. if channelPowers[0] >= TH_POWER and channelPowers[1] >= TH_POWER:
  189. return 2
  190. elif channelPowers[0] >= TH_POWER and channelPowers[1] < TH_POWER:
  191. return 0
  192. elif channelPowers[0] < TH_POWER and channelPowers[1] >= TH_POWER:
  193. return 1
  194. else:
  195. return -1
  196. '''
  197. 计算整个录音的声音平均强度,实际意义不太大,除非是固定幅度音频检测。
  198. '''
  199. def getTotalAVGPower(self, LR=False):
  200. if LR is False:
  201. if self.allPowerCHS.__len__() < 1:
  202. return 0
  203. soundPowersL = self.list2NP(self.allPowerCHS[0])
  204. if self.allPowerCHS.__len__() == 2:
  205. soundPowersR = self.list2NP(self.allPowerCHS[1])
  206. soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
  207. return np.average(soundPowers)
  208. else:
  209. return np.average(soundPowersL)
  210. else:
  211. if self.allPowerCHS.__len__() < 1:
  212. return 0,0
  213. soundPowersL = self.list2NP(self.allPowerCHS[0])
  214. soundPowersR = self.list2NP(self.allPowerCHS[1])
  215. return np.average(soundPowersL),np.average(soundPowersR)
  216. '''
  217. 计算整个录音的声音最大强度,实际意义不太大,除非是固定幅度音频检测。
  218. '''
  219. def getTotalMaxPower(self, LR=False):
  220. if LR is False:
  221. if self.allPowerCHS.__len__() < 1:
  222. return 0
  223. soundPowersL = self.list2NP(self.allPowerCHS[0])
  224. if self.allPowerCHS.__len__() == 2:
  225. soundPowersR = self.list2NP(self.allPowerCHS[1])
  226. soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
  227. return int(np.max(soundPowers))
  228. else:
  229. return int(np.max(soundPowersL))
  230. else:
  231. if self.allPowerCHS.__len__() < 1:
  232. return 0, 0
  233. soundPowersL = self.list2NP(self.allPowerCHS[0])
  234. soundPowersR = self.list2NP(self.allPowerCHS[1])
  235. return int(np.max(soundPowersL)), int(np.max(soundPowersR))
  236. '''
  237. 计算帧的平均声音强度
  238. '''
  239. def getFrameAVGPower(self, frameL):
  240. avgPower = np.average(np.abs(frameL))
  241. # print "getFrameAVGPower:",avgPower
  242. return avgPower
  243. '''
  244. 计算帧的最大声音强度
  245. '''
  246. def getFrameMaxPower(self, frameL):
  247. maxPower = np.max(np.abs(frameL))
  248. # print "getFrameMaxPower:",maxPower
  249. return maxPower
  250. '''
  251. 根据傅里叶变化,计算声音强度
  252. '''
  253. def getFrameFFTPower(self, frameL, timeL):
  254. fftFreq = self.time2Frequence(frameL, timeL)
  255. fftPower = np.max(np.abs(fftFreq))
  256. powerFreq = np.argmax(np.abs(fftFreq))
  257. # print "getFrameFFTPower:",fftPower,powerFreq
  258. return fftPower,powerFreq
  259. '''
  260. 计算整个录音过程的fft转换后得到的声音强度
  261. '''
  262. def getFFTPower(self, LR=False):
  263. if LR is False:
  264. if self.fftPowerCHS.__len__() < 1:
  265. return 0
  266. soundPowersL = self.list2NP(self.fftPowerCHS[0])
  267. if self.fftPowerCHS.__len__() == 2:
  268. soundPowersR = self.list2NP(self.fftPowerCHS[1])
  269. soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
  270. return np.average(soundPowers)
  271. return np.average(soundPowersL)
  272. else:
  273. if self.fftPowerCHS.__len__() < 1:
  274. return 0,0
  275. soundPowersL = self.list2NP(self.fftPowerCHS[0])
  276. soundPowersR = self.list2NP(self.fftPowerCHS[1])
  277. return np.average(soundPowersL),np.average(soundPowersR)
  278. '''
  279. 获取整个录音文件每1秒的最大声音频率列表。
  280. LR=False,表示左右声道一样,取一个声道的值
  281. LR=True,表示左右声道不一样,取两个声道的值
  282. :param LR:是否取左右两个声道值。
  283. :return :LR=True,二维数组,左右两个声道值;LR=False,一位数组,一个声道值。None:表示数据异常
  284. '''
  285. def getFFTFreq(self, LR = False):
  286. # print "getFFTFreq:",self.fftFrqCHS
  287. if self.fftFrqCHS.__len__() < 1:
  288. return None
  289. if LR is True:
  290. return self.fftFrqCHS
  291. else:
  292. return self.fftFrqCHS[0]
  293. '''
  294. 计算帧过零率
  295. '''
  296. def ZCR(self,curFrame):
  297. # 过零率
  298. tmp1 = curFrame[:-1] #
  299. tmp2 = curFrame[1:]
  300. sings = tmp1 * tmp2 <= 0 #帧左右错位1,如果相邻2个数一正一负,则相乘后小于<0,表示1次过零。
  301. zcr = float(np.sum(sings)) / len(sings)
  302. print "ZCR:",zcr
  303. return zcr
  304. '''
  305. 将声音时域数据,转换成频谱数据。并指取人耳能听到的部分100Hz ~ 12KHz
  306. 注意:如果帧时长,不是1秒,无法建立频谱图。
  307. '''
  308. def time2Frequence(self, frameL, timeL=1.0):
  309. fftFreq = np.fft.fft(frameL) * 2 / len(frameL)
  310. if np.size(fftFreq) > 12000:
  311. fftFreq = fftFreq[0:12000]
  312. # freq = np.arange(0,np.size(fftFreq),1)
  313. # plt.plot(freq[:], abs(fftFreq[:]), 'r')
  314. # plt.show()
  315. return fftFreq
  316. '''
  317. 展现一帧音频的时域波形
  318. '''
  319. def showFrameTime(self, frameL, timeL, width):
  320. print "showFrameTime,frameL:", frameL.shape, np.ndim(frameL)
  321. ndim = np.ndim(frameL)
  322. if ndim == 1:
  323. frameCount = len(frameL)
  324. time = np.arange(0, frameCount) * (timeL / frameCount)
  325. print "showFrameTime,time:",time.shape
  326. plt.plot(time, frameL, "g")
  327. plt.xlabel("time (seconds)")
  328. plt.ylabel("power (hz)")
  329. plt.show()
  330. elif ndim == 2:
  331. rows, cols = frameL.shape
  332. time = np.arange(0, cols) * (timeL / cols)
  333. for i in range(rows):
  334. plt.subplot(rows,1,i+1)
  335. plt.plot(time, frameL[i])
  336. plt.xlabel("time (seconds)")
  337. plt.ylabel("power (hz)")
  338. plt.show()
  339. def getNPDtype(self, byteNum):
  340. if byteNum == 1:
  341. return np.int8
  342. elif byteNum == 2:
  343. return np.int16
  344. else:
  345. return np.int16
  346. '''
  347. 用于音频录制过程中的帧分析。分析结束后,可以使用get类型接口,获取结果,例如:getSoundAVGPower
  348. 分析fft声音频谱时,timeL=1秒才有意义,才能建立频谱图。
  349. 声音有无判断:双声道,只要一个声道有声,判断为有声。
  350. '''
  351. def anaysisFrames(self, frameQue, channels, width, frameTime):
  352. self.frameTime = frameTime
  353. self.nChannels = channels
  354. self.varList = []
  355. soundCount = 0
  356. noSoundCount = 0
  357. self.soundPowerCHS = []
  358. self.allPowerCHS = []
  359. self.fftPowerCHS = []
  360. self.fftFrqCHS = []
  361. for i in range(channels):
  362. self.soundPowerCHS.append([])
  363. self.allPowerCHS.append([])
  364. self.fftPowerCHS.append([])
  365. self.fftFrqCHS.append([])
  366. while self.isAF is True or frameQue.qsize() > 0:
  367. # print "anaysisFrames,frameQue.qsize:",frameQue.qsize()
  368. frames = frameQue.get()
  369. dtype = self.getNPDtype(width)
  370. # print "anaysisFrames,dtype size:", len(frames), channels, dtype
  371. frameL = np.fromstring(frames, dtype=dtype)
  372. frameL.shape = -1,channels
  373. frameL = frameL.T
  374. channelPowers = []
  375. for i in range(channels):
  376. avgPower = self.getFrameMaxPower(frameL[i])
  377. self.allPowerCHS[i].append(avgPower)
  378. fftPower,powerFreq = self.getFrameFFTPower(frameL[i], frameTime)
  379. self.fftPowerCHS[i].append(fftPower)
  380. self.fftFrqCHS[i].append(powerFreq)
  381. channelPowers.append(avgPower)
  382. #每一帧各通道数值计算完毕,开始进行帧的有无声判断
  383. if i == channels - 1:
  384. #获取当前帧的各通道最强声音强度。
  385. framePower = self.getCHMaxPower(channelPowers)
  386. #左右声道有无声的记录
  387. self.recLRVariation(channelPowers)
  388. del channelPowers
  389. if framePower >= TH_POWER:
  390. noSoundCount = 0
  391. soundCount += 1
  392. if soundCount * frameTime >= TH_PLOSIVE_TIME:
  393. self.hasSound = True
  394. else:
  395. # 爆破音检测。还缺失:有声音的情况下,爆破音检测。
  396. if soundCount * frameTime < TH_PLOSIVE_TIME and soundCount > 0:
  397. self.hasPlosive = True
  398. noSoundCount += 1
  399. soundCount = 0
  400. if noSoundCount * frameTime >= TH_BLOCK_TIME:
  401. self.hasBlock = True
  402. #左右声道分开记录有声的帧的声音强度
  403. if avgPower >= TH_POWER:
  404. self.soundPowerCHS[i].append(avgPower)
  405. '''
  406. 返回当前帧的各通道的最大音量
  407. '''
  408. def getCHMaxPower(self, channelPowers):
  409. channelPowers_np = self.list2NP(channelPowers)
  410. return np.max(channelPowers_np)
  411. '''开启音频后台分析线程'''
  412. def startFramesAnalysis(self, frameQue, channels, width, buf_time):
  413. self.initStatus()
  414. self.isAF = True
  415. thread.start_new_thread(self.anaysisFrames, (frameQue,channels,width, buf_time))
  416. '''关闭音频后台分析,需要数据全部处理完后,线程才会停止'''
  417. def endFramesAnalysis(self):
  418. self.isAF = False
  419. def getWavReader(self, waveFile):
  420. wfReader = None
  421. try:
  422. wfReader = wave.open(waveFile,"rb")
  423. except Exception as e:
  424. print "Wave不存在",e
  425. return wfReader
  426. '''
  427. 分析wav音频文件后,可以使用get类型接口,获取结果,例如:getTotalAVGPower
  428. 分析fft声音频谱时,buf_time=1秒才有意义,才能建立频谱图。
  429. '''
  430. def analyWav(self, waveFile, buf_time=READ_BUF_TIME):
  431. self.frameTime = buf_time
  432. self.initStatus()
  433. frameQue = Queue.Queue()
  434. wfReader = self.getWavReader(waveFile)
  435. if wfReader is None:
  436. return
  437. nChannels, width, frameRate, nframes = wfReader.getparams()[0:4]
  438. stepnFrames = int(frameRate*buf_time)
  439. times = int(nframes/stepnFrames)
  440. print "analyWav:",buf_time,stepnFrames,times,frameQue.qsize()
  441. for i in range(times):
  442. frameL = wfReader.readframes(stepnFrames)
  443. frameQue.put_nowait(frameL)
  444. print "analyWav:", buf_time, stepnFrames, times, frameQue.qsize()
  445. # self.startFramesAnalysis(frameQue, nChannels, width,timeL)
  446. # self.endFramesAnalysis()
  447. self.isAF = False
  448. self.anaysisFrames(frameQue, nChannels, width,buf_time)
  449. wfReader.close()
  450. if __name__ == "__main__":
  451. analysis = AAnalysis()
  452. # waveFile = "test1.wav"
  453. waveFile = "eq_10khz_v0.wav"
  454. # waveFile = "tv/DVB_DTV_automatic_search.wav"
  455. # waveFile = "wav_balance_v15.wav"
  456. # waveFile = r"D:\sound\sound_preset\sound_preset_mode_music.wav"
  457. # waveFile = r"D:\sound\5k\eq_5khz_v100.wav"
  458. # waveFile = r"D:\sound\monitorSound_balance.wav"
  459. # analysis.showLanguage(waveFile=waveFile)
  460. # analysis.showFregForm(waveFile)
  461. # analysis.showWaveForm(waveFile)
  462. analysis.analyWav(waveFile, buf_time=1)
  463. analysis.getFFTFreq()
  464. # print "sound,status hasSound,hasBlock,hasPlosive:",analysis.hasSound,analysis.hasBlock,analysis.hasPlosive
  465. # print "0,getSoundAVGPower:", analysis.getSoundAVGPower(LR=True)
  466. # print "0,getTotalAVGPower:", analysis.getTotalAVGPower(LR=True)
  467. # print "0,getFFTPower:",analysis.getFFTPower(LR=True)
  468. #
  469. # waveFile = "eq_10khz_v100.wav"
  470. # analysis.analyWav(waveFile)
  471. # print "100,avgPower:", analysis.getSoundAVGPower()
  472. # print "100,fftPower:",analysis.getFFTPower()