MRF 9 python gradio chatgpt-api langchain
I'm using LangChain with a Gradio interface in Python. I've built a conversational agent and I'm trying to stream its responses to the Gradio chatbot component. I've looked through the LangChain documentation but couldn't find an example of streaming with an agent. Here are the relevant parts of my code:
# Loading the LLM
def load_llm():
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        top_p=hparams["top_p"],
        max_tokens=hparams["max_tokens"],
        presence_penalty=hparams["presence_penalty"],
        frequency_penalty=hparams["freq_penaulty"],
        streaming=True,
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
        verbose=True,
        model_name=hparams["model"],
        deployment_name=models_dict[hparams["model"]],
    )
# Loading the agent
def load_chain(memory, sys_msg, llm):
    """Logic for loading the chain you want to use should go here."""
    agent_chain = initialize_agent(
        tools,
        llm,
        agent="conversational-react-description",
        verbose=True,
        memory=memory,
        agent_kwargs={"added_prompt": sys_msg},
        streaming=True,
    )
    return agent_chain
# Creating the chatbot to be used in Gradio.
class ChatWrapper:
    def __init__(self, sys_msg):
        self.lock = Lock()
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        self.chain = load_chain(self.memory, sys_msg, load_llm())
        self.sysmsg = sys_msg

    def __call__(
        self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain]
    ):
        """Execute the chat functionality."""
        self.lock.acquire()
        try:
            history = history or []
            # Run chain and append input.
            output = self.chain.run(input=inp)
            history.append((inp, output))
        except Exception as e:
            raise e
        finally:
            self.lock.release()
        return history, history
Right now I can stream to the terminal output, but what I'm looking for is streaming inside my Gradio interface.
Can you help me?
小智 6
One possible solution is to use a queue as an intermediary.
from queue import SimpleQueue
q = SimpleQueue()
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
from typing import Any, Dict, List, Union
from queue import Empty

job_done = object()  # signals the processing is done


class StreamingGradioCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: SimpleQueue):
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue."""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Empty:
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""
        self.q.put(job_done)
Then add the new handler next to the existing one when you build the LLM:

callback_manager=CallbackManager([StreamingGradioCallbackHandler(q),
                                  StreamingStdOutCallbackHandler()]),
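Putting this together with the question's load_llm, a minimal sketch could look like the following. It reuses the hparams and models_dict globals from the question, creates the shared queue q at module level, and the import paths correspond to the LangChain version this was written against, so they may need adjusting:

from queue import SimpleQueue

from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import AzureChatOpenAI

q = SimpleQueue()  # shared between the callback handler and the Gradio generator

def load_llm():
    # Same parameters as in the question; only the callback_manager changes,
    # so tokens go both to stdout and into the queue.
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        max_tokens=hparams["max_tokens"],
        streaming=True,
        callback_manager=CallbackManager(
            [StreamingGradioCallbackHandler(q), StreamingStdOutCallbackHandler()]
        ),
        verbose=True,
        model_name=hparams["model"],
        deployment_name=models_dict[hparams["model"]],
    )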
I don't understand your ChatWrapper. I'm not actually familiar with Gradio, so I'll rely on the example from its documentation.
from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    while True:
        next_token = q.get(block=True)  # Blocks until an input is available
        if next_token is job_done:
            break
        history[-1][1] += next_token
        yield history
    thread.join()
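The surrounding Blocks wiring from that documentation example would look roughly like this. It is a sketch: it assumes the chain and q defined above and Gradio's pair-style chat history (one [user, bot] pair per turn):

import gradio as gr

with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox()

    def user(user_message, history):
        # Append the user's turn with an empty bot slot; bot() fills it in token by token.
        return "", history + [[user_message, None]]

    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, chatbot, chatbot
    )

demo.queue()  # generators need the queue enabled to stream
demo.launch()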
小智 -1
If you can write to stdout, why not read back from it as well?
import subprocess

def listen(cmd):  # cmd = 'python -m your_langchain.py'
    """From http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html"""
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print(line, end="")
        if line == "" and p.poll() is not None:
            break
    return "".join(stdout)
From https://www.saltycrane.com/blog/2009/10/how-capture-stdout-in-real-time-python/
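Note that listen() only returns after the subprocess has finished, so nothing actually streams; to feed a Gradio generator you would yield each line as it arrives instead. A rough sketch of that variant (the command string is just a placeholder):

import subprocess

def listen_stream(cmd):
    """Yield the subprocess's stdout line by line as it is produced."""
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)
    while True:
        line = p.stdout.readline()
        if line == "" and p.poll() is not None:
            break
        yield line

A Gradio bot function could then iterate over listen_stream(...) and yield the accumulating text, in the same way as the queue-based bot above.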