dev*_*evZ 5 python flask openai-api langchain
我正在使用 Python Flask 应用程序进行数据聊天。因此,在控制台中,我可以直接从 OpenAI 获得流式响应,因为我可以使用 flag 启用流处理streaming=True。
问题是,我无法在 API 调用中 \xe2\x80\x9cforward\xe2\x80\x9d 流或 \xe2\x80\x9cshow\xe2\x80\x9d 流。
\n处理 OpenAI 和链的代码是:
\ndef askQuestion(self, collection_id, question):\n collection_name = "collection-" + str(collection_id)\n self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))\n self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key='answer')\n \n chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)\n\n\n self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),\n return_source_documents=True,verbose=VERBOSE, \n memory=self.memory)\n \n\n result = self.chain({"question": question})\n \n res_dict = {\n "answer": result["answer"],\n }\n\n res_dict["source_documents"] = []\n\n for source in result["source_documents"]:\n res_dict["source_documents"].append({\n "page_content": source.page_content,\n "metadata": source.metadata\n })\n\n return res_dict`\nRun Code Online (Sandbox Code Playgroud)\n和API路由代码:
\n@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])\ndef ask_question(collection_id):\n question = request.form["question"]\n # response_generator = document_thread.askQuestion(collection_id, question)\n # return jsonify(response_generator)\n\n def stream(question):\n completion = document_thread.askQuestion(collection_id, question)\n for line in completion['answer']:\n yield line\n\n return app.response_class(stream_with_context(stream(question)))\nRun Code Online (Sandbox Code Playgroud)\n我正在使用curl 测试我的端点,并将标志-N 传递给curl,因此如果可能的话,我应该获得可流式响应。
\n当我首先进行 API 调用时,端点正在等待处理数据(我可以在 VS 代码的终端中看到可流式答案),完成后,我会一次性显示所有内容。
\n谢谢
\n通过使用threading和callback我们可以从 Flask API 获得流式响应。
在flask API中,您可以通过langchain的回调创建一个队列来注册令牌。
class StreamingHandler(BaseCallbackHandler):
...
def on_llm_new_token(self, token: str, **kwargs) -> None:
self.queue.put(token)
Run Code Online (Sandbox Code Playgroud)
您可以get从烧瓶路线中的同一队列中获得令牌。
class StreamingHandler(BaseCallbackHandler):
...
def on_llm_new_token(self, token: str, **kwargs) -> None:
self.queue.put(token)
Run Code Online (Sandbox Code Playgroud)
在您的 langchain 中ChatOpenAI添加上述自定义回调StreamingHandler。
from flask import Response, stream_with_context
import threading
@app.route(....):
def stream_output():
q = Queue()
def generate(rq: Queue):
...
# add your logic to prevent while loop
# to run indefinitely
while( ...):
yield rq.get()
callback_fn = StreamingHandler(q)
threading.Thread(target= askQuestion, args=(collection_id, question, callback_fn))
return Response(stream_with_context(generate(q))
Run Code Online (Sandbox Code Playgroud)
以供参考:
| 归档时间: |
|
| 查看次数: |
4741 次 |
| 最近记录: |