Dub*_*n93 9 python asynchronous python-asyncio fastapi
我们已经使用 FastAPI 创建了一个服务。当我们的服务启动时,它会创建一些 Python 对象,然后端点使用这些对象来存储或检索数据。
生产中的 FastAPI 从多个工人开始。我们的问题是每个工人创建自己的对象而不是共享一个。
下面的脚本显示了我们正在做的(简化的)示例,尽管在我们的例子中 Meta() 的使用要复杂得多。
from fastapi import FastAPI, status
class Meta:
def __init__(self):
self.count = 0
app = FastAPI()
meta = Meta()
# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
meta.count += 1
return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
return {'count':meta.count}
# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
meta.count = 0
return status.HTTP_200_OK
Run Code Online (Sandbox Code Playgroud)
如上所述,多个 worker 的问题在于每个 worker 都有自己的meta
对象。请注意,当使用单个工作器运行 api 时,该问题不可见。
更明确地说,当我们/increment
第一次到达端点时,我们只会看到两个工作人员中的一个响应调用(这是正确的,我们不希望两个工作人员都做同样的事情)。但是,因为有两个独立的meta
对象,所以只有两个对象之一会递增。
当到达/report
端点时,根据哪个工作人员响应请求,将返回 1 或 0。
那么问题是,我们如何让工作人员共享和操作同一个对象?
作为一个附带问题,上述问题/reset
也会影响端点。如果调用此端点,则只有一个工作人员将重置其对象。有没有办法强制所有工作人员响应端点上的单个呼叫?
谢谢!
编辑:我忘了提到我们已经尝试(但没有成功)将meta
对象存储在app.state
。本质上:
app.state.meta = Meta()
...
@app.get("/report")
async def report():
return {'count':app.state.meta.count}
Run Code Online (Sandbox Code Playgroud)
ste*_*wit 15
如果您按照文档中所述使用带有 Gunicorn 和 uvicorn 的设置运行 FastAPI 服务,则可以以更简单的方式采用 Yagiz Degimenci 此处描述的方法。您可以将gunicorn的--preload
设置与multiprocessing.Manager结合使用,以避免启动另一个服务器的必要性。特别是以下内容不需要额外的设置即可使其在单个 Docker 容器中工作。
import logging
from multiprocessing import Manager
manager = Manager()
store = manager.dict()
store["count"] = 0
from fastapi import FastAPI
app = FastAPI()
@app.post("/increment")
async def increment():
store["count"] = store["count"] + 1
@app.get("/count")
async def get_count():
return store["count"]
@app.on_event("startup")
async def startup_event():
uv_logger = logging.getLogger("uvicorn.access")
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter(
"%(process)d - %(processName)s - %(asctime)s - %(levelname)s - %(message)s"
)
)
uv_logger.addHandler(handler)
Run Code Online (Sandbox Code Playgroud)
保存并demo.py
运行(您需要 fastapi、guvicorn 和 uvicorn 库):
GUNICORN_CMD_ARGS="--bind=127.0.0.1 --workers=3 --preload --access-logfile=-" gunicorn -k uvicorn.workers.UvicornWorker demo:app
Run Code Online (Sandbox Code Playgroud)
(--preload
这里是必不可少的!)
尝试通过位于 http://localhost:8000/docs 的 OpenApi UI 进行递增,并将对 /count 端点的多个调用与访问日志输出中的进程 ID 进行比较,以查看无论哪个工作进程正在响应,它都会返回递增的值。
注意:我在这里没有对线程/异步安全做出任何声明,并且此方法可能不应该在生产服务中使用。如果有任何疑问,您应该始终依赖适当的数据库/缓存/内存存储解决方案来进行生产设置。我自己只在演示代码中使用它!
Yag*_*nci 12
您可以创建架构,而无需任何外部库或通过数据库等添加任何额外的复杂性。
这将是我们用于跨不同进程共享对象的服务器。
from multiprocessing.managers import SyncManager
class MyManager(SyncManager):
pass
syncdict = {}
def get_dict():
return syncdict
if __name__ == "__main__":
MyManager.register("syncdict", get_dict)
manager = MyManager(("127.0.0.1", 5000), authkey=b"password")
manager.start()
input()
manager.shutdown()
Run Code Online (Sandbox Code Playgroud)
将此文件命名为server.py
并在不同的进程上运行它。只是python server.py
应该做好事。
这将是我们的客户端实现。
from multiprocessing.managers import SyncManager
from typing import Optional, Dict, Any, Union
class MyManager(SyncManager):
...
class Meta:
def __init__(self, *, port: int) -> None:
self.manager = MyManager(("127.0.0.1", port), authkey=b"password")
self.manager.connect()
MyManager.register("syncdict")
self.syndict = self.manager.syncdict()
def update(self, kwargs: Dict[Any, Any]) -> None:
self.syndict.update(kwargs)
def increase_one(self, key: str) -> None:
self.syndict.update([(key, self.syndict.get(key) + 1)])
def report(self, item: Union[str, int]) -> int:
return self.syndict.get(item)
meta = Meta(port=5000)
Run Code Online (Sandbox Code Playgroud)
from fastapi import FastAPI, status
from multiprocessing.managers import SyncManager
from typing import Optional, Dict, Any, Union
class MyManager(SyncManager):
...
class Meta:
def __init__(self, *, port: int, **kwargs: Dict[Any, Any]):
self.manager = MyManager(("127.0.0.1", port), authkey=b"password")
self.manager.connect()
MyManager.register("syncdict")
self.syndict = self.manager.syncdict()
self.syndict.update(**kwargs)
def increase_one(self, key: str):
self.syndict.update([(key, self.syndict.get(key) + 1)])
def reset(self, key: str):
self.syndict.update([(key, 0)])
def report(self, item: Union[str, int]):
return self.syndict.get(item)
app = FastAPI()
meta = Meta(port=5000, cnt=0)
# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment(key: str):
meta.increase_one(key)
return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report(key: str):
return {"count": meta.report(key)}
# resets the count in the meta object to 0
@app.get("/reset")
async def reset(key: str):
meta.reset(key)
return status.HTTP_200_OK
Run Code Online (Sandbox Code Playgroud)
我将启动 API 的两个实例,一个在 8000 上,另一个在 8001 上。
In: curl -X GET "http://127.0.0.1:8000/report?key=cnt"
Out: {"count": 0}
In: curl -X GET "http://127.0.0.1:8001/report?key=cnt"
Out: {"count": 0}
Run Code Online (Sandbox Code Playgroud)
两者都以 0 值开始。现在让我们增加它
for _ in {1..10}; do curl -X GET "http://127.0.0.1:8000/increment?key=cnt" &; done
Run Code Online (Sandbox Code Playgroud)
我在端口上运行了curl 8000
10次,这意味着cnt
应该是10次。
让我们从端口检查一下8001
:
In: curl -X GET "http://127.0.0.1:8001/report?key=cnt"
Out: {"cnt": 10}
Run Code Online (Sandbox Code Playgroud)
像魅力一样工作。
有两件事需要考虑。
uvicorn my_app:app
您的服务器不应该是父进程。无法直接在不同进程之间共享 Python 对象。multiprocessing
模块中包含的设施(如管理器或共享内存)不适合在工作人员之间共享资源,因为它们需要一个主进程来创建资源并且没有持久性属性。
在最优选的工人之间的资源共享方式:
PostgreSQL
、MariaDB
、MongoDB
和许多其他。Redis
,Memcached
等等。下面我将展示两个非常简单的示例,说明如何使用这两种方法FastAPI
在工作人员之间共享应用程序中的数据。举个例子,我把aiocache
库Redis
作为后端,Tortoise ORM
库PostgreSQL
作为后端。由于FastAPI
是异步框架,我选择了asyncio
基于库的库。
.
??? app_cache.py
??? app_db.py
??? docker-compose.yml
??? __init__.py
Run Code Online (Sandbox Code Playgroud)
对于实验,您可以使用以下 docker-compose 文件将5432
(Postgres) 和6379
(Redis) 端口暴露给localhost
.
version: '3'
services:
database:
image: postgres:12-alpine
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: test_pass
POSTGRES_USER: test_user
POSTGRES_DB: test_db
redis:
image: redis:6-alpine
ports:
- "6379:6379"
Run Code Online (Sandbox Code Playgroud)
开始:
docker-compose up -d
Run Code Online (Sandbox Code Playgroud)
Aiocache 提供了 3 个主要实体:
- backends:允许您指定要用于缓存的后端。目前支持:
SimpleMemoryCache
,RedisCache
使用aioredis
和MemCache
使用aiomcache
。serializers
:序列化和反序列化您的代码和后端之间的数据。这允许您将任何 Python 对象保存到缓存中。目前配套:StringSerializer
,PickleSerializer
,JsonSerializer
,和MsgPackSerializer
。但是您也可以构建自定义的。- plugins:实现一个钩子系统,允许在每个命令之前和之后执行额外的行为。
开始:
uvicorn app_cache:app --host localhost --port 8000 --workers 5
Run Code Online (Sandbox Code Playgroud)
# app_cache.py
import os
from aiocache import Cache
from fastapi import FastAPI, status
app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost", port=6379, namespace="main")
class Meta:
def __init__(self):
pass
async def get_count(self) -> int:
return await cache.get("count", default=0)
async def set_count(self, value: int) -> None:
await cache.set("count", value)
async def increment_count(self) -> None:
await cache.increment("count", 1)
meta = Meta()
# increases the count variable in the meta object by 1
@app.post("/increment")
async def increment():
await meta.increment_count()
return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
count = await meta.get_count()
return {'count': count, "current_process_id": os.getpid()}
# resets the count in the meta object to 0
@app.post("/reset")
async def reset():
await meta.set_count(0)
return status.HTTP_200_OK
Run Code Online (Sandbox Code Playgroud)
开始:为了简单起见,我们首先运行一个worker在数据库中创建一个schema:
uvicorn app_db:app --host localhost --port 8000 --workers 1
[Ctrl-C]
uvicorn app_db:app --host localhost --port 8000 --workers 5
Run Code Online (Sandbox Code Playgroud)
# app_db.py
from fastapi import FastAPI, status
from tortoise import Model, fields
from tortoise.contrib.fastapi import register_tortoise
class MetaModel(Model):
count = fields.IntField(default=0)
app = FastAPI()
# increases the count variable in the meta object by 1
@app.get("/increment")
async def increment():
meta, is_created = await MetaModel.get_or_create(id=1)
meta.count += 1 # it's better do it in transaction
await meta.save()
return status.HTTP_200_OK
# returns a json containing the current count from the meta object
@app.get("/report")
async def report():
meta, is_created = await MetaModel.get_or_create(id=1)
return {'count': meta.count}
# resets the count in the meta object to 0
@app.get("/reset")
async def reset():
meta, is_created = await MetaModel.get_or_create(id=1)
meta.count = 0
await meta.save()
return status.HTTP_200_OK
register_tortoise(
app,
db_url="postgres://test_user:test_pass@localhost:5432/test_db", # Don't expose login/pass in src, use environment variables
modules={"models": ["app_db"]},
generate_schemas=True,
add_exception_handlers=True,
)
Run Code Online (Sandbox Code Playgroud)
那么问题来了,我们如何让worker共享并操作同一个对象呢?
虽然您可以通过类似的方式共享对象multiprocessing
,但在您的用例中,您最好使用缓存,例如 Redis。
我根本不是并行/并发应用程序方面的专家,但我确实知道,除非您需要加速非常昂贵的 CPU 绑定操作(即非常复杂和/或长时间运行的计算),否则您不想在之间共享对象流程。
您可以通过专用库和模块来做到这一点,但这将使您的应用程序变得更加复杂,必须处理所有可能的竞争条件和并行性固有的边缘情况。如果您确实想走这条路,我确信有很多库和工具,但您应该首先看一下multiprocessing
,它是用于处理并行性的标准 python 库。另请检查此和此有关使用它在工作人员之间共享资源的信息gunicorn
。
另一方面,您的用例看起来并不需要非常复杂的计算,因此我建议使用一个简单的缓存来充当您的工作人员的“数据中心”,而不是一个类。它将为您提供所需的结果,即为您的流程提供单一事实来源,而无需共享内存的复杂性。
如果您想尝试这种方法,我建议您看看Redis,这是一种非常流行且支持良好的缓存解决方案,如果您愿意,甚至可以保留数据。
以下是python 的 Redis 客户端列表。redis-py
是推荐的一款。
作为附带问题,上述问题也会影响 /reset 端点。如果调用此端点,则只有一个工作人员会重置其对象。有没有办法强制所有工作人员响应端点上的单个调用?
如果您使用缓存,问题就会消失。你只有一个事实来源,你只需删除那里的数据,无论哪个工作人员响应请求。然后每个工人都会看到数据已被重置。
归档时间: |
|
查看次数: |
3151 次 |
最近记录: |