如何在 python 3 中监听传入的电子邮件?

Cri*_*ivo 5 email python-3.x

目标是拥有一个在 python 3 中运行并读取特定 gmail 帐户上传入电子邮件的应用程序,如何侦听该电子邮件的接收?

它应该做的是等到收件箱收到新邮件,从电子邮件中读取主题和正文,并从正文中获取文本(无格式)。

这是我到目前为止得到的:

import imaplib
import email
import datetime
import time

mail = imaplib.IMAP4_SSL('imap.gmail.com', 993)
mail.login(user, password)
mail.list()
mail.select('inbox')

status, data = mail.search(None, 'ALL')
for num in data[0].split():
    status, data = mail.fetch(num, '(RFC822)')
    email_msg = data[0][1]
    email_msg = email.message_from_bytes(email_msg)
    maintype = email_msg.get_content_maintype()
    if maintype == 'multipart':
        for part in email_msg.get_payload():
            if part.get_content_maintype() == 'text':
                print(part.get_payload())
    elif maintype == 'text':
        print(email_msg.get_payload())
Run Code Online (Sandbox Code Playgroud)

但这有几个问题:当消息是多部分的时,每个部分都会被打印,有时最后一部分基本上是整个消息,但采用 html 格式。

另外,这会打印收件箱中的所有消息,如何使用 imaplib 监听新电子邮件?或与其他库。

mat*_*anz 5

我不确定执行此操作的同步方式,但如果您不介意使用异步循环并将未读电子邮件定义为目标,那么它就可以工作。
(我没有实现 IMAP 轮询循环,仅实现了电子邮件获取循环)

我的改变

  1. 将 IMAP 搜索过滤器从“全部”替换为“(未见)”以获取未读电子邮件。
  2. 将序列化策略从默认的policy.Compat32更改为policy.SMTP
  3. 使用email.message.walk()方法(新 API)来运行和过滤消息部分。
  4. 按照文档中的描述以及这些示例中的演示,用新的电子邮件API 调用替换旧的电子邮件 API 调用。

结果代码

import imaplib, email, getpass
from email import policy

imap_host = 'imap.gmail.com'
imap_user = 'example@gmail.com'

# init imap connection
mail = imaplib.IMAP4_SSL(imap_host, 993)
rc, resp = mail.login(imap_user, getpass.getpass())

# select only unread messages from inbox
mail.select('Inbox')
status, data = mail.search(None, '(UNSEEN)')

# for each e-mail messages, print text content
for num in data[0].split():
    # get a single message and parse it by policy.SMTP (RFC compliant)
    status, data = mail.fetch(num, '(RFC822)')
    email_msg = data[0][1]
    email_msg = email.message_from_bytes(email_msg, policy=policy.SMTP)

    print("\n----- MESSAGE START -----\n")

    print("From: %s\nTo: %s\nDate: %s\nSubject: %s\n\n" % ( \
        str(email_msg['From']), \
        str(email_msg['To']), \
        str(email_msg['Date']), \
        str(email_msg['Subject'] )))

    # print only message parts that contain text data
    for part in email_msg.walk():
        if part.get_content_type() == "text/plain":
            for line in part.get_content().splitlines():
                print(line)

    print("\n----- MESSAGE END -----\n")
Run Code Online (Sandbox Code Playgroud)


ZF0*_*007 4

你检查过下面由 git 用户 nickoala 发布的脚本 (3_emailcheck.py)?它是一个 python 2 脚本,在 Python3 中,您需要首先使用电子邮件内容对字节进行解码。

import time
from itertools import chain
import email
import imaplib

imap_ssl_host = 'imap.gmail.com'  # imap.mail.yahoo.com
imap_ssl_port = 993
username = 'USERNAME or EMAIL ADDRESS'
password = 'PASSWORD'

# Restrict mail search. Be very specific.
# Machine should be very selective to receive messages.
criteria = {
    'FROM':    'PRIVILEGED EMAIL ADDRESS',
    'SUBJECT': 'SPECIAL SUBJECT LINE',
    'BODY':    'SECRET SIGNATURE',
}
uid_max = 0


def search_string(uid_max, criteria):
    c = list(map(lambda t: (t[0], '"'+str(t[1])+'"'), criteria.items())) + [('UID', '%d:*' % (uid_max+1))]
    return '(%s)' % ' '.join(chain(*c))
    # Produce search string in IMAP format:
    #   e.g. (FROM "me@gmail.com" SUBJECT "abcde" BODY "123456789" UID 9999:*)


def get_first_text_block(msg):
    type = msg.get_content_maintype()

    if type == 'multipart':
        for part in msg.get_payload():
            if part.get_content_maintype() == 'text':
                return part.get_payload()
    elif type == 'text':
        return msg.get_payload()


server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
server.login(username, password)
server.select('INBOX')

result, data = server.uid('search', None, search_string(uid_max, criteria))

uids = [int(s) for s in data[0].split()]
if uids:
    uid_max = max(uids)
    # Initialize `uid_max`. Any UID less than or equal to `uid_max` will be ignored subsequently.

server.logout()


# Keep checking messages ...
# I don't like using IDLE because Yahoo does not support it.
while 1:
    # Have to login/logout each time because that's the only way to get fresh results.

    server = imaplib.IMAP4_SSL(imap_ssl_host, imap_ssl_port)
    server.login(username, password)
    server.select('INBOX')

    result, data = server.uid('search', None, search_string(uid_max, criteria))

    uids = [int(s) for s in data[0].split()]
    for uid in uids:
        # Have to check again because Gmail sometimes does not obey UID criterion.
        if uid > uid_max:
            result, data = server.uid('fetch', uid, '(RFC822)')  # fetch entire message
            msg = email.message_from_string(data[0][1])

            uid_max = uid

            text = get_first_text_block(msg)
            print 'New message :::::::::::::::::::::'
            print text

    server.logout()
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)