How to stream an agent's response in Langchain?

MRF*_*MRF 9 python gradio chatgpt-api langchain

I'm using Langchain with a Gradio interface in Python. I've built a conversational agent and am trying to stream its responses to the Gradio chatbot interface. I've looked through the Langchain documentation but couldn't find an example of streaming with an agent. Here are the relevant parts of my code:

# Loading the LLM
def load_llm():
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        top_p=hparams["top_p"],
        max_tokens=hparams["max_tokens"],
        presence_penalty=hparams["presence_penalty"],
        frequency_penalty=hparams["freq_penaulty"],
        streaming=True, 
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), 
        verbose=True,
        model_name=hparams["model"],
        deployment_name=models_dict[hparams["model"]],
        )

# Loading the agent
def load_chain(memory, sys_msg, llm):
    """Logic for loading the chain you want to use should go here."""
    agent_chain = initialize_agent(tools, 
                                   llm, 
                                   agent="conversational-react-description", 
                                   verbose=True, 
                                   memory=memory, 
                                   agent_kwargs = {"added_prompt": sys_msg},
                                   streaming=True, 
                                   )
    return agent_chain

# Creating the chatbot to be used in Gradio.
class ChatWrapper:

    def __init__(self, sys_msg):
        self.lock = Lock()
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,)
        self.chain = load_chain(self.memory, sys_msg, load_llm())
        self.sysmsg = sys_msg

    def __call__(
        self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain]
    ):
        """Execute the chat functionality."""
        self.lock.acquire()
        try:
            history = history or []
            # Run chain and append input.
            output = self.chain.run(input=inp)
            
            history.append((inp, output))
        except Exception as e:
            raise e
        finally:
            self.lock.release()
        return history, history

I can currently stream to the terminal output, but what I'm looking for is streaming inside my Gradio interface.

Can you help me?

小智 6

One possible solution is to use a queue as an intermediary.

  1. Create the queue
from queue import SimpleQueue
q = SimpleQueue()
  2. Create a custom callback handler that writes the generated tokens to the queue
from queue import Empty
from typing import Any, Dict, List, Union

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult


job_done = object() # signals the processing is done

class StreamingGradioCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: SimpleQueue):
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue."""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Empty:
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""
        self.q.put(job_done)
  3. Give the callback to your LLM
callback_manager=CallbackManager([StreamingGradioCallbackHandler(q),
                                  StreamingStdOutCallbackHandler()]), 
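Wired into the `load_llm` function from the question, this might look like the following sketch (`hparams` and `models_dict` come from the question's code, and the `CallbackManager` import path varies between LangChain versions):

from queue import SimpleQueue

from langchain.callbacks.manager import CallbackManager  # older releases: langchain.callbacks.base
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import AzureChatOpenAI

q = SimpleQueue()

def load_llm():
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        max_tokens=hparams["max_tokens"],
        streaming=True,  # required, otherwise on_llm_new_token never fires
        callback_manager=CallbackManager(
            [StreamingGradioCallbackHandler(q), StreamingStdOutCallbackHandler()]
        ),
        model_name=hparams["model"],
        deployment_name=models_dict[hparams["model"]],
    )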
  4. In your Gradio code, create a parallel thread that runs your agent, and read from the queue

I don't fully understand your ChatWrapper. To be honest, I'm not familiar with Gradio, so I will rely on the examples from its documentation.

from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    while True:
        next_token = q.get(block=True) # Blocks until an input is available
        if next_token is job_done:
            break
        history[-1][1] += next_token
        yield history
    thread.join()
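For reference, here is a minimal sketch of how this `bot` generator might be wired into a Gradio Blocks app, following the streaming-chatbot example from the Gradio documentation (the `user` helper and the component layout are my assumptions, not part of the answer above):

import gradio as gr

def user(user_message, history):
    # Record the user's turn with an empty bot slot that bot() will fill in.
    return "", history + [[user_message, None]]

with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox()
    # First append the user turn, then stream the agent's answer token by token.
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, chatbot, chatbot
    )

demo.queue()  # queuing is required for generator (streaming) callbacks
demo.launch()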


小智 -1

If you can write to standard output, why not read from it as well?

import subprocess

def listen(cmd):  # e.g. cmd = "python your_langchain.py"
    """From http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html"""
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print(line, end='')  # tee the line to our own stdout
        if line == '' and p.poll() is not None:
            break
    return ''.join(stdout)

From https://www.saltycrane.com/blog/2009/10/how-capture-stdout-in-real-time-python/
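Note that `listen` only returns once the subprocess has exited, so by itself it does not stream. To feed a Gradio interface incrementally with this approach, the same loop could be turned into a generator that yields the accumulated text as each line arrives; a sketch under the same assumptions (`listen_stream` is a hypothetical name):

import subprocess

def listen_stream(cmd):
    """Yield the accumulated stdout of the subprocess as each new line arrives."""
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)
    output = ""
    while True:
        line = p.stdout.readline()
        if line == "" and p.poll() is not None:
            break
        output += line
        yield output  # a Gradio generator callback re-renders on every yield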