我正在开发一个Web后端/ API提供程序,它从第三方Web API获取实时数据,将其放入MySQL数据库并通过HTTP/JSON API提供.
我正在使用SQLAlchemy Core为API提供烧瓶并使用DB.
对于实时数据获取部分,我有通过发送请求包装第三方API的函数,将返回的xml解析为Python dict并返回它.我们将调用这些API包装器.
然后我在其他方法中调用这些函数,这些方法获取相应的数据,在需要时进行任何处理(如时区转换等)并将其放入DB中.我们称之为处理器.
我一直在阅读异步I/O和eventlet,我印象非常深刻.
我将把它合并到我的数据抓取代码中,但我先问了一些问题:
猴子修补一切是否安全?考虑到我有烧瓶,SQLAlchemy和一堆其他的库,猴子补丁是否有任何缺点(假设没有后期绑定)?
我应该将任务划分给什么粒度?我在考虑创建一个定期生成处理器的池.然后,一旦处理器到达它调用API包装器的部分,API包装器将启动GreenPile以使用eventlet.green.urllib2获取实际的HTTP数据.这是一个好方法吗?
仅供参考,我有大约10种不同的实时数据,每隔约5-10秒就会产生一个处理器.
谢谢!
我requests
在芹菜中使用Python workers
来进行大量(~10 /秒)API调用(包括GET,POST,PUT,DELETE).每个请求大约需要5到10秒才能完成.
我尝试在eventlet
池中运行芹菜工作者,并发1000次.
由于requests
是阻塞进程,因此每个并发连接都在等待一个请求.
我如何进行requests
异步?
我尝试设置一个最小的Flask应用程序,该应用程序使用eventlet立即响应并发请求,而不是阻塞并响应一个请求(如标准的Flask调试Web服务器那样).
先决条件:
pip install Flask
pip install eventlet
Run Code Online (Sandbox Code Playgroud)
从我到目前为止在互联网上发现的东西的理解,它应该像这样工作:
# activate eventlet
import eventlet
eventlet.monkey_patch()
from flask import Flask
import datetime
from time import sleep
# create a new Flask application
app = Flask(__name__)
# a short running task that returns immediately
@app.route('/shortTask')
def short_running_task():
start = datetime.datetime.now()
return 'Started at {0}, returned at {1}'.format(start, datetime.datetime.now())
# a long running tasks that returns after 30s
@app.route('/longTask')
def long_running_task():
start = datetime.datetime.now()
sleep(30)
return 'Started at {0}, returned at …
Run Code Online (Sandbox Code Playgroud) 在什么情况下,像eventlet/gevent这样的东西比扭曲更好?Twisted似乎是最常用的,但eventlet/gevent必须具有一些优势......我不是在寻找特定场景的答案,只是一般性.
我目前正在编写一个基于Python Eventlet库的基本调度模型服务器(http://eventlet.net/doc/).看过Eventlet上的WSGI文档(http://eventlet.net/doc/modules/wsgi.html),我可以看到eventlet.wsgi.server函数除了客户端之外还记录了x-forwarded-for头文件IP地址.
但是,获取此方法的方法是附加一个类文件对象(默认为sys.stderr),然后将该服务器管道连接到该对象.
我希望能够从应用程序本身获取客户端IP(即具有start_response和environ作为参数的函数).实际上,环境密钥对此非常完美.有没有办法简单地获取IP地址(即通过环境字典或类似),而不必采取某种方式重定向日志对象?
在做Gevent/Eventlet猴子修补之后 - 我可以假设每当DB驱动程序(例如redis-py,pymongo)通过标准库(例如socket
)使用IO时,它将是异步的吗?
所以使用eventlets猴子修补就足以使例如:在eventlet应用程序中使用redis-py非阻塞?
据我所知,如果我注意连接使用(例如,为每个greenlet使用不同的连接),这应该足够了.但我想确定.
如果您知道还需要什么,或者如何使用Gevent/Eventlet正确使用DB驱动程序,请同时输入.
实际设计:
对于那些回到这个问题的人,下面的有用答案将我推向了一个运行良好的可行设计.三个见解是关键:
recv()
或同时send()
从同一个套接字尝试,那么Eventlet会优雅地杀死第二个greenlet.这非常棒,意味着如果amqplib
"绿色"不佳,将导致简单的异常,而不是不可能重现数据交错错误.amqplib
方法大致分为两组:wait()
内部循环recv()
直到组装AMQP消息,而其他方法send()
消息返回并且不会尝试自己的recv()
.鉴于amqplib
作者不知道有人试图"绿化"他们的图书馆,这是非常好的运气!这意味着消息发送不仅可以安全地从调用的回调中获取wait()
,而且消息也可以安全地从完全不在wait()
循环控制之外的其他greenlet发送.这些安全的方法 - 可以从任何greenlet调用,而不仅仅是从wait()
回调调用 - 是:
basic_ack
basic_consume
同 nowait=True
basic_publish
basic_recover
basic_reject
exchange_declare
同 nowait=True
exchange_delete
同 nowait=True
queue_bind
同 nowait=True
queue_unbind
同 nowait=True
queue_declare
同 nowait=True
queue_delete
同 nowait=True
queue_purge
同 nowait=True
1
,然后acquire()
与release()
锁定和解锁.我想要写消息的所有异步greenlet都可以使用这样的锁来避免它们的单独send()
调用交错并破坏AMQP协议.所以我的代码大致如下:
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
class Processor(object):
def __init__(self):
write_lock …
Run Code Online (Sandbox Code Playgroud) 使用Node.js,或eventlet或任何其他非阻塞服务器,当给定请求需要很长时间时会发生什么,它会阻止所有其他请求吗?
例如,一个请求进来,并且需要200ms来计算,这将阻止其他请求,因为例如nodejs使用单个线程.
这意味着每秒15K将大幅下降,因为计算给定请求的响应所需的实际时间.
但这对我来说似乎不对,所以我问的是究竟发生了什么,因为我无法想象这是怎么回事.
我遇到了一个 Django 应用程序的问题,该应用程序在Gunicorn
使用异步eventlet
工作者运行时超过了与 Postgres 的最大同时连接数 (100) 。当达到连接限制时,应用程序开始返回500
-errors,直到可以建立新连接。
这是我的数据库配置:
DATABASES = {
'default': {
'ENGINE': 'django.contrib.gis.db.backends.postgis',
'NAME': 'django',
'USER': 'django',
'HOST': 'postgres',
'PORT': 5432,
'CONN_MAX_AGE': 60,
}
}
Run Code Online (Sandbox Code Playgroud)
Gunicorn 是这样启动的:
gunicorn --bind 0.0.0.0:8080 --worker-class eventlet --workers 5 myapp.wsgi:application
Run Code Online (Sandbox Code Playgroud)
这些是已安装的软件包:
与 Gunicorn 工作人员一起运行时,Django 是否无法跨 HTTP 请求重用连接?某种 3rd 方数据库连接池是我唯一的选择吗?
15-03-23 更新:CONN_MAX_AGE
异步 Gunicorn 工人似乎存在问题。连接确实是持久的,但从未在本文中提到的任何顺序请求中重用。设置CONN_MAX_AGE
为0
强制 Django 在请求结束时关闭连接,防止形成未使用的持久连接。
我的主 celery 应用程序在 AWS 的 EC2 实例(“main”)中运行,它生成的任务与同一可用区(“db”)中的 RDS 数据库进行交互。工作负载每分钟生成多达数千个任务,我需要尽快并行执行它们。我有工作人员在两个物理位置执行任务。一个来自主实例的单独 EC2 实例(“工作 EC2”),但与它和数据库位于同一可用区,另一个来自我们办公室私有数据中心的物理计算机(“工作本地”)。
工作 EC2 和本地都运行 prefork 事件池并且autoscale==70,4
工作正常(任务在 2-3 秒内完成),但 CPU 和内存使用率很高,如果可能的话,我需要更多的并行性。所以我一直在尝试eventlet
和。我陷入以下问题:gevent
concurrency=100
BrokenPipe
。注意,这种情况不会发生在 prefork 上,所以我认为这不是连接/网络的问题,而是与池包有关。这种行为对于 Gevent 来说比 Eventlet 更糟糕,但两者都不好问题:
为什么 Eventlet 和 Gevent 在本地和 EC2 工作线程之间的表现会不同?这两个工作人员之间的唯一区别在于,其中一个工作人员在物理上更接近主芹菜应用程序和数据库,这会影响绿色线程的执行方式吗? …
eventlet ×10
python ×8
gevent ×3
asynchronous ×2
celery ×2
amqp ×1
django ×1
flask ×1
granularity ×1
grequests ×1
gunicorn ×1
node.js ×1
nonblocking ×1
postgresql ×1
psycopg2 ×1
pymongo ×1
twisted ×1
wsgi ×1