Pho*_*rce 2 python audio multithreading pyaudio
我遇到了一些问题,似乎无法理解这个概念.
我想要做的是这样的:
让麦克风"收听"浊音(高于特定阈值),然后开始录制到.wav文件,直到此人停止说话/信号不再存在.例如:
begin:
listen() -> nothing is being said
listen() -> nothing is being said
listen() -> VOICED - _BEGIN RECORDING_
listen() -> VOICED - _BEGIN RECORDING_
listen() -> UNVOICED - _END RECORDING_
end
Run Code Online (Sandbox Code Playgroud)
我也想用"线程"这样做,这样就可以创建一个不断"监听"文件的线程,而另一个线程会在有声音数据的时候开始.但是,我不能为我的生活弄清楚如何我应该去做..到目前为止,这是我的代码:
import wave
import sys
import threading
from array import array
from sys import byteorder
try:
import pyaudio
CHECK_PYLIB = True
except ImportError:
CHECK_PYLIB = False
class Audio:
_chunk = 0.0
_format = 0.0
_channels = 0.0
_rate = 0.0
record_for = 0.0
stream = None
p = None
sample_width = None
THRESHOLD = 500
# initial constructor to accept params
def __init__(self, chunk, format, channels, rate):
#### set data-types
self._chunk = chunk
self.format = pyaudio.paInt16,
self.channels = channels
self.rate = rate
self.p = pyaudio.PyAudio();
def open(self):
# print "opened"
self.stream = self.p.open(format=pyaudio.paInt16,
channels=2,
rate=44100,
input=True,
frames_per_buffer=1024);
return True
def record(self):
# create a new instance/thread to record the sound
threading.Thread(target=self.listen).start();
def is_silence(snd_data):
return max(snd_data) < THRESHOLD
def listen(self):
r = array('h')
while True:
snd_data = array('h', self.stream.read(self._chunk))
if byteorder == 'big':
snd_data.byteswap()
r.extend(snd_data)
return sample_width, r
Run Code Online (Sandbox Code Playgroud)
我猜我可以记录"5"秒块,然后,如果该块被视为"浊音",那么线程应该被启动,直到所有语音数据都被捕获.然而,因为在当前它是在while True:我不想捕获所有音频直到有浊音命令,所以例如"没有声音","没有声音","声音","声音","没有声音", "没有声音"我只想要wav文件中的"声音"..任何人都有任何建议吗?
谢谢
编辑:
import wave
import sys
import time
import threading
from array import array
from sys import byteorder
from Queue import Queue, Full
import pyaudio
CHUNK_SIZE = 1024
MIN_VOLUME = 500
BUF_MAX_SIZE = 1024 * 10
process_g = 0
def main():
stopped = threading.Event()
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK_SIZE)))
listen_t = threading.Thread(target=listen, args=(stopped, q))
listen_t.start()
process_g = threading.Thread(target=process, args=(stopped, q))
process_g.start()
try:
while True:
listen_t.join(0.1)
process_g.join(0.1)
except KeyboardInterrupt:
stopped.set()
listen_t.join()
process_g.join()
def process(stopped, q):
while True:
if stopped.wait(timeout = 0):
break
print "I'm processing.."
time.sleep(300)
def listen(stopped, q):
stream = pyaudio.PyAudio().open(
format = pyaudio.paInt16,
channels = 2,
rate = 44100,
input = True,
frames_per_buffer = 1024
)
while True:
if stopped and stopped.wait(timeout=0):
break
try:
print process_g
for i in range(0, int(44100 / 1024 * 5)):
data_chunk = array('h', stream.read(CHUNK_SIZE))
vol = max(data_chunk)
if(vol >= MIN_VOLUME):
print "WORDS.."
else:
print "Nothing.."
except Full:
pass
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
现在,每隔5秒钟,我需要执行"进程"功能,然后处理数据(time.delay(10),同时执行此操作,然后重新开始记录.
花了一些时间在上面,我想出了以下代码,似乎正在做你需要的.它当然没有做的是将音频数据写入__CODE__(或等效的)文件,但您可以自己实现:
import threading
from array import array
from Queue import Queue, Full
import pyaudio
CHUNK_SIZE = 1024
MIN_VOLUME = 500
# if the recording thread can't consume fast enough, the listener will start discarding
BUF_MAX_SIZE = CHUNK_SIZE * 10
def main():
stopped = threading.Event()
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK_SIZE)))
listen_t = threading.Thread(target=listen, args=(stopped, q))
listen_t.start()
record_t = threading.Thread(target=record, args=(stopped, q))
record_t.start()
try:
while True:
listen_t.join(0.1)
record_t.join(0.1)
except KeyboardInterrupt:
stopped.set()
listen_t.join()
record_t.join()
def record(stopped, q):
while True:
if stopped.wait(timeout=0):
break
chunk = q.get()
vol = max(chunk)
if vol >= MIN_VOLUME:
# TODO: write to file
print "O",
else:
print "-",
def listen(stopped, q):
stream = pyaudio.PyAudio().open(
format=pyaudio.paInt16,
channels=2,
rate=44100,
input=True,
frames_per_buffer=1024,
)
while True:
if stopped.wait(timeout=0):
break
try:
q.put(array('h', stream.read(CHUNK_SIZE)))
except Full:
pass # discard
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
看这里:
https://github.com/jeysonmc/python-google-speech-scripts/blob/master/stt_google.py
它甚至将Wav转换为flac并将其发送到google Speech api,如果你不需要它,只需删除stt_google_wav函数;)
| 归档时间: |
|
| 查看次数: |
20879 次 |
| 最近记录: |