audio_analysis.py

# -*- coding:utf-8 -*-
import os, sys, time
from audio_recorder import READ_BUF_TIME, ARecorder
from ssat_sdk.sat_environment import getSoundList
import numpy as np
import matplotlib.pyplot as plt
import wave
import thread
import Queue


def getSoundLevel():
    hasSoundLevel, noSoundLevel = getSoundList()
    return round(hasSoundLevel, 2)


TH_POWER = getSoundLevel()  # average short-time energy threshold separating sound from silence
TH_PLOSIVE_TIME = 0.1  # sound shorter than 0.1 s is treated as a plosive; longer than 0.1 s as normal sound
TH_BLOCK_TIME = 0.1  # gap length (in seconds) that counts as a break in the sound
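# Illustrative note (assumes READ_BUF_TIME is a per-frame buffer length such as 0.05 s; the real
# value comes from audio_recorder): with a 0.05 s frame, TH_PLOSIVE_TIME = 0.1 s means at least two
# consecutive frames above TH_POWER are needed before hasSound is set, and TH_BLOCK_TIME = 0.1 s
# means two consecutive silent frames are reported as a break (hasBlock).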
class AAnalysis():
    def __init__(self):
        self.varList = []  # records left/right channel sound/silence changes
        self.nChannels = 2
        self.initStatus()

    def initStatus(self):
        self.power = 0
        self.hasSound = False
        self.hasBlock = False
        self.hasPlosive = False
        self.soundPowerCHS = []
        self.allPowerCHS = []
        self.fftPowerCHS = []
    def showLanguage(self, time=5, waveFile=None):
        if waveFile is None:
            recorder = ARecorder()
            waveFile = "test.wav"
            recorder.recordWave(waveFile, time)
        # Open the audio file with wave.open.
        f = wave.open(waveFile, 'rb')
        # Read the audio parameters.
        params = f.getparams()
        nchannels, sampwidth, framerate, nframes = params[:4]
        # readframes returns a byte string; convert it to int16.
        strData = f.readframes(nframes)
        waveData = np.fromstring(strData, dtype=np.int16)
        # Normalize.
        waveData = waveData * 1.0 / max(abs(waveData))
        # Reshape to (nframes, nchannels) and transpose so each row is one channel.
        waveData = np.reshape(waveData, [nframes, nchannels]).T
        f.close()
        # Plot the spectrogram.
        plt.specgram(waveData[0], Fs=framerate, scale_by_freq=True, sides='default')
        plt.ylabel('Frequency')
        plt.xlabel('Time(s)')
        plt.show()
    def showFregForm(self, waveFile):
        wf = wave.open(waveFile, "rb")
        nframes = wf.getnframes()
        framerate = wf.getframerate()
        frame_data = wf.readframes(nframes)
        wf.close()
        time_data = np.fromstring(frame_data, dtype=np.int16)
        time_data.shape = -1, 2
        time_dataT = time_data.T
        freq = [n for n in range(0, framerate)]
        start = 0
        end = framerate
        time_dataT2 = time_dataT[0][start:start + end]
        # self.time2Frequence(time_dataT2[2000:2000+1000])
        c = self.time2Frequence(time_dataT2, 1)
        print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c))
        for i in range(1, 5):
            start = framerate * i
            end = framerate * (i + 1)
            print "showFregForm:start,end:", start, end
            time_dataT2 = time_dataT[0][start:end]
            c = self.time2Frequence(time_dataT2, 1)
            print "showFregForm:c,", c.shape, np.max(np.abs(c)), np.average(np.abs(c)), time_dataT2.shape
            plt.plot(freq[:], abs(c[:]), 'r')
            plt.show()
    def showWaveForm(self, waveFile):
        # Open the WAV file.
        f = wave.open(waveFile, "rb")
        # Read the format info:
        # (nchannels, sampwidth, framerate, nframes, comptype, compname)
        params = f.getparams()
        nchannels, sampwidth, framerate, nframes = params[:4]
        # Read the waveform data.
        print "nchannels, sampwidth, framerate, nframes:", nchannels, sampwidth, framerate, nframes
        str_data = f.readframes(nframes)
        f.close()
        # Convert the waveform bytes into an array.
        print "str_data:", str_data.__len__()
        wave_data = np.fromstring(str_data, dtype=np.int16)
        print "wave_data.shape:", wave_data.shape
        # Split into channels.
        wave_data.shape = -1, nchannels
        wave_data = wave_data.T
        start = 0
        per_frames = int(framerate * READ_BUF_TIME)
        chunks = int(nframes / framerate / READ_BUF_TIME)
        for i in range(2, chunks):
            start = per_frames * i
            end = per_frames * (i + 1)
            self.getFrameAVGPower(wave_data[0][start:end])
            self.getFrameFFTPower(wave_data[0][start:end], READ_BUF_TIME)
            # self.STE(wave_data[0][start:end])
            # self.ZCR(wave_data[0][start:end])
        self.STE(wave_data[0])
        # Channel processing done.
        print "channel 0:", wave_data[0].shape, len(wave_data[0])
        print "channel 1:", wave_data[1].shape, len(wave_data[1])
        time = np.arange(0, nframes) * (1.0 / framerate)
        print "time:", time.shape
        # Plot the waveform of each channel.
        plt.subplot(211)
        plt.plot(time, wave_data[0], "b")
        plt.subplot(212)
        plt.plot(time, wave_data[1], c="g")
        plt.xlabel("time (seconds)")
        plt.ylabel("power (hz)")
        plt.show()
    '''
    Compute the short-time energy of a frame.
    '''
    def STE(self, frameL):
        amp = np.sum(np.abs(frameL))
        print "STE amp:", amp
        return amp
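    # Worked example (illustrative): for the frame [1, -2, 3] the short-time energy is
    # |1| + |-2| + |3| = 6. Note this is the sum of absolute amplitudes, not the sum of squares.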
    '''
    Convert a Python list into a numpy array.
    '''
    def list2NP(self, srcList):
        if srcList.__len__() > 0:
            return np.array(srcList)
        else:
            return np.array([-1])
    '''
    Compute the average power of the voiced frames, i.e. only frames whose power exceeds
    the sound threshold are counted.
    '''
    def getSoundAVGPower(self, LR=False):
        # print "getSoundAVGPower,self.soundPowerCHS:",self.soundPowerCHS
        if LR is False:
            if self.soundPowerCHS.__len__() < 1:
                return 0
            soundPowersL = self.list2NP(self.soundPowerCHS[0])
            if self.soundPowerCHS.__len__() == 2:
                # Assumes both channels recorded the same number of voiced frames.
                soundPowersR = self.list2NP(self.soundPowerCHS[1])
                soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
                return np.average(soundPowers)
            return np.average(soundPowersL)
        else:
            if self.soundPowerCHS.__len__() < 1:
                return 0, 0
            soundPowersL = self.list2NP(self.soundPowerCHS[0])
            soundPowersR = self.list2NP(self.soundPowerCHS[1])
            return np.average(soundPowersL), np.average(soundPowersR)
    '''
    :return result, valueList.
    result: -1 means sound detection failed; 1 means mono; 2 means stereo.
    valueList: -1 means no sound, 0 means left channel only, 1 means right channel only,
    2 means both channels have sound.
    '''
    def getLRVariation(self):
        varList = []
        if self.soundPowerCHS.__len__() < 1:
            return -1, self.varList
        elif self.soundPowerCHS.__len__() == 1:
            return 1, self.varList
        else:
            return 2, self.varList

    varLRCount = 0

    def recLRVariation(self, channelPowers):
        ret = self.calLRVar(channelPowers)
        if self.varList.__len__() == 0:
            self.varList.append(ret)
        else:
            if self.varList[self.varList.__len__() - 1] != ret:
                self.varLRCount += 1
                if self.varLRCount * self.frameTime > 0.5:  # only changes lasting longer than 0.5 s are recorded
                    self.varList.append(ret)
            else:
                self.varLRCount = 0

    def calLRVar(self, channelPowers):
        if channelPowers.__len__() == 1:
            # Mono: the left channel either has sound or is silent.
            return 0 if channelPowers[0] >= TH_POWER else -1
        else:
            if channelPowers[0] >= TH_POWER and channelPowers[1] >= TH_POWER:
                return 2
            elif channelPowers[0] >= TH_POWER and channelPowers[1] < TH_POWER:
                return 0
            elif channelPowers[0] < TH_POWER and channelPowers[1] >= TH_POWER:
                return 1
            else:
                return -1
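    # Illustrative trace (assuming frameTime = 0.1 s): a recording that starts with both channels
    # voiced and then loses the right channel for a full second would typically yield
    # self.varList == [2, 0], because the change to "left only" persists for more than 0.5 s.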
    '''
    Compute the average power of the whole recording. Rarely meaningful on its own,
    unless the audio under test has a fixed amplitude.
    '''
    def getTotalAVGPower(self, LR=False):
        if LR is False:
            if self.allPowerCHS.__len__() < 1:
                return 0
            soundPowersL = self.list2NP(self.allPowerCHS[0])
            if self.allPowerCHS.__len__() == 2:
                soundPowersR = self.list2NP(self.allPowerCHS[1])
                soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
                return np.average(soundPowers)
            else:
                return np.average(soundPowersL)
        else:
            if self.allPowerCHS.__len__() < 1:
                return 0, 0
            soundPowersL = self.list2NP(self.allPowerCHS[0])
            soundPowersR = self.list2NP(self.allPowerCHS[1])
            return np.average(soundPowersL), np.average(soundPowersR)
    '''
    Compute the average sound power of a frame.
    '''
    def getFrameAVGPower(self, frameL):
        avgPower = np.average(np.abs(frameL))
        # print "getFrameAVGPower:",avgPower
        return avgPower

    '''
    Compute the maximum sound power of a frame.
    '''
    def getFrameMaxPower(self, frameL):
        maxPower = np.max(np.abs(frameL))
        # print "getFrameMaxPower:",maxPower
        return maxPower

    '''
    Compute the sound power from the Fourier transform of a frame.
    '''
    def getFrameFFTPower(self, frameL, timeL):
        fftFreq = self.time2Frequence(frameL, timeL)
        fftPower = np.max(np.abs(fftFreq))
        # print "getFrameFFTPower:",fftPower
        return fftPower
    '''
    Compute the FFT-based sound power over the whole recording.
    '''
    def getFFTPower(self, LR=False):
        if LR is False:
            if self.fftPowerCHS.__len__() < 1:
                return 0
            soundPowersL = self.list2NP(self.fftPowerCHS[0])
            if self.fftPowerCHS.__len__() == 2:
                soundPowersR = self.list2NP(self.fftPowerCHS[1])
                soundPowers = np.where(soundPowersL > soundPowersR, soundPowersL, soundPowersR)
                return np.average(soundPowers)
            return np.average(soundPowersL)
        else:
            if self.fftPowerCHS.__len__() < 1:
                return 0, 0
            soundPowersL = self.list2NP(self.fftPowerCHS[0])
            soundPowersR = self.list2NP(self.fftPowerCHS[1])
            return np.average(soundPowersL), np.average(soundPowersR)
    '''
    Compute the zero-crossing rate of a frame.
    '''
    def ZCR(self, curFrame):
        tmp1 = curFrame[:-1]
        tmp2 = curFrame[1:]
        # Shift the frame by one sample: when two adjacent samples have opposite signs their
        # product is <= 0, which counts as one zero crossing.
        sings = tmp1 * tmp2 <= 0
        zcr = float(np.sum(sings)) / len(sings)
        print "ZCR:", zcr
        return zcr
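    # Worked example (illustrative): for the frame [1, -1, 2, 3, -2] the shifted products are
    # [-1, -2, 6, -6], three of which are <= 0, so ZCR = 3 / 4 = 0.75.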
    '''
    Convert time-domain audio data into frequency-domain data.
    Note: if the frame is not exactly 1 second long, the result cannot be plotted directly
    as a spectrum indexed in Hz.
    '''
    def time2Frequence(self, frameL, timeL=1.0):
        fftFreq = np.fft.fft(frameL) * 2 / len(frameL)
        # freq = np.arange(0,len(frameL),1)
        # plt.plot(freq[:], abs(fftFreq[:]), 'r')
        # plt.show()
        return fftFreq
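    # Background note: for a frame of duration timeL seconds, FFT bin k corresponds to the
    # frequency k / timeL Hz, so with timeL = 1 s the bin index equals the frequency in Hz.
    # The 2 / len(frameL) factor scales each bin roughly to the amplitude of that sinusoidal
    # component (the 2 accounts for the mirrored negative-frequency half of the spectrum).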
    '''
    Show the time-domain waveform of one audio frame.
    '''
    def showFrameTime(self, frameL, timeL, width):
        print "showFrameTime,frameL:", frameL.shape, np.ndim(frameL)
        ndim = np.ndim(frameL)
        if ndim == 1:
            frameCount = len(frameL)
            time = np.arange(0, frameCount) * (timeL / frameCount)
            print "showFrameTime,time:", time.shape
            plt.plot(time, frameL, "g")
            plt.xlabel("time (seconds)")
            plt.ylabel("power (hz)")
            plt.show()
        elif ndim == 2:
            rows, cols = frameL.shape
            time = np.arange(0, cols) * (timeL / cols)
            for i in range(rows):
                plt.subplot(rows, 1, i + 1)
                plt.plot(time, frameL[i])
            plt.xlabel("time (seconds)")
            plt.ylabel("power (hz)")
            plt.show()

    def getNPDtype(self, byteNum):
        if byteNum == 1:
            return np.int8
        elif byteNum == 2:
            return np.int16
        else:
            return np.int16
    '''
    Per-frame analysis used during audio recording. After the analysis finishes, use the
    get-style interfaces to read the results, e.g. getSoundAVGPower.
    When analysing the FFT spectrum, only timeL = 1 second produces a spectrum that can be
    plotted directly in Hz.
    Sound/silence decision: for stereo input, the frame counts as voiced if either channel has sound.
    '''
    def anaysisFrames(self, frameQue, channels, width, frameTime):
        self.frameTime = frameTime
        self.nChannels = channels
        self.varList = []
        soundCount = 0
        noSoundCount = 0
        self.soundPowerCHS = []
        self.allPowerCHS = []
        self.fftPowerCHS = []
        for i in range(channels):
            self.soundPowerCHS.append([])
            self.allPowerCHS.append([])
            self.fftPowerCHS.append([])
        while self.isAF is True or frameQue.qsize() > 0:
            frames = frameQue.get()
            dtype = self.getNPDtype(width)
            # print "anaysisFrames,dtype size:", len(frames), channels, dtype
            frameL = np.fromstring(frames, dtype=dtype)
            frameL.shape = -1, channels
            frameL = frameL.T
            channelPowers = []
            for i in range(channels):
                avgPower = self.getFrameMaxPower(frameL[i])
                self.allPowerCHS[i].append(avgPower)
                fftPower = self.getFrameFFTPower(frameL[i], frameTime)
                self.fftPowerCHS[i].append(fftPower)
                channelPowers.append(avgPower)
                # Once every channel of this frame has been measured, decide whether the frame is voiced.
                if i == channels - 1:
                    # Strongest power among the channels of the current frame.
                    framePower = self.getCHMaxPower(channelPowers)
                    # Record left/right channel sound/silence changes.
                    self.recLRVariation(channelPowers)
                    del channelPowers
                    if framePower >= TH_POWER:
                        noSoundCount = 0
                        soundCount += 1
                        if soundCount * frameTime >= TH_PLOSIVE_TIME:
                            self.hasSound = True
                    else:
                        # Plosive detection. Still missing: detecting plosives while sound is present.
                        if soundCount * frameTime < TH_PLOSIVE_TIME and soundCount > 0:
                            self.hasPlosive = True
                        noSoundCount += 1
                        soundCount = 0
                        if noSoundCount * frameTime >= TH_BLOCK_TIME:
                            self.hasBlock = True
                # Record, per channel, the power of frames that are above the sound threshold.
                if avgPower >= TH_POWER:
                    self.soundPowerCHS[i].append(avgPower)
    '''
    Return the maximum power among the channels of the current frame.
    '''
    def getCHMaxPower(self, channelPowers):
        channelPowers_np = self.list2NP(channelPowers)
        return np.max(channelPowers_np)
    '''Start the background audio analysis thread.'''
    def startFramesAnalysis(self, frameQue, channels, width, buf_time):
        self.initStatus()
        self.isAF = True
        thread.start_new_thread(self.anaysisFrames, (frameQue, channels, width, buf_time))

    '''Stop the background analysis. The thread only exits after all queued data has been processed.'''
    def endFramesAnalysis(self):
        self.isAF = False
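    # Usage sketch (illustrative; assumes the recorder pushes raw interleaved int16 frame bytes
    # into frameQue — the exact ARecorder streaming API is not shown in this module):
    #   frameQue = Queue.Queue()
    #   analysis.startFramesAnalysis(frameQue, channels=2, width=2, buf_time=READ_BUF_TIME)
    #   ... recorder keeps calling frameQue.put(raw_frame_bytes) while capturing ...
    #   analysis.endFramesAnalysis()  # the analysis thread drains the queue, then exits
    #   print analysis.hasSound, analysis.getSoundAVGPower(LR=True)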
    def getWavReader(self, waveFile):
        wfReader = None
        try:
            wfReader = wave.open(waveFile, "rb")
        except Exception as e:
            print "Wave file does not exist:", e
        return wfReader
    '''
    After analysing a wav file, use the get-style interfaces to read the results,
    e.g. getTotalAVGPower.
    When analysing the FFT spectrum, only timeL = 1 second produces a spectrum that can be
    plotted directly in Hz.
    '''
    def analyWav(self, waveFile, buf_time=READ_BUF_TIME):
        self.frameTime = buf_time
        self.initStatus()
        frameQue = Queue.Queue()
        wfReader = self.getWavReader(waveFile)
        if wfReader is None:
            return
        nChannels, width, frameRate, nframes = wfReader.getparams()[0:4]
        stepnFrames = int(frameRate * buf_time)
        times = int(nframes / stepnFrames)
        print "analyWav:", buf_time, stepnFrames, times
        for i in range(times):
            frameL = wfReader.readframes(stepnFrames)
            frameQue.put_nowait(frameL)
        # self.startFramesAnalysis(frameQue, nChannels, width,timeL)
        # self.endFramesAnalysis()
        self.isAF = False
        self.anaysisFrames(frameQue, nChannels, width, buf_time)
        wfReader.close()
if __name__ == "__main__":
    analysis = AAnalysis()
    # waveFile = "test1.wav"
    # waveFile = "eq_10khz_v0.wav"
    # waveFile = "tv/DVB_DTV_automatic_search.wav"
    waveFile = "wav_balance_v15.wav"
    # waveFile = r"D:\sound\sound_preset\sound_preset_mode_music.wav"
    # waveFile = r"D:\sound\5k\eq_5khz_v100.wav"
    # waveFile = r"D:\sound\monitorSound_balance.wav"
    # analysis.showLanguage(waveFile=waveFile)
    # analysis.showFregForm(waveFile)
    # analysis.showWaveForm(waveFile)
    analysis.analyWav(waveFile)
    print "sound,status hasSound,hasBlock,hasPlosive:", analysis.hasSound, analysis.hasBlock, analysis.hasPlosive
    print "0,getSoundAVGPower:", analysis.getSoundAVGPower(LR=True)
    print "0,getTotalAVGPower:", analysis.getTotalAVGPower(LR=True)
    print "0,getFFTPower:", analysis.getFFTPower(LR=True)
    #
    # waveFile = "eq_10khz_v100.wav"
    # analysis.analyWav(waveFile)
    # print "100,avgPower:", analysis.getSoundAVGPower()
    # print "100,fftPower:",analysis.getFFTPower()