从 python 中运行交互式程序

dev*_*sha 4 python subprocess python-multithreading rasa-nlu

我想实现与非常相似的目标。

\n\n

我的实际目标是从 python 中运行 Rasa。\n取自 Rasa\ 的站点:

\n\n
\n

Rasa 是一个用于构建会话软件的框架:Messenger/Slack 机器人、Alexa 技能等。我们\xe2\x80\x99ll 在本文档中将其缩写为机器人。

\n
\n\n

它基本上是一个在命令提示符下运行的聊天机器人。这就是它在 cmd 上的工作方式:\n在此输入图像描述

\n\n

现在我想从 python 运行 Rasa,以便可以将其与基于 Django 的网站集成。即我想继续从用户那里获取输入,将其传递给rasa,rasa处理文本并给我一个输出,然后将其显示给用户。

\n\n

我已经尝试过这个(到目前为止从cmd运行它)

\n\n
import sys\nimport subprocess\nfrom threading import Thread\nfrom queue import Queue, Empty  # python 3.x\n\n\ndef enqueue_output(out, queue):\n    for line in iter(out.readline, b\'\'):\n        queue.put(line)\n    out.close()\n\n\ndef getOutput(outQueue):\n    outStr = \'\'\n    try:\n        while True: #Adds output from the Queue until it is empty\n            outStr+=outQueue.get_nowait()\n    except Empty:\n        return outStr\n\np = subprocess.Popen(\'command_to_run_rasa\', \n                    stdin=subprocess.PIPE, \n                    stdout=subprocess.PIPE, \n                    stderr=subprocess.PIPE, \n                    shell=False, \n                    universal_newlines=True,\n                    )\n\noutQueue = Queue()\n\noutThread = Thread(target=enqueue_output, args=(p.stdout, outQueue))\n\noutThread.daemon = True\n\noutThread.start()\n\nsomeInput = ""\n\nwhile someInput != "stop":\n    someInput = input("Input: ") # to take input from user\n    p.stdin.write(someInput) # passing input to be processed by the rasa command\n    p.stdin.flush()\n    output = getOutput(outQueue)\n    print("Output: " + output + "\\n")\n    p.stdout.flush()\n
Run Code Online (Sandbox Code Playgroud)\n\n

但它仅适用于第一行输出。不适用于连续的输入/输出周期。请参阅下面的输出。

\n\n

在此输入图像描述

\n\n

我如何让它在多个周期中工作?\n我已经提到过这个,我想我从中理解了我的代码中的问题,但我不知道如何解决它。

\n\n

编辑:我在 Windows 10 上使用 Python 3.6.2(64 位)

\n

zwe*_*wer 7

您需要继续与子流程进行交互 - 目前,一旦您从子流程中选择了输出,当您关闭其STDOUT流时,您就已经完成了。

这是继续用户输入 -> 处理输出循环的最基本方法:

import subprocess
import sys
import time

if __name__ == "__main__":  # a guard from unintended usage
    input_buffer = sys.stdin  # a buffer to get the user input from
    output_buffer = sys.stdout  # a buffer to write rasa's output to
    proc = subprocess.Popen(["path/to/rasa", "arg1", "arg2", "etc."],  # start the process
                            stdin=subprocess.PIPE,  # pipe its STDIN so we can write to it
                            stdout=output_buffer, # pipe directly to the output_buffer
                            universal_newlines=True)
    while True:  # run a main loop
        time.sleep(0.5)  # give some time for `rasa` to forward its STDOUT
        print("Input: ", end="", file=output_buffer, flush=True)  # print the input prompt
        print(input_buffer.readline(), file=proc.stdin, flush=True)  # forward the user input
Run Code Online (Sandbox Code Playgroud)

您可以替换input_buffer为来自远程用户的缓冲区和output_buffer将数据转发给您的用户的缓冲区,您将基本上得到您正在寻找的内容 - 子进程将获取输入直接来自用户 ( input_buffer) 并将其输出打印给用户 ( output_buffer)。

如果您需要在所有这些都在后台运行时执行其他任务,只需if __name__ == "__main__":在单独的线程中运行所有内容,我建议添加一个try..except块以优雅地拾取KeyboardInterrupt和退出。

但是......很快您就会注意到它并不总是正常工作 - 如果打印rasaSTDOUT并进入等待STDIN阶段需要超过半秒的等待时间,输出将开始混合。这个问题比您想象的要复杂得多。主要问题是STDOUTSTDIN(和STDERR) 是单独的缓冲区,您无法知道子进程何时实际上在其 上期待某些内容STDIN。这意味着,如果没有子进程的明确指示(例如,\r\n[path]>Windows CMD 提示符STDOUT),您只能将数据发送到子进程STDIN并希望它会被接收。

根据您的屏幕截图,它并没有真正给出可区分的STDIN请求提示,因为第一个提示是... :\n然后它等待STDIN,但是一旦发送命令,它就会列出选项,而不会指示其流结束STDOUT(技术上使提示只是...\n但这也将匹配其前面的任何行)。也许您可以聪明地逐行阅读STDOUT,然后在每条新行上测量自子进程写入以来已经过去了多少时间,一旦达到不活动阈值,就假设需要rasa输入并提示用户输入。就像是:

import subprocess
import sys
import threading

# we'll be using a separate thread and a timed event to request the user input
def timed_user_input(timer, wait, buffer_in, buffer_out, buffer_target):
    while True:  # user input loop
        timer.wait(wait)  # wait for the specified time...
        if not timer.is_set():  # if the timer was not stopped/restarted...
            print("Input: ", end="", file=buffer_out, flush=True)  # print the input prompt
            print(buffer_in.readline(), file=buffer_target, flush=True)  # forward the input
        timer.clear()  # reset the 'timer' event

if __name__ == "__main__":  # a guard from unintended usage
    input_buffer = sys.stdin  # a buffer to get the user input from
    output_buffer = sys.stdout  # a buffer to write rasa's output to
    proc = subprocess.Popen(["path/to/rasa", "arg1", "arg2", "etc."],  # start the process
                            stdin=subprocess.PIPE,  # pipe its STDIN so we can write to it
                            stdout=subprocess.PIPE,  # pipe its STDIN so we can process it
                            universal_newlines=True)
    # lets build a timer which will fire off if we don't reset it
    timer = threading.Event()  # a simple Event timer
    input_thread = threading.Thread(target=timed_user_input,
                                    args=(timer,  # pass the timer
                                          1.0,  # prompt after one second
                                          input_buffer, output_buffer, proc.stdin))
    input_thread.daemon = True  # no need to keep the input thread blocking...
    input_thread.start()  # start the timer thread
    # now we'll read the `rasa` STDOUT line by line, forward it to output_buffer and reset
    # the timer each time a new line is encountered
    for line in proc.stdout:
        output_buffer.write(line)  # forward the STDOUT line
        output_buffer.flush()  # flush the output buffer
        timer.set()  # reset the timer
Run Code Online (Sandbox Code Playgroud)

您可以使用类似的技术来检查更复杂的“预期用户输入”模式。有一个名为pexpect“设计”的完整模块可以处理此类任务,如果您愿意放弃一些灵活性,我衷心推荐它。

现在...说了这么多,您知道它Rasa是用 Python 构建的,作为 Python 模块安装并具有 Python API,对吧?STDOUT/STDIN既然您已经在使用 Python,为什么当您可以直接从 Python 代码运行它时,还要将其称为子进程并处理所有这些恶作剧呢?只需导入它并直接与其交互,他们甚至有一个非常简单的示例,完全可以完成您想要做的事情:Rasa Core 和最少的 Python