#-*- coding:utf-8 -*- import os,sys reload(sys) sys.setdefaultencoding('utf-8') import wave from time import sleep import time import numpy import numpy as np import pyaudio # import pylab # import matplotlib.pyplot as pl # import matplotlib import math,random from ssat_sdk.sound.audio_recorder import ARecorder # from ssat_sdk.sound.audio_device import AudioDevice from ssat_sdk.sound.audio_recorder import READ_BUF_TIME from ssat_sdk.sound.audio_analysis import AAnalysis from ssat_sdk.sat_environment import getSATTmpDIR,getVoicecards,writeSoundList import threading,thread import Queue from scipy import fftpack from scipy import signal from scipy.io import wavfile from ssat_sdk.device_manage.sound_manager import * from threading import Lock FORMAT = pyaudio.paInt16 RATE = 44100 RECORD_SECONDS = 5 data = [] FFT_LEN = 128 frames = [] counter = 1 isMonitor = True NoSoundLT = 10#记录连续8次 2: self.aAnalysis.analyWav(audioFile) return self.aAnalysis.getTotalAVGPower(LR=LR) ''' 返回monitorSound过程中的最大音量,如果LR为True,则返回左右声道的声音最大强度。 :param LR:左右声道检测选项。True:左右声道分开检测;False:仅检测0声道,可能是左声道,有可能仅有的一个声道。 :param audioFile: 传入新的音频文件,不用monitorSound的音频文件。 :return: LR=True时,返回左右声道声音强度2个值,LR=False时,返回一个声道强度值。 ''' def getMaxSoundPower(self, LR=False, audioFile=""): if audioFile.__len__() > 2: self.aAnalysis.analyWav(audioFile) return self.aAnalysis.getTotalMaxPower(LR=LR) ''' 获取左右声道有无声的状态。 :return result,valueList。 result:-1代表声音检测异常;1表示单声道;2 表示双声道 valueList:-1代表没有声音,0代表仅左声道,1代表仅右声道,2代表左右声道有声 ''' def getLRVariation(self): return self.aAnalysis.getLRVariation() ''' 监听声音状态,阻塞式监听 :param seconds:监听声音异常时间 :param expList数组:声音异常记录数组。数组第一个值{"time":time.asctime( time.localtime(time.time()))},记录开始时间。 后续,碰到一次异常,记录一次。 expList第一个字典数据,有audio 键,取录音文件路径。status键:0 代表无异常;1代表没有声音发出;2代表声音有间断。 :param waveFile:可以指定录制的音频文件路径 :return expList数组。 和带入的expList一样 ''' def monitorSound(self, seconds,expList=None, waveFile = None, buf_time=READ_BUF_TIME): global ExpList MuTex.acquire() print "monitorSound start:",seconds,expList del ExpList if expList is not None: ExpList = expList else: ExpList = [] if self.deviceType == 0: firstItem = {"time":time.asctime(time.localtime(time.time()))} # analysisLock = threading.Condition() self.aAnalysis.startFramesAnalysis(self.audioChecker.getFrameQue(), self.audioChecker.CHANNELS, self.audioChecker.WIDTH, buf_time) self.audioChecker.monitor(seconds,saveWave=True, waveFile=waveFile, buf_time=buf_time) self.audioFile = self.audioChecker.waveFile time.sleep(1) #预留1秒钟,用于音频分析。 self.aAnalysis.endFramesAnalysis() firstItem["audio"] = self.audioChecker.getWavFile() firstItem["status"] = self.Sound_No retExpList = self.changeMonSoundResult(firstItem) # print u'monitorSound end',retExpList MuTex.release() # 解锁 ExpList.append(firstItem) print "monitorSound end:", seconds, retExpList return retExpList elif self.deviceType == 1: # 采集时长,单位毫秒; # collectionTime = 5000 # 静音电压,单位伏; muteVoltage = 1.0 # 停顿时长,单位毫秒; interruptTime = 200 ExpList.append({"time":time.asctime( time.localtime(time.time())),"audio":""}) isInterrupt = self.audioChecker.IsInterrupt( seconds, muteVoltage, interruptTime) if isInterrupt == True: ExpList.append({"time":time.asctime( time.localtime(time.time()))}) MuTex.release() # 解锁 return ExpList ''' 启动一个线程,调用monitorSound函数。 详细说明,查看monitorSound函数。 ''' def monitorSoundTH(self,seconds,expList=None, waveFile = None, buf_time=READ_BUF_TIME): thread.start_new_thread(self.monitorSound, (seconds, expList, waveFile, buf_time,)) def changeMonSoundResult(self,expFirstItem): self.isSound = self.aAnalysis.hasSound self.hasBlock = self.aAnalysis.hasBlock soundPower = self.getSoundPower() print "changeMonSoundResult:", self.aAnalysis.hasSound, self.aAnalysis.hasBlock, self.aAnalysis.hasPlosive,soundPower if soundPower - self.aAnalysis.TH_POWER >= 0: self.aAnalysis.hasSound = True if self.aAnalysis.hasSound is True and self.aAnalysis.hasBlock is True: expFirstItem["status"] = self.Sound_Break elif self.aAnalysis.hasSound is True and self.aAnalysis.hasBlock is False: expFirstItem["status"] = self.Sound_Has else: expFirstItem["status"] = self.Sound_No retExpList = [] retExpList.append(expFirstItem) # del ExpList #线程调用,回传参数,需要用到 return retExpList # print "changeMonSoundResult:",ExpList ''' 开启一个后台线程,录制音频文件,并返回音频文件路径(保存于临时文件夹)。 :param seconds:录制声音时间 :param fileName:希望命名的文件名 :param CHANNELS:录制单声道时为1,录制双声道时为2 成功时返回音频文件路径,失败则返回空结果 ''' def recordAudioFileReturnPath(self, seconds, fileName,CHANNELS = 2): try: filePath = os.path.join(getSATTmpDIR(), fileName) audioPath = self.monitorSound(seconds,waveFile=filePath)[0]["audio"] return audioPath except Exception,e: # LoggingUtil.printLog(u"获取录制的音频文件失败!") print u"获取录制的音频文件失败!",e return "wav file record fail!!!" ''' 读取立体声文件并分离左右声道音源, 并返回左右声道音源的路径 ''' def splitChannel(self, filePath = ""): # 读取WAV声音文件 if filePath == "": filePath = self.audioChecker.getWavFile() sampleRate, musicData = wavfile.read(filePath) # 提取左右声道数据 left = [] right = [] musicData.shape = -1,2 musicData = musicData.T left = musicData[0] right = musicData[1] # 写入结果文件 section_path = filePath.split(".")[0] leftPath = section_path + "_left.wav" rightPath = section_path + "_right.wav" wavfile.write(leftPath, sampleRate, np.array(left)) wavfile.write(rightPath, sampleRate, np.array(right)) return leftPath, rightPath ''' 对比两个声音模式的参数,判断是否为同一种声音模式。 传入的参数,mode1和mode2数组数据要对齐,按低频率到高频率幅度值排列。 建议采集幅度:200HZ、500HZ、1KHz、1.5KHz、2KHz、5KHz、10KHz :param mode1,数组[200hz声音强度、500hz声音强度...] :param mode2,数组[200hz声音强度、500hz声音强度...] :return True/False, True代表一样,False代表不一样 ''' def cmpSoundMode(self, mode1, mode2, offset=2000): freqD = 0 for index in range(mode1.__len__()): if abs(mode1[index] - mode2[index]) > offset: return False elif abs(mode1[index] - mode2[index]) > 2*offset/3: freqD += 2 elif abs(mode1[index] - mode2[index]) > offset/3: freqD += 1 if freqD/mode1.__len__() > 1: return False else: return True ''' 用于检测幅度不变单频音的稳定播放,支持左右两个声道不同单频音 在monitorSound后,根据记录下的音频文件,判断最大幅度的频率是否和目标频率吻合。 或者输入一个音频文件,分析音频文件,判断最大幅度的频率是否和目标频率吻合。 精确度:1秒为单位,进行频率检测。 :param stdFreq:目标频率,单位Hz :param audioFile:录制的音频文件 :param offset:表示频率偏差允许范围 :return 声道1数值,声道2数值。1:频率完全吻合,0:频率部分吻合,有频率变化情况(可能是噪音);-1:频率完全不吻合 ''' def checkSingleFreq(self, stdFreq,audioFile="",offset=10): #audioFile: D:\1.wav ret1 = -1 ret2 = -1 if audioFile is not None and audioFile.find(".wav") > -1: self.aAnalysis.analyWav(audioFile, buf_time=1) else: self.aAnalysis.analyWav(self.audioFile, buf_time=1) # 记录每秒的最大幅度频率 freqList = self.aAnalysis.getFFTFreq(LR=True) print "checkSingleFreq,freqList:",freqList.__len__(),freqList if freqList is None: return ret1, ret2 # 根据记录的频率列表,与stdFreq比对,返回结果。 if freqList.__len__() < 1: return ret1.ret2 freqList1,freqList2 = [],[] if freqList.__len__() >= 1: freqList1 = freqList[0] if freqList.__len__() >= 2: freqList2 = freqList[0] #计算声道1 meetNum = 0 for freq in freqList1: if abs(freq - stdFreq) < offset: meetNum += 1 if meetNum/freqList1.__len__() == 1: ret1 = 1 elif meetNum/freqList1.__len__() > 0: ret1 = 0 elif meetNum/freqList1.__len__() == 0: ret1 = -1 #计算声道2 meetNum = 0 for freq in freqList2: if abs(freq - stdFreq) < offset: meetNum += 1 if meetNum/freqList2.__len__() == 1: ret2 = 1 elif meetNum/freqList2.__len__() > 0: ret2 = 0 elif meetNum/freqList2.__len__() == 0: ret2 = -1 return ret1,ret2 ''' 设置录音设备: input_front_mic = Realtek High Definition input_back_mic = HD Webcam C270 :param devName,传入自定义设备名字,例如:input_front_mic ''' def setInputSoundDev(self, devName): self.audioChecker.setDeviceName(devName) if __name__ == "__main__": audioDevice = AudioDevice() audioDevice.setDevice(2) aIden = AudioIdentify() # aIden.setInputSoundDev("input_front_mic") # aIden.setInputSoundDev("input_back_mic") aIden.monitorSound(5) # print "cmpSoundMode:", aIden.cmpSoundMode([3000,6000,34000,40000],[3400,5000,30000,40000]) # aIden.monitorSound(5) time.sleep(1) # sound1 = "D:/1.wav" # sound2 = "D:/2.wav" # list1=[] # expList1 = aIden.monitorSoundTH(10) # time.sleep(15) # aIden.recordAudioFileReturnPath(5, "wavetest_%s.wav" % str(1)) # print "expList1:",expList1 # print "soundPower:", aIden.getSoundPower() # print "fftsoundPower:", aIden.aAnalysis.getFFTPower() # for i in range(10): # filePath = aIden.recordAudioFileReturnPath(5,"wavetest_%s.wav"%str(i)) # print "getSoundPower:", aIden.getSoundPower(LR=True) # print "filePath:", filePath # time.sleep(3) # print "getSoundPower:",aIden.getSoundPower(LR=True) # print "getTotalAVGPower:",aIden.aAnalysis.getTotalAVGPower(LR=True) # aIden.aAnalysis.analyWav("D:/5.wav") # print "getLRVariation:",aIden.getLRVariation() # print "getFFTPower:",aIden.aAnalysis.getFFTPower(LR=True) # print "aIden.isOK():", aIden.isOK(),list1 # shutil.move(expList1[0]['audio'], sound1) # for i in range(10): # expList2 = aIden.monitorSound(1) # print "expList2:",i, expList2 # shutil.move(expList2[0]['audio'], sound2) # waveFile = "sound/wav_balance_v15.wav" # print "getSoundPower:", aIden.getSoundPower(LR=True,audioFile=waveFile)