我正在构建一个 SMTP 服务器,并aiosmtpd使用这些示例作为构建的基础。下面是程序入口点的代码片段。
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(amain(loop=loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
Run Code Online (Sandbox Code Playgroud)
当我运行该程序时,我收到以下警告:
server.py:61: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud)
实现这个的正确方法是什么?
我正在尝试使用 python 和库在我的计算机上运行我自己的 stmp 服务器aiosmtpd。
我运行这个例子,一切看起来都很好,但我从未收到对方的电子邮件。
不知道是否有日志可以看到。
我使用的是 Visual Studio 2015、Python 3.5 和 Windows 8.1
我看到过类似的帖子,但没有帮助。
重要提示:
在客户端代码中,我也尝试过不使用日期标头
服务器.py:
import asyncio import logging
from aiosmtpd.controller import Controller
from aiosmtpd.handlers import Sink
from smtplib import SMTP
async def amain(loop):
cont = Controller(Sink(), hostname='::0', port=8025)
cont.start()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.create_task(amain(loop=loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
Run Code Online (Sandbox Code Playgroud)
客户端.py:
from smtplib import SMTP import smtplib
s = SMTP('localhost', 8025) try:
s.set_debuglevel(True)
s.sendmail('andy@love.com', ['bob@hate.com'], """\
Date:17/05/2017,2:18
From: andy@love.com
To: …Run Code Online (Sandbox Code Playgroud) 我想用aiosmtpd在 python 中编写我自己的小型邮件服务器应用程序
a) 出于教育目的以更好地理解邮件服务器
b) 实现我自己的功能
所以我的问题是,Mail-Transfer-Agent缺少什么(除了 aiosmtpd),它可以向/从其他完整的 MTA(gmail.com、yahoo.com ...)发送和接收电子邮件?
我正在猜测:
1.) 当然是域和静态 ip
2.)该域的有效证书
...应该可以使用 Lets Encrypt
3.)加密
...应该可以使用 SSL/Context/Starttls...使用 aiosmtpd 本身
4 .)解析外发电子邮件的 MX DNS 条目!?
...应该可以使用 python 库 dnspython
5.)错误处理 SMTP 通信错误、来自其他 MTA 的错误回复、弹跳!?
6.)处理入站和待处理出站电子邮件的队列!?
是否缺少其他“基本”功能?
我当然知道,邮件服务器还有更多“高级”功能,例如垃圾邮件检查、恶意软件检查、证书验证、黑名单、规则、邮箱等等......
感谢所有提示!
编辑:
让我澄清一下我的想法:
我想为俱乐部编写一个邮件服务器。它的主要目的将是一个邮件列表服务器。俱乐部的不同团体会有不同的名单。假设我的域名是myclub.org那么就会有例如Youth@myclub.org、trainer@myclub.org等等。
只有会员才能使用此邮件服务器,并且只有会员才能收到来自此邮件服务器的电子邮件。不允许其他人向该邮件服务器发送电子邮件,也不会接收来自该服务器的电子邮件。成员电子邮件地址及其组存储在数据库中。
将来我想集成一些其他有用的功能,例如:
我不需要的东西:
打开中继问题:
我几乎直接从 aiosmtpd 文档中获取了以下服务器:
import asyncio
import ssl
from aiosmtpd.controller import Controller
class ExampleHandler:
async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
if not address.endswith('@example.com'):
return '550 not relaying to that domain'
envelope.rcpt_tos.append(address)
return '250 OK'
async def handle_DATA(self, server, session, envelope):
print(f'Message from {envelope.mail_from}')
print(f'Message for {envelope.rcpt_tos}')
print(f'Message data:\n{envelope.content.decode("utf8", errors="replace")}')
print('End of message')
return '250 Message accepted for delivery'
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
controller = Controller(ExampleHandler(), port=8026, ssl_context=context)
controller.start()
input('Press enter to stop')
controller.stop()
Run Code Online (Sandbox Code Playgroud)
但是,当我启动此服务器并尝试使用 swak 向其发送电子邮件时:
echo 'Testing' | swaks …Run Code Online (Sandbox Code Playgroud) 如何转换以下基本SMTP服务器使用smtpd一个使用aiosmtpd呢?
import smtpd
class CustomSMTPServer(smtpd.SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
myqueue.queue.put(data)
self.server = CustomSMTPServer(('127.0.0.1', 10025), None)
Run Code Online (Sandbox Code Playgroud)