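The goal is an application that runs on Python 3 and reads incoming email on a specific Gmail account. How can I listen for the arrival of new email?
It should wait until the inbox receives a new message, read the subject and body of that email, and extract the text from the body (without formatting).
This is what I have so far: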
import imaplib
import email
import datetime
import time

mail = imaplib.IMAP4_SSL('imap.gmail.com', 993)
mail.login(user, password)
mail.list()
mail.select('inbox')

status, data = mail.search(None, 'ALL')
for num in data[0].split():
    status, data = mail.fetch(num, '(RFC822)')
    email_msg = data[0][1]
    email_msg = email.message_from_bytes(email_msg)
    maintype = email_msg.get_content_maintype()
    if maintype == 'multipart':
        for part in email_msg.get_payload():
            if part.get_content_maintype() == 'text':
                print(part.get_payload())
    elif maintype == 'text':
        print(email_msg.get_payload())
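But this has a few problems: when a message is multipart, every part gets printed, and sometimes the last part is basically the whole message again, but in HTML format.
Also, this prints every message in the inbox. How can I listen for new email with imaplib, or with another library?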
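I'm not sure about a synchronous way of doing this, but if you don't mind having an async loop and defining unread emails as your target, this could work.
(I didn't implement the IMAP polling loop, only the email fetching loop.)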
import imaplib, email, getpass
from email import policy

imap_host = 'imap.gmail.com'
imap_user = 'example@gmail.com'

# init imap connection
mail = imaplib.IMAP4_SSL(imap_host, 993)
rc, resp = mail.login(imap_user, getpass.getpass())

# select only unread messages from inbox
mail.select('Inbox')
status, data = mail.search(None, '(UNSEEN)')

# for each e-mail message, print text content
for num in data[0].split():
    # get a single message and parse it by policy.SMTP (RFC compliant)
    status, data = mail.fetch(num, '(RFC822)')
    email_msg = data[0][1]
    email_msg = email.message_from_bytes(email_msg, policy=policy.SMTP)

    print("\n----- MESSAGE START -----\n")

    print("From: %s\nTo: %s\nDate: %s\nSubject: %s\n\n" % ( \
        str(email_msg['From']), \
        str(email_msg['To']), \
        str(email_msg['Date']), \
        str(email_msg['Subject'] )))

    # print only message parts that contain text data
    for part in email_msg.walk():
        if part.get_content_type() == "text/plain":
            for line in part.get_content().splitlines():
                print(line)

    print("\n----- MESSAGE END -----\n")
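The polling loop that this answer leaves out could look roughly like the sketch below. It assumes a `mail` connection like the one above, already logged in with the inbox selected in read-write mode, so that fetching `(RFC822)` marks a message as seen and it is not reported again on the next round. The names `poll_unseen`, `plain_text_body`, `handle`, `interval`, and `max_rounds` are hypothetical helpers, not part of imaplib. `get_body(preferencelist=('plain',))` picks the plain-text alternative, which should avoid printing the HTML copy of a multipart message.

```python
import email
import time
from email import policy


def plain_text_body(msg):
    """Return the preferred text/plain body of a parsed message, or ''."""
    part = msg.get_body(preferencelist=('plain',))
    return part.get_content() if part is not None else ''


def poll_unseen(mail, handle, interval=30, max_rounds=None):
    """Search the selected mailbox for UNSEEN mail every `interval` seconds.

    `mail` is assumed to be a logged-in imaplib connection with a mailbox
    selected; `handle(subject, body)` is called once per unread message.
    `max_rounds=None` means loop forever (hypothetical parameter, handy
    for testing).
    """
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        status, data = mail.search(None, '(UNSEEN)')
        for num in data[0].split():
            # Fetching RFC822 in a read-write mailbox sets the \Seen flag
            status, fetched = mail.fetch(num, '(RFC822)')
            msg = email.message_from_bytes(fetched[0][1], policy=policy.SMTP)
            handle(str(msg['Subject']), plain_text_body(msg))
        rounds += 1
        if max_rounds is None or rounds < max_rounds:
            time.sleep(interval)
```

A call such as `poll_unseen(mail, lambda subject, body: print(subject, body, sep='\n'))` would then print each new message as it arrives. Whether a long-lived connection keeps reporting new mail may depend on the server, so reconnecting on each round is a possible fallback.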
Have you checked the script below (3_emailcheck.py), posted by git user nickoala? It is a Python 2 script; in Python 3 you need to decode the bytes that hold the email content first.
import time
from itertools import chain
import email
import imaplib

imap_ssl_host = 'imap.gmail.com'  # imap.mail.yahoo.com
imap_ssl_port = 993
username = 'USERNAME or EMAIL ADDRESS'
password = 'PASSWORD'

# Restrict mail search. Be very specific.
# Machine should be very selective to receive messages.
criteria = {
    'FROM':    'PRIVILEGED EMAIL ADDRESS',
    'SUBJECT': 'SPECIAL SUBJECT LINE',
    'BODY':    'SECRET SIGNATURE',
}
uid_max = 0

def search_string(uid_max, criteria):
    c = list(map(lambda t: (t[0], '"'+str(t[1])+'"'), criteria.items())) + [('UID', '%d:*' % (uid_max+1))]
    return '(%s)' % ' '.join(chain(*c))
    # Produce search string in IMAP format:
    #   e.g. (FROM "me@gmail.com" SUBJECT "abcde" BODY "123456789" UID 9999:*)

def get_first_text_block(msg):
    type = msg.get_content_maintype()

    if type == 'multipart':
        for part in msg.get_payload():
            if part.get_content_maintype() == 'text':
                return part.get_payload()
    elif type == 'text':
        return msg.get_payload()

server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
server.login(username, password)
server.select('INBOX')

result, data = server.uid('search', None, search_string(uid_max, criteria))

uids = [int(s) for s in data[0].split()]
if uids:
    uid_max = max(uids)
    # Initialize `uid_max`. Any UID less than or equal to `uid_max` will be ignored subsequently.

server.logout()

# Keep checking messages ...
# I don't like using IDLE because Yahoo does not support it.
while 1:
    # Have to login/logout each time because that's the only way to get fresh results.
    server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
    server.login(username, password)
    server.select('INBOX')

    result, data = server.uid('search', None, search_string(uid_max, criteria))

    uids = [int(s) for s in data[0].split()]
    for uid in uids:
        # Have to check again because Gmail sometimes does not obey UID criterion.
        if uid > uid_max:
            result, data = server.uid('fetch', uid, '(RFC822)')  # fetch entire message
            msg = email.message_from_string(data[0][1])

            uid_max = uid

            text = get_first_text_block(msg)
            print 'New message :::::::::::::::::::::'
            print text

    server.logout()
    time.sleep(1)
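For Python 3, the "decode the bytes first" step mentioned above might look like the sketch below. It is one possible adaptation, not nickoala's code: `parse_fetched` is a hypothetical helper, and falling back to UTF-8 when a part declares no charset is an assumption.

```python
import email


def parse_fetched(data):
    """Parse the `data` returned by server.uid('fetch', ...) in Python 3.

    imaplib returns the raw message as bytes, so message_from_bytes is
    used instead of message_from_string.
    """
    return email.message_from_bytes(data[0][1])


def get_first_text_block(msg):
    """Return the first text part of `msg` as str, or None if there is none."""
    for part in msg.walk():
        if part.get_content_maintype() == 'text':
            # decode=True undoes base64 / quoted-printable and yields bytes
            payload = part.get_payload(decode=True)
            # Assumption: fall back to UTF-8 when no charset is declared
            charset = part.get_content_charset() or 'utf-8'
            return payload.decode(charset, errors='replace')
    return None
```

In the loop above, `msg = parse_fetched(data)` would replace the `email.message_from_string(...)` call, and the two `print` statements become `print(...)` calls. In Python 3 the UID probably also has to be passed as a string, as in `server.uid('fetch', str(uid), '(RFC822)')`.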