使用 Pyton Flask API 传输来自 LangChain 的 OpenAI 的响应

dev*_*evZ 5 python flask openai-api langchain

我正在使用 Python Flask 应用程序进行数据聊天。因此,在控制台中,我可以直接从 OpenAI 获得流式响应,因为我可以使用 flag 启用流处理streaming=True

\n

问题是,我无法在 API 调用中 \xe2\x80\x9cforward\xe2\x80\x9d 流或 \xe2\x80\x9cshow\xe2\x80\x9d 流。

\n

处理 OpenAI 和链的代码是:

\n
def askQuestion(self, collection_id, question):\n        collection_name = "collection-" + str(collection_id)\n        self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))\n        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,  output_key='answer')\n        \n        chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)\n\n\n        self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),\n                                                            return_source_documents=True,verbose=VERBOSE, \n                                                            memory=self.memory)\n        \n\n        result = self.chain({"question": question})\n        \n        res_dict = {\n            "answer": result["answer"],\n        }\n\n        res_dict["source_documents"] = []\n\n        for source in result["source_documents"]:\n            res_dict["source_documents"].append({\n                "page_content": source.page_content,\n                "metadata":  source.metadata\n            })\n\n        return res_dict`\n
Run Code Online (Sandbox Code Playgroud)\n

和API路由代码:

\n
@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])\ndef ask_question(collection_id):\n    question = request.form["question"]\n    # response_generator = document_thread.askQuestion(collection_id, question)\n    # return jsonify(response_generator)\n\n    def stream(question):\n        completion = document_thread.askQuestion(collection_id, question)\n        for line in completion['answer']:\n            yield line\n\n    return app.response_class(stream_with_context(stream(question)))\n
Run Code Online (Sandbox Code Playgroud)\n

我正在使用curl 测试我的端点,并将标志-N 传递给curl,因此如果可能的话,我应该获得可流式响应。

\n

当我首先进行 API 调用时,端点正在等待处理数据(我可以在 VS 代码的终端中看到可流式答案),完成后,我会一次性显示所有内容。

\n

谢谢

\n

var*_*hal 3

通过使用threadingcallback我们可以从 Flask API 获得流式响应。

在flask API中,您可以通过langchain的回调创建一个队列来注册令牌。

class StreamingHandler(BaseCallbackHandler):
    ...

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(token)
Run Code Online (Sandbox Code Playgroud)

您可以get从烧瓶路线中的同一队列中获得令牌。

class StreamingHandler(BaseCallbackHandler):
    ...

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(token)
Run Code Online (Sandbox Code Playgroud)

在您的 langchain 中ChatOpenAI添加上述自定义回调StreamingHandler

from flask import Response, stream_with_context
import threading 

@app.route(....):
def stream_output():
   q = Queue()
   
   def generate(rq: Queue):
      ...
      # add your logic to prevent while loop
      # to run indefinitely  
      while( ...):
          yield rq.get()
   
   callback_fn = StreamingHandler(q)
   
   threading.Thread(target= askQuestion, args=(collection_id, question, callback_fn))
   return Response(stream_with_context(generate(q))

Run Code Online (Sandbox Code Playgroud)

以供参考: