Uts*_*tel 7 python asynchronous smtplib python-asyncio
我正在尝试学习 asyncio。如果不使用 asyncio 库、按普通方式运行这个程序,耗时反而更少;而按下面这种方式运行,耗时却更多。那么,这是使用 asyncio 发送邮件的正确方法吗?还是有其他更合适的方法?
import smtplib
import ssl
import time
import asyncio
async def send_mail(receiver_email):
    """Send a fixed test message to ``receiver_email`` without blocking the loop.

    ``smtplib`` is synchronous, so calling it directly inside a coroutine
    stalls the event loop and serialises every coroutine passed to
    ``asyncio.gather``. The whole SMTP conversation is therefore handed to
    the loop's default executor, letting several sends overlap.

    Reads the module-level names ``smtp_server``, ``port``, ``context``,
    ``sender_email`` and ``password``.

    :param receiver_email: Address the message is delivered to.
    :type receiver_email: str
    :return: ``None``. Errors raised while connecting or sending are printed,
        not propagated.
    """

    def _send_blocking():
        # Runs in a worker thread; everything here may block.
        server = None
        try:
            print(f"trying..{receiver_email}")
            server = smtplib.SMTP(smtp_server, port)
            server.ehlo()
            server.starttls(context=context)
            server.ehlo()
            server.login(sender_email, password)
            message = "test"
            server.sendmail(sender_email, receiver_email, message)
            print(f"done...{receiver_email}")
        except Exception as e:
            print(e)
        finally:
            # `server` stays None when the connection itself failed; calling
            # quit() unconditionally would raise and hide the real error.
            if server is not None:
                try:
                    server.quit()
                except smtplib.SMTPServerDisconnected:
                    # The connection is already gone; nothing left to close.
                    pass

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _send_blocking)
async def main():
    """Send four test mails through ``asyncio.gather`` and print the elapsed time."""
    started = time.time()
    recipients = ["test@test.com"] * 4
    await asyncio.gather(*(send_mail(address) for address in recipients))
    print(f"End in {time.time() - started}sec")
if __name__ == "__main__":
    # Module-level settings read by send_mail().
    sender_email = "*****"
    password = "*****"
    smtp_server, port = "smtp.gmail.com", 587  # 587: For starttls
    context = ssl.create_default_context()

    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
Boo*_*boo 16
您并没有真正正确地使用 asyncio 来发送电子邮件。您应该使用 aiosmtplib 进行异步 SMTP 调用,例如 connect、starttls、login 等。请参阅以下示例,它是我从一个处理附件的更复杂的程序中精简出来的。此代码异步发送两封电子邮件:
#!/usr/bin/env python3
import asyncio
import aiosmtplib
import sys
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# Default SMTP connection settings, used when the caller passes no
# `mail_params` of its own.
MAIL_PARAMS = {
    'TLS': True,
    'host': 'xxxxxxxx',
    'password': 'xxxxxxxx',
    'user': 'xxxxxxxx',
    'port': 587,
}
async def send_mail_async(sender, to, subject, text, textType='plain', **params):
    """Send an outgoing email with the given parameters.

    :param sender: From whom the email is being sent
    :type sender: str
    :param to: A list of recipient email addresses. A single address given
        as a plain string is also accepted.
    :type to: list
    :param subject: The subject of the email.
    :type subject: str
    :param text: The text of the email.
    :type text: str
    :param textType: Mime subtype of text, defaults to 'plain' (can be 'html').
    :type textType: str
    :param params: An optional set of parameters. (See below)
    :type params: dict
    :raises ValueError: If ``mail_params`` enables both 'SSL' and 'TLS'.

    Optional Parameters:
    :cc: A list of Cc email addresses.
    :bcc: A list of Bcc email addresses.
    :mail_params: Connection settings dict; the keys read are 'host',
        'port', 'SSL', 'TLS', 'user' and 'password'. Defaults to the
        module-level MAIL_PARAMS.
    """

    def _as_list(addresses):
        # A bare string would otherwise be joined character by character.
        if isinstance(addresses, str):
            return [addresses]
        return list(addresses)

    # Default Parameters
    to = _as_list(to)
    cc = _as_list(params.get("cc") or [])
    bcc = _as_list(params.get("bcc") or [])
    mail_params = params.get("mail_params")
    if mail_params is None:
        # Looked up lazily so the module default is only touched when needed.
        mail_params = MAIL_PARAMS

    # Prepare Message
    msg = MIMEMultipart()
    msg.preamble = subject
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join(to)
    if cc:
        msg['Cc'] = ', '.join(cc)
    if bcc:
        msg['Bcc'] = ', '.join(bcc)
    msg.attach(MIMEText(text, textType, 'utf-8'))

    # Contact SMTP server and send Message
    host = mail_params.get('host', 'localhost')
    isSSL = mail_params.get('SSL', False)
    isTLS = mail_params.get('TLS', False)
    if isSSL and isTLS:
        raise ValueError('SSL and TLS cannot both be True')
    port = mail_params.get('port', 465 if isSSL else 25)

    # For aiosmtplib 3.0.1 we must set argument start_tls=False
    # because we will explicitly be calling starttls ourselves when
    # isTLS is True:
    smtp = aiosmtplib.SMTP(hostname=host, port=port, start_tls=False, use_tls=isSSL)

    # The async context manager connects on entry and closes the connection
    # on exit, so a failure in starttls/login/send no longer leaks it.
    async with smtp:
        if isTLS:
            await smtp.starttls()
        if 'user' in mail_params:
            await smtp.login(mail_params['user'], mail_params['password'])
        await smtp.send_message(msg)
async def main():
    """Create two test emails to the same address and await both with gather."""
    email_address = 'xxxxxxxx'
    pending = [
        send_mail_async(
            email_address,
            [email_address],
            subject,
            f'{subject} Message',
            textType='plain',
        )
        for subject in ('Test 1', 'Test 2')
    ]
    await asyncio.gather(*pending)
# Start the event loop and run main() only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13187 次 |
| 最近记录: |