我有一个像这样的DataFrame:
date open high low close vwap
0 1498907700 0.00010020 0.00010020 0.00009974 0.00010019 0.00009992
1 1498908000 0.00010010 0.00010010 0.00010010 0.00010010 0.00010010
2 1498908300 0.00010010 0.00010010 0.00009957 0.00009957 0.00009992
3 1498908600 0.00009957 0.00009957 0.00009957 0.00009957 0.00000000
4 1498908900 0.00010009 0.00010009 0.00009949 0.00009959 0.00009952
5 1498909200 0.00009987 0.00009991 0.00009956 0.00009956 0.00009974
6 1498909500 0.00009948 0.00009948 0.00009915 0.00009915 0.00009919
...
789
Run Code Online (Sandbox Code Playgroud)
并且想要对每3行做一个平均值并且有一个新的DataFrame,然后比源DataFrame中所有3行的平均值短3倍.
我在使用 gevent 运行 Celery 时遇到性能负载问题,所有内容都在我的 VPS 上的同一核心上运行。
这是 4 个 Celery 实例的屏幕截图,每个实例有 20 个 gevent 并发
如何解决这个问题?我究竟做错了什么 ?
这是我的第一个任务:
def update_sender():
items = models.Item.objects.filter(active=True).all()
count = items.count()
items = [i.id for i in items]
step = count / settings.WORKERS
for job in list(chunks(items, step)):
update_item.apply_async(args=[job])
Run Code Online (Sandbox Code Playgroud)
调用以下子任务:
def update_item(items):
for item in items:
try:
i = models.Item.objects.get(id=item)
url = "someurl"
rep = requests.get(url)
jrep = rep.json()
tracker = ItemTracker(i, jrep)
if tracker.skip():
continue
if tracker.method1():
if not tracker.method2():
tracker.method3()
tracker.save()
Run Code Online (Sandbox Code Playgroud)
这都是关于同时执行大量 …