dev*_*sha 4 python subprocess python-multithreading rasa-nlu
我想实现与此非常相似的目标。
\n\n我的实际目标是从 python 中运行 Rasa。\n取自 Rasa\ 的站点:
\n\n\n\n\nRasa 是一个用于构建会话软件的框架:Messenger/Slack 机器人、Alexa 技能等。我们\xe2\x80\x99ll 在本文档中将其缩写为机器人。
\n
它基本上是一个在命令提示符下运行的聊天机器人。这就是它在 cmd 上的工作方式:\n
现在我想从 python 运行 Rasa,以便可以将其与基于 Django 的网站集成。即我想继续从用户那里获取输入,将其传递给rasa,rasa处理文本并给我一个输出,然后将其显示给用户。
\n\n我已经尝试过这个(到目前为止从cmd运行它)
\n\nimport sys\nimport subprocess\nfrom threading import Thread\nfrom queue import Queue, Empty # python 3.x\n\n\ndef enqueue_output(out, queue):\n for line in iter(out.readline, b\'\'):\n queue.put(line)\n out.close()\n\n\ndef getOutput(outQueue):\n outStr = \'\'\n try:\n while True: #Adds output from the Queue until it is empty\n outStr+=outQueue.get_nowait()\n except Empty:\n return outStr\n\np = subprocess.Popen(\'command_to_run_rasa\', \n stdin=subprocess.PIPE, \n stdout=subprocess.PIPE, \n stderr=subprocess.PIPE, \n shell=False, \n universal_newlines=True,\n )\n\noutQueue = Queue()\n\noutThread = Thread(target=enqueue_output, args=(p.stdout, outQueue))\n\noutThread.daemon = True\n\noutThread.start()\n\nsomeInput = ""\n\nwhile someInput != "stop":\n someInput = input("Input: ") # to take input from user\n p.stdin.write(someInput) # passing input to be processed by the rasa command\n p.stdin.flush()\n output = getOutput(outQueue)\n print("Output: " + output + "\\n")\n p.stdout.flush()\nRun Code Online (Sandbox Code Playgroud)\n\n但它仅适用于第一行输出。不适用于连续的输入/输出周期。请参阅下面的输出。
\n\n\n\n我如何让它在多个周期中工作?\n我已经提到过这个,我想我从中理解了我的代码中的问题,但我不知道如何解决它。
\n\n编辑:我在 Windows 10 上使用 Python 3.6.2(64 位)
\n您需要继续与子流程进行交互 - 目前,一旦您从子流程中选择了输出,当您关闭其STDOUT流时,您就已经完成了。
这是继续用户输入 -> 处理输出循环的最基本方法:
import subprocess
import sys
import time
if __name__ == "__main__": # a guard from unintended usage
input_buffer = sys.stdin # a buffer to get the user input from
output_buffer = sys.stdout # a buffer to write rasa's output to
proc = subprocess.Popen(["path/to/rasa", "arg1", "arg2", "etc."], # start the process
stdin=subprocess.PIPE, # pipe its STDIN so we can write to it
stdout=output_buffer, # pipe directly to the output_buffer
universal_newlines=True)
while True: # run a main loop
time.sleep(0.5) # give some time for `rasa` to forward its STDOUT
print("Input: ", end="", file=output_buffer, flush=True) # print the input prompt
print(input_buffer.readline(), file=proc.stdin, flush=True) # forward the user input
Run Code Online (Sandbox Code Playgroud)
您可以替换input_buffer为来自远程用户的缓冲区和output_buffer将数据转发给您的用户的缓冲区,您将基本上得到您正在寻找的内容 - 子进程将获取输入直接来自用户 ( input_buffer) 并将其输出打印给用户 ( output_buffer)。
如果您需要在所有这些都在后台运行时执行其他任务,只需if __name__ == "__main__":在单独的线程中运行所有内容,我建议添加一个try..except块以优雅地拾取KeyboardInterrupt和退出。
但是......很快您就会注意到它并不总是正常工作 - 如果打印rasa它STDOUT并进入等待STDIN阶段需要超过半秒的等待时间,输出将开始混合。这个问题比您想象的要复杂得多。主要问题是STDOUT和STDIN(和STDERR) 是单独的缓冲区,您无法知道子进程何时实际上在其 上期待某些内容STDIN。这意味着,如果没有子进程的明确指示(例如,\r\n[path]>Windows CMD 提示符STDOUT),您只能将数据发送到子进程STDIN并希望它会被接收。
根据您的屏幕截图,它并没有真正给出可区分的STDIN请求提示,因为第一个提示是... :\n然后它等待STDIN,但是一旦发送命令,它就会列出选项,而不会指示其流结束STDOUT(技术上使提示只是...\n但这也将匹配其前面的任何行)。也许您可以聪明地逐行阅读STDOUT,然后在每条新行上测量自子进程写入以来已经过去了多少时间,一旦达到不活动阈值,就假设需要rasa输入并提示用户输入。就像是:
import subprocess
import sys
import threading
# we'll be using a separate thread and a timed event to request the user input
def timed_user_input(timer, wait, buffer_in, buffer_out, buffer_target):
while True: # user input loop
timer.wait(wait) # wait for the specified time...
if not timer.is_set(): # if the timer was not stopped/restarted...
print("Input: ", end="", file=buffer_out, flush=True) # print the input prompt
print(buffer_in.readline(), file=buffer_target, flush=True) # forward the input
timer.clear() # reset the 'timer' event
if __name__ == "__main__": # a guard from unintended usage
input_buffer = sys.stdin # a buffer to get the user input from
output_buffer = sys.stdout # a buffer to write rasa's output to
proc = subprocess.Popen(["path/to/rasa", "arg1", "arg2", "etc."], # start the process
stdin=subprocess.PIPE, # pipe its STDIN so we can write to it
stdout=subprocess.PIPE, # pipe its STDIN so we can process it
universal_newlines=True)
# lets build a timer which will fire off if we don't reset it
timer = threading.Event() # a simple Event timer
input_thread = threading.Thread(target=timed_user_input,
args=(timer, # pass the timer
1.0, # prompt after one second
input_buffer, output_buffer, proc.stdin))
input_thread.daemon = True # no need to keep the input thread blocking...
input_thread.start() # start the timer thread
# now we'll read the `rasa` STDOUT line by line, forward it to output_buffer and reset
# the timer each time a new line is encountered
for line in proc.stdout:
output_buffer.write(line) # forward the STDOUT line
output_buffer.flush() # flush the output buffer
timer.set() # reset the timer
Run Code Online (Sandbox Code Playgroud)
您可以使用类似的技术来检查更复杂的“预期用户输入”模式。有一个名为pexpect“设计”的完整模块可以处理此类任务,如果您愿意放弃一些灵活性,我衷心推荐它。
现在...说了这么多,您知道它Rasa是用 Python 构建的,作为 Python 模块安装并具有 Python API,对吧?STDOUT/STDIN既然您已经在使用 Python,为什么当您可以直接从 Python 代码运行它时,还要将其称为子进程并处理所有这些恶作剧呢?只需导入它并直接与其交互,他们甚至有一个非常简单的示例,完全可以完成您想要做的事情:Rasa Core 和最少的 Python。