我最近尝试加速一个小工具(使用urllib2向多处理模块发送请求到(非官方)twitter-button-count-url(> 2000 urls)并解析它的结果)(并且它工人池).我在这里阅读了几个关于多线程的讨论(与标准的非线程版本相比减慢了整个过程)和多处理,但我找不到一个(可能非常简单)问题的答案:
你可以通过多处理加速网址调用,还是像网络适配器这样的瓶颈?我不知道urllib2-open-method的哪一部分可以并行化,以及它应该如何工作......
编辑:这是我想要加速的请求和当前的多处理设置:
urls=["www.foo.bar", "www.bar.foo",...]
tw_url='http://urls.api.twitter.com/1/urls/count.json?url=%s'
def getTweets(self,urls):
for i in urls:
try:
self.tw_que=urllib2.urlopen(tw_url %(i))
self.jsons=json.loads(self.tw_que.read())
self.tweets.append({'url':i,'date':today,'tweets':self.jsons['count']})
except ValueError:
print ....
continue
return self.tweets
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
result = [pool.apply_async(getTweets(i,)) for i in urls]
[i.get() for i in result]
Run Code Online (Sandbox Code Playgroud) 我目前正在开发一个python 3.4网络流媒体应用程序.我的套接字有一些疯狂的行为.(如果可能,目标3.3兼容)
定义:当我谈到Stream时,意味着UDP-Stream.
问题
虽然发送socket.send操作有时会开始花费1-3ms,因为我将在下面描述更多传输目标.我在这里找到了其他线程来讲述速度问题,但他们每秒发送200k包,但他们只发送"A".在我的情况下,每个数据包是1500字节公司.由socket添加的UDP和IP头.如果此时问题不明确,请参阅下面的解释.
题
有谁知道为什么这会延误?或者如何加快发送到达实时?
def _transfer(self):
self.total_num_samps_sent = 0
self.sequence_out = 0
self.send_in_progress = True
send = self.udp_socket.send
for i in range(0, len(streams), 1):
stream_data, stream_samps, stream_seq = self.packed_streams[i]
# commit the samples
start_try_send_time = monotonic()
while not self.ready():
if monotonic() - start_try_send_time > self.timeout > 0:
# timeout; if timeout == 0 wait endless
return 0
self.sequence_out = stream_seq
# ######################
# Here is the bottleneck
# ######################
s = monotonic()
send(stream_data) …Run Code Online (Sandbox Code Playgroud) 我正在实现一个Python脚本,需要在不到5秒的时间内并行发送1500多个数据包.
简而言之,我需要的是:
def send_pkts(ip):
#craft packet
while True:
#send packet
time.sleep(randint(0,3))
for x in list[:1500]:
send_pkts(x)
time.sleep(randint(1,5))
Run Code Online (Sandbox Code Playgroud)
我尝试过简单的单线程,多线程,多处理和多处理+多线程表单,并遇到以下问题:
有没有更好的方法可以用来完成这项任务?
[1]编辑1:
def send_pkt(x):
#craft pkt
while True:
#send pkt
gevent.sleep(0)
gevent.joinall([gevent.spawn(send_pkt, x) for x in list[:1500]])
Run Code Online (Sandbox Code Playgroud)
[2]编辑2(gevent monkey-patching):
from gevent import monkey; monkey.patch_all()
jobs = [gevent.spawn(send_pkt, x) for x in list[:1500]]
gevent.wait(jobs)
#for send_pkt(x) check [1]
Run Code Online (Sandbox Code Playgroud)
但是我收到以下错误:"ValueError:filedescriptor超出了select()"范围.所以我检查了我的系统ulimit(软和硬都是最大值:65536).之后,我检查了它与Linux 上的select()限制(最多1024 fds)有关.请检查:http://man7.org/linux/man-pages/man2/select.2.html(BUGS部分) - 为了解决这个问题,我应该使用poll()(http://man7.org/linux/ man-pages/man2/poll.2.html)相反.但是使用poll() …
在这个页面上我读到了这样的内容:
asyncio 模块中的协程不受全局解释器锁或 GIL 的限制。
asyncio但是,如果事件循环和threading线程都在带有 GIL 的单个 Python 进程中运行,这怎么可能呢?
据我了解, GIL 对 的影响asyncio不会像对 那样强烈threading,因为在 的情况下threading,解释器会切换到无用的操作,例如time.sleep()。因此,在使用时asyncio,建议使用asyncio.sleep().
据我所知,这些工具的设计目的略有不同,threading更常用于执行 IO 绑定操作的“遗留”阻塞代码,以及asyncio非阻塞代码。
我刚开始研究一个有一些CPU问题的龙卷风应用程序.随着时间的推移,CPU时间将单调增长,最大化CPU为100%.该系统目前设计为不阻止主线程.如果需要执行阻塞和异步驱动程序不可用的操作,它将生成另一个线程来执行阻塞操作.
因此,我们的主线程几乎完全是CPU绑定的,而且其他一些线程几乎完全是IO绑定的.从我读过的内容来看,这似乎是解决GIL问题的最佳方式.另外,我的分析表明我们花了很多时间等待信号(我假设是__semwait_signal在做什么),这与GIL在我有限的理解中会产生的影响是一致的.
如果我使用sys.setcheckinterval将检查间隔设置为300,则CPU增长将显着减慢.我想要确定的是我是否应该增加检查间隔,将其保持在300,或者是否需要增加检查间隔.毕竟,我注意到CPU性能变得更好,但我有点担心这会对系统的响应性产生负面影响.
当然,正确答案可能是我们需要重新考虑我们的架构以考虑GIL.但这不是可以立即完成的事情.那么如何确定短期内采取的适当行动?
假设我有一个C扩展函数,它可以完全独立于Python解释器.有没有理由不释放GIL?
例如,有没有理由不写这样的代码(除了可读性和避免微优化之类的问题 - 重要但与我的问题无关的事情)?
Py_BEGIN_ALLOW_THREADS
a = 1 + 1;
Py_END_ALLOW_THREADS
Run Code Online (Sandbox Code Playgroud)
显然,这是一个简单的代码,其中性能可能无关紧要.但有没有任何表现理由不在这里发布GIL?或者只应发布GIL以获得更多CPU密集型代码?
我正在尝试使用Cython来提高某些度量计算的性能prange.这是我的代码:
def shausdorff(float64_t[:,::1] XA not None, float64_t[:,:,::1] XB not None):
cdef:
Py_ssize_t i
Py_ssize_t n = XB.shape[2]
float64_t[::1] hdist = np.zeros(n)
#arrangement to fix contiguity
XB = np.asanyarray([np.ascontiguousarray(XB[:,:,i]) for i in range(n)])
for i in range(n):
hdist[i] = _hausdorff(XA, XB[i])
return hdist
def phausdorff(float64_t[:,::1] XA not None, float64_t[:,:,::1] XB not None):
cdef:
Py_ssize_t i
Py_ssize_t n = XB.shape[2]
float64_t[::1] hdist = np.zeros(n)
#arrangement to fix contiguity (EDITED)
cdef float64_t[:,:,::1] XC = np.asanyarray([np.ascontiguousarray(XB[:,:,i]) for i in range(n)])
with nogil, …Run Code Online (Sandbox Code Playgroud) 我设计了一个 C++ 系统,它从在单独线程中运行的过程调用用户定义的回调。简化后system.hpp如下所示:
#pragma once
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>
class System
{
public:
using Callback = std::function<void(int)>;
System(): t_(), cb_(), stop_(true) {}
~System()
{
stop();
}
bool start()
{
if (t_.joinable()) return false;
stop_ = false;
t_ = std::thread([this]()
{
while (!stop_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (cb_) cb_(1234);
}
});
return true;
}
bool stop()
{
if (!t_.joinable()) return false;
stop_ = true;
t_.join();
return true;
}
bool registerCallback(Callback cb)
{
if (t_.joinable()) return false; …Run Code Online (Sandbox Code Playgroud) 我正在使用 FastAPI,非async端点在带有多个工作线程的 Gunicorn 上运行,来自此处uvicorn.workers.UvicornWorker建议的类。Latley,我注意到在我们的应用程序比平时更繁忙的时候,我们的一些端点存在高延迟。我开始调查它,发现我们应用程序中的并发性并没有像我们预期的那样工作。
假设我有一个具有以下端点的 FastAPI 应用程序 (main.py)
app = FastAPI()
logger = logging.getLogger()
@app.get("/")
def root():
logger.info(f"Running on {os.getpid()}")
time.sleep(3600)
return {"message": "Hello World"}
Run Code Online (Sandbox Code Playgroud)
我gunicorn使用以下命令运行:
gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
Run Code Online (Sandbox Code Playgroud)
当我向服务器发送五个请求时,除了最后一个请求之外,它们都到达同一个工作线程,而不是在所有工作线程上并行运行:
INFO:root:Running on 643
INFO:root:Running on 643
INFO:root:Running on 643
INFO:root:Running on 643
INFO:root:Running on 642
Run Code Online (Sandbox Code Playgroud)
如果我将端点转换为async,则每个请求都将在不同的工作线程上处理(最后一个请求将被保留)。我知道,当使用非异步端点时,FastAPI 使用 AnyIO 线程来处理请求,最大线程的默认值为 40。当我尝试将此限制降低到 2 个线程时,例如使用此处的建议,只有前两个请求正在处理,而其他人正在等待(尽管我还有 4 个工人!)
这很糟糕,因为既没有使用我们所有的资源,又因为同一个工作线程上的 GIL 而遭受 python 线程问题。
有没有办法在不转向端点的情况下克服这些问题async?