Caj*_*uu' 7 python io asynchronous
首先,我们给出了以下代码:
from validate_email import validate_email
import time
import os
def verify_emails(email_path, good_filepath, bad_filepath):
good_emails = open(good_filepath, 'w+')
bad_emails = open(bad_filepath, 'w+')
emails = set()
with open(email_path) as f:
for email in f:
email = email.strip()
if email in emails:
continue
emails.add(email)
if validate_email(email, verify=True):
good_emails.write(email + '\n')
else:
bad_emails.write(email + '\n')
if __name__ == "__main__":
os.system('cls')
verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")
Run Code Online (Sandbox Code Playgroud)
我希望在emails.txt包含大量行(> 1k)时,联系SMTP服务器是我程序中最昂贵的部分.使用某种形式的并行或异步I/O应该可以加快速度,因为我可以等待多个服务器响应而不是顺序等待.
据我所知:
异步I/O通过将对I/O的请求排队到文件描述符来进行操作,独立于调用进程进行跟踪.对于支持异步I/O(通常是原始磁盘)的文件描述符,进程可以调用aio_read()(例如)来请求从文件描述符中读取多个字节.无论I/O是否完成,系统调用都会立即返回.一段时间之后,该过程然后轮询操作系统以完成I/O(即,缓冲区充满数据).
为了真诚,我不太明白如何在我的程序上实现异步I/O. 任何人都可以花一点时间向我解释整个过程吗?
根据PArakleta的编辑建议:
from validate_email import validate_email
import time
import os
from multiprocessing import Pool
import itertools
def validate_map(e):
return (validate_email(e.strip(), verify=True), e)
seen_emails = set()
def unique(e):
if e in seen_emails:
return False
seen_emails.add(e)
return True
def verify_emails(email_path, good_filepath, bad_filepath):
good_emails = open(good_filepath, 'w+')
bad_emails = open(bad_filepath, 'w+')
with open(email_path, "r") as f:
for result in Pool().imap_unordered(validate_map,
itertools.ifilter(unique, f):
(good, email) = result
if good:
good_emails.write(email)
else:
bad_emails.write(email)
good_emails.close()
bad_emails.close()
if __name__ == "__main__":
os.system('cls')
verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")
Run Code Online (Sandbox Code Playgroud)
看过这个validate_email软件包,你真正的问题是你没有有效地批量处理结果.您应该只对每个域执行一次MX查找,然后只连接到每个MX服务器一次,完成握手,然后在一个批处理中检查该服务器的所有地址.值得庆幸的是,该validate_email软件包会为您执行MX结果缓存,但您仍需要按服务器对电子邮件地址进行分组,以将查询批量处理到服务器本身.
您需要编辑validate_email包以实现批处理,然后可能使用实际threading库而不是为每个域提供一个线程multiprocessing.
如果程序很慢并且弄清楚实际花费时间的位置而不是盲目地应用优化技巧,那么对程序进行概述总是很重要的.
如果您使用缓冲IO并且您的用例适合OS缓冲,则IO已经是异步的.你可能获得一些优势的唯一地方是预读,但如果你使用迭代器访问文件(你正在做),Python已经这样做了.AsyncIO对于移动大量数据的程序是一个优势,并且已禁用OS缓冲区以防止两次复制数据.
您需要对程序进行实际配置/基准测试,以确定它是否有任何改进空间.如果您的磁盘尚未受吞吐量限制,则可以通过并行执行每个电子邮件的处理(地址?)来提高性能.检查这个的最简单方法可能是检查运行程序的核心是否超出(即你是CPU绑定而不是IO绑定).
如果你是CPU绑定的,那么你需要看看线程.不幸的是,Python线程不能并行工作,除非你有非Python工作要做,所以你必须使用多处理(我假设validate_email是一个Python函数).
你究竟是如何进行的,取决于你的程序中的瓶颈在哪里以及你需要加速到达IO界限所需的速度(因为你实际上不能超过你可以在你达到目标时停止优化的速度点).
该emails组对象是很难分享,因为你需要围绕它锁上,所以它可能是最好的,你记在一个线程.查看多处理库,最简单的使用机制可能是Process Pools.
使用它你需要将你的文件包装在一个itertools.ifilter丢弃重复项的文件中,然后将其提供给a Pool.imap_unordered然后迭代该结果并写入你的两个输出文件.
就像是:
with open(email_path) as f:
for result in Pool().imap_unordered(validate_map,
itertools.ifilter(unique, f):
(good, email) = result
if good:
good_emails.write(email)
else:
bad_emails.write(email)
Run Code Online (Sandbox Code Playgroud)
该validate_map函数应该是简单的:
def validate_map(e):
return (validate_email(e.strip(), verify=True), e)
Run Code Online (Sandbox Code Playgroud)
该unique功能应该是这样的:
seen_emails = set()
def unique(e):
if e in seen_emails:
return False
seen_emails.add(e)
return True
Run Code Online (Sandbox Code Playgroud)
ETA:我刚刚意识到这validate_email是一个实际联系SMTP服务器的库.鉴于Python代码并不繁忙,您可以使用线程.虽然线程 API不像多处理库那样方便,但您可以使用multiprocessing.dummy来创建基于线程的池.
如果你受CPU限制,那么拥有比核心更多的线程/进程并不值得,但由于你的瓶颈是网络IO,你可以从更多的线程/进程中受益.由于进程很昂贵,你想交换线程,然后并行运行的数字(尽管你应该礼貌地不要DOS攻击你连接的服务器).
考虑from multiprocessing.dummy import Pool as ThreadPool然后打电话ThreadPool(processes=32).imap_unordered().
| 归档时间: |
|
| 查看次数: |
1652 次 |
| 最近记录: |