123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- #-*- coding:utf-8 -*-
- import os,sys
- reload(sys)
- sys.setdefaultencoding('utf-8')
- import wave
- from time import sleep
- import time
- import numpy
- import numpy as np
- import pyaudio
- # import pylab
- # import matplotlib.pyplot as pl
- # import matplotlib
- import math,random
- from ssat_sdk.sound.audio_recorder import ARecorder
- from ssat_sdk.sound.audio_recorder import READ_BUF_TIME
- from ssat_sdk.sound.audio_analysis import AAnalysis
- from ssat_sdk.sat_environment import getSATTmpDIR,getVoicecards
- import threading,thread
- import Queue
- from scipy import fftpack
- from scipy import signal
- from scipy.io import wavfile
- from ssat_sdk.device_manage.sound_manager import *
- from threading import Lock
- FORMAT = pyaudio.paInt16
- RATE = 44100
- RECORD_SECONDS = 5
- data = []
- FFT_LEN = 128
- frames = []
- counter = 1
- isMonitor = True
- NoSoundLT = 10#记录连续8次<SoundLevel状态,1秒钟大概抓40次数据。10次代表0.2秒,即表示无声。
- NoSoundTLimit = 0.1 #没有声音时长判断标准
- SoundTLimit = 0.06 #有声时长判断标准
- ExpList = []
- t = None
- q = Queue.Queue()
- # processing block
- # window = signal.hamming(CHUNK)
- TAG = "SoundTool"
- MuTex = Lock()
- class AudioIdentify():
- # 设备名称,默认拾音器=0, 亚为USB=1;
- Sound_Has = 0
- Sound_No = 1
- Sound_Break = 2
- def __init__(self):
- print "Init AudioIdentify"
- self.aAnalysis = AAnalysis()
- self.isSound = False #整个监听过程,是否有声音
- self.hasBlock = False #是否有断续现象
- self.deviceType = 0
- # self.audioChecker = Sound()
- self.audioChecker = None
- self.audioRecorder = ""
- try:
- list = getVoicecards()
- print u'音频设备列表:',list
- self.default = list["default"]
- for item in list["devices"]:
- if self.default == item["name"]:
- self.deviceType = int(item["type"])
- break
-
- if self.deviceType == 1: # usb
- print u'使用usb声音检测设备'
- self.audioChecker = SoundManager()
- if self.audioChecker.IsOpen != 0:
- print u'初始化声音设备失败'
- else:
- self.audioChecker = ARecorder()
- print u'使用拾音器设备'
- except Exception, e:
- self.audioChecker = ARecorder()
- print u"获取VoiceCards失败", e
- # 是否有声音,muteVoltage静音电压值
- # 废弃函数
- def hasSound(self, duration, muteVoltage = 1.0):
- if self.deviceType == 0:
- return self.isSound
- else:
- if self.audioChecker == None:
- print u'USB设备未初始化'
- return False
- # 获取静音百分比,1.0表示完全静音;
- percentMute = self.audioChecker.IsMute(duration, muteVoltage)
- # 静音百分比小于1.0时认为有声音;
- self.isSound = True if percentMute < 1.0 else False
- return self.isSound
- '''
- 返回当前设备状态。设备就绪返回True,失败返回False。
- '''
- def getStatus(self):
- if self.deviceType == 0:
- return self.audioChecker.getStatus()
- if self.audioChecker == None:
- print u'USB设备未初始化'
- return False
- return True if self.audioChecker.IsOpen == 0 else False
- '''
- 判断当前声音是否中断,中断返回True,没有中断返回False
- collectionTime: 采集时长,单位毫秒
- muteVoltage: 静音电压值,单位伏
- interruptTime: 停顿时长,单位毫秒
- '''
- def isInterrupt(self, collectionTime = 5000, muteVoltage = 1.0, interruptTime = 200):
- # return self.audioChecker.isInterrupt()
- if self.audioChecker == None:
- print u'USB设备未初始化'
- return False
- return self.audioChecker.IsInterrupt( collectionTime, muteVoltage, interruptTime)
- '''
- 判断当前是否监听到声音,成功返回True,失败返回False。
- collectionTime: 采集时长,单位毫秒
- muteVoltage: 静音电压值,单位伏
- [return] Bool
- '''
- def isOK(self):
- return self.isSound
- '''
- 采集声音(主线程)
- [int] int :采集多少时长的声明,以秒为单位
- [return] void
- '''
- def startCHK(self, seconds,muteVoltage = 1.0):
- if self.deviceType == 0:
- self.monitorSound(seconds)
- elif self.deviceType == 1:
- if self.audioChecker == None:
- print u'USB设备未初始化'
- return False
- # 获取静音百分比,1.0表示完全静音;
- percentMute = self.audioChecker.IsMute(seconds, muteVoltage)
- # 静音百分比小于1.0时认为有声音;
- self.isSound = True if percentMute < 1.0 else False
- # return self.isSound
- '''
- 返回monitorSound过程中的声音平均强度,如果LR为True则返回左右声道的声音强度,Freq为频率范围,单位Hz。
- 切割音频,进行最大强度累计,然后算平均值。
- :param minFreq:频率范围中的最小频率值。
- :param maxFreq:频率范围中的最大频率值。
- :param LR:左右声道检测选项。True:左右声道分开检测;False:仅检测0声道,可能是左声道,有可能仅有的一个声道。
- :param audioFile: 传入新的音频文件,不用monitorSound的音频文件。
- :return: LR=True时,返回左右声道声音强度2个值,LR=False时,返回一个声道强度值。
- '''
- def getSoundPower(self, minFreq=50, maxFreq=20000, LR = False, audioFile="", avg=False):
- if audioFile.__len__() > 2:
- self.aAnalysis.analyWav(audioFile)
- return self.aAnalysis.getTotalAVGPower(LR=LR)
- '''
- 获取左右声道有无声的状态。
- :return result,valueList。
- result:-1代表声音检测异常;1表示单声道;2 表示双声道
- valueList:-1代表没有声音,0代表仅左声道,1代表仅右声道,2代表左右声道有声
- '''
- def getLRVariation(self):
- return self.aAnalysis.getLRVariation()
- '''
- 监听声音状态,阻塞式监听
- :param seconds:监听声音异常时间
- :param expList数组:声音异常记录数组。数组第一个值{"time":time.asctime( time.localtime(time.time()))},记录开始时间。
- 后续,碰到一次异常,记录一次。
- expList第一个字典数据,有audio 键,取录音文件路径。status键:0 代表无异常;1代表没有声音发出;2代表声音有间断。
- :param waveFile:可以指定录制的音频文件路径
- :return expList数组。 和带入的expList一样
- '''
- def monitorSound(self, seconds,expList=None, waveFile = None, buf_time=READ_BUF_TIME):
- global ExpList
- MuTex.acquire()
- print "monitorSound start:",seconds,expList
- del ExpList
- if expList is not None:
- ExpList = expList
- else:
- ExpList = []
- if self.deviceType == 0:
- firstItem = {"time":time.asctime(time.localtime(time.time()))}
- # analysisLock = threading.Condition()
- self.aAnalysis.startFramesAnalysis(self.audioChecker.getFrameQue(),
- self.audioChecker.CHANNELS,
- self.audioChecker.WIDTH,
- buf_time)
- self.audioChecker.monitor(seconds,saveWave=True, waveFile=waveFile, buf_time=buf_time)
- self.audioRecorder = self.audioChecker.waveFile
- time.sleep(1) #预留1秒钟,用于音频分析。
- self.aAnalysis.endFramesAnalysis()
- firstItem["audio"] = self.audioChecker.getWavFile()
- ExpList.append(firstItem)
- retExpList = self.changeMonSoundResult(ExpList)
- # print u'monitorSound end',retExpList
- MuTex.release() # 解锁
- print "monitorSound end:", seconds, retExpList
- return retExpList
- elif self.deviceType == 1:
- # 采集时长,单位毫秒;
- # collectionTime = 5000
- # 静音电压,单位伏;
- muteVoltage = 1.0
- # 停顿时长,单位毫秒;
- interruptTime = 200
- ExpList.append({"time":time.asctime( time.localtime(time.time())),"audio":""})
- isInterrupt = self.audioChecker.IsInterrupt( seconds, muteVoltage, interruptTime)
- if isInterrupt == True:
- ExpList.append({"time":time.asctime( time.localtime(time.time()))})
- MuTex.release() # 解锁
- return ExpList
- '''
- 启动一个线程,调用monitorSound函数。
- 详细说明,查看monitorSound函数。
- '''
- def monitorSoundTH(self,seconds,expList=None, waveFile = None, buf_time=READ_BUF_TIME):
- thread.start_new_thread(self.monitorSound, (seconds, expList, waveFile, buf_time,))
- def changeMonSoundResult(self,ExpList):
- self.isSound = self.aAnalysis.hasSound
- self.hasBlock = self.aAnalysis.hasBlock
- soundPower = self.getSoundPower()
- print "changeMonSoundResult:", self.aAnalysis.hasSound, self.aAnalysis.hasBlock, self.aAnalysis.hasPlosive,soundPower
- if soundPower - self.aAnalysis.TH_POWER >= 0:
- self.aAnalysis.hasSound = True
- if self.aAnalysis.hasSound is True and self.aAnalysis.hasBlock is True:
- ExpList[0]["status"] = self.Sound_Break
- elif self.aAnalysis.hasSound is True and self.aAnalysis.hasBlock is False:
- ExpList[0]["status"] = self.Sound_Has
- else:
- ExpList[0]["status"] = self.Sound_No
- retExpList = []
- for item in ExpList:
- retExpList.append(item)
- # del ExpList #线程调用,回传参数,需要用到
- return retExpList
- # print "changeMonSoundResult:",ExpList
- '''
- 开启一个后台线程,录制音频文件,并返回音频文件路径(保存于临时文件夹)。
- :param seconds:录制声音时间
- :param fileName:希望命名的文件名
- :param CHANNELS:录制单声道时为1,录制双声道时为2
- 成功时返回音频文件路径,失败则返回空结果
- '''
- def recordAudioFileReturnPath(self, seconds, fileName,CHANNELS = 2):
- try:
- filePath = os.path.join(getSATTmpDIR(), fileName)
- audioPath = self.monitorSound(seconds,waveFile=filePath)[0]["audio"]
- return audioPath
- except Exception,e:
- # LoggingUtil.printLog(u"获取录制的音频文件失败!")
- print u"获取录制的音频文件失败!",e
- return "wav file record fail!!!"
- '''
- 读取立体声文件并分离左右声道音源,
- 并返回左右声道音源的路径
- '''
- def splitChannel(self, filePath = ""):
- # 读取WAV声音文件
- if filePath == "":
- filePath = self.audioChecker.getWavFile()
- sampleRate, musicData = wavfile.read(filePath)
- # 提取左右声道数据
- left = []
- right = []
- musicData.shape = -1,2
- musicData = musicData.T
- left = musicData[0]
- right = musicData[1]
- # 写入结果文件
- section_path = filePath.split(".")[0]
- leftPath = section_path + "_left.wav"
- rightPath = section_path + "_right.wav"
- wavfile.write(leftPath, sampleRate, np.array(left))
- wavfile.write(rightPath, sampleRate, np.array(right))
- return leftPath, rightPath
- if __name__ == "__main__":
- aIden = AudioIdentify()
- aIden.monitorSound(5)
- time.sleep(1)
- # sound1 = "D:/1.wav"
- # sound2 = "D:/2.wav"
- # list1=[]
- # expList1 = aIden.monitorSoundTH(10)
- # time.sleep(15)
- # aIden.recordAudioFileReturnPath(5, "wavetest_%s.wav" % str(1))
- # print "expList1:",expList1
- # print "soundPower:", aIden.getSoundPower()
- # print "fftsoundPower:", aIden.aAnalysis.getFFTPower()
- # for i in range(10):
- # filePath = aIden.recordAudioFileReturnPath(5,"wavetest_%s.wav"%str(i))
- # print "getSoundPower:", aIden.getSoundPower(LR=True)
- # print "filePath:", filePath
- # time.sleep(3)
- # print "getSoundPower:",aIden.getSoundPower(LR=True)
- # print "getTotalAVGPower:",aIden.aAnalysis.getTotalAVGPower(LR=True)
- # aIden.aAnalysis.analyWav("D:/5.wav")
- # print "getLRVariation:",aIden.getLRVariation()
- # print "getFFTPower:",aIden.aAnalysis.getFFTPower(LR=True)
- # print "aIden.isOK():", aIden.isOK(),list1
- # shutil.move(expList1[0]['audio'], sound1)
- # for i in range(10):
- # expList2 = aIden.monitorSound(1)
- # print "expList2:",i, expList2
- # shutil.move(expList2[0]['audio'], sound2)
- # waveFile = "sound/wav_balance_v15.wav"
- # print "getSoundPower:", aIden.getSoundPower(LR=True,audioFile=waveFile)
|