cal*_*leb 5 python ssl pyopenssl
我的目标是允许 ssl 客户端从服务器的许多有效证书对中进行选择。客户端有一个 CA 证书,它将用来验证来自服务器的证书。
因此,为了尝试实现这一点,我ssl.SSLContext.set_servername_callback()将服务器上的 与ssl.SSLSocket.wrap_socket's parameter:server_hostname`结合使用,以尝试允许客户端指定要使用的密钥对。代码如下所示:
服务器代码:
import sys
import pickle
import ssl
import socket
import select
request = {'msgtype': 0, 'value': 'Ping', 'test': [chr(i) for i in range(256)]}
response = {'msgtype': 1, 'value': 'Pong'}
def handle_client(c, a):
print("Connection from {}:{}".format(*a))
req_raw = c.recv(10000)
req = pickle.loads(req_raw)
print("Received message: {}".format(req))
res = pickle.dumps(response)
print("Sending message: {}".format(response))
c.send(res)
def run_server(hostname, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((hostname, port))
s.listen(8)
print("Serving on {}:{}".format(hostname, port))
try:
while True:
(c, a) = s.accept()
def servername_callback(sock, req_hostname, cb_context, as_callback=True):
print('Loading certs for {}'.format(req_hostname))
server_cert = "ssl/{}/server".format(req_hostname) # NOTE: This use of socket input is INSECURE
cb_context.load_cert_chain(certfile="{}.crt".format(server_cert), keyfile="{}.key".format(server_cert))
# Seems like this is designed usage: https://github.com/python/cpython/blob/3.4/Modules/_ssl.c#L1469
sock.context = cb_context
return None
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.set_servername_callback(servername_callback)
default_cert = "ssl/3.1/server"
context.load_cert_chain(certfile="{}.crt".format(default_cert), keyfile="{}.key".format(default_cert))
ssl_sock = context.wrap_socket(c, server_side=True)
try:
handle_client(ssl_sock, a)
finally:
c.close()
except KeyboardInterrupt:
s.close()
if __name__ == '__main__':
hostname = ''
port = 6789
run_server(hostname, port)
Run Code Online (Sandbox Code Playgroud)
客户端代码:
import sys
import pickle
import socket
import ssl
request = {'msgtype': 0, 'value': 'Ping', 'test': [chr(i) for i in range(256)]}
response = {'msgtype': 1, 'value': 'Pong'}
def client(hostname, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("Connecting to {}:{}".format(hostname, port))
s.connect((hostname, port))
ssl_sock = ssl.SSLSocket(sock=s, ca_certs="server_old.crt", cert_reqs=ssl.CERT_REQUIRED, server_hostname='3.2')
print("Sending message: {}".format(request))
req = pickle.dumps(request)
ssl_sock.send(req)
resp_raw = ssl_sock.recv(10000)
resp = pickle.loads(resp_raw)
print("Received message: {}".format(resp))
ssl_sock.close()
if __name__ == '__main__':
hostname = 'localhost'
port = 6789
client(hostname, port)
Run Code Online (Sandbox Code Playgroud)
但它不起作用。似乎正在发生的事情是servername_callback被调用,正在获取指定的“主机名”,并且context.load_cert_chain回调中的调用没有失败(尽管如果给定的路径不存在,它确实会失败)。但是,服务器始终返回调用之前加载的证书对context.wrap_socket(c, server_side=True)。所以我的问题是:在 , 中是否有某种方法servername_callback可以修改 ssl 上下文使用的密钥对,并获取该密钥对的证书以用于连接?
我还应该注意,我检查了流量,并且在servername_callback函数返回之前不会发送服务器的证书(如果它无法成功完成,则永远不会发送,或返回“失败”值)。
在您的回调中,cb_context是被调用的相同上下文wrap_socket(),并且与 相同socket.context,因此socket.context = cb_context将上下文设置为与之前相同。
更改上下文的证书链不会影响当前wrap_socket()操作使用的证书。对此的解释在于 openssl 如何创建其底层对象,在这种情况下,底层 SSL 结构已经创建并使用链的副本:
笔记
当调用 SSL_new() 时,与 SSL_CTX 结构关联的链将被复制到任何 SSL 结构。SSL 结构不会受到父级 SSL_CTX 中随后更改的任何链的影响。
设置新上下文时,SSL 结构会更新,但当新上下文等于旧上下文时,不会执行该更新。
您需要设置sock.context不同的上下文才能使其工作。当前,您在每个新传入连接上实例化一个新上下文,但这是不需要的。相反,您应该仅实例化您的标准上下文一次并重用它。动态加载的上下文也是如此,您可以在启动时创建它们并将它们放入字典中,这样您就可以进行查找,例如:
...
contexts = {}
for hostname in os.listdir("ssl"):
print('Loading certs for {}'.format(hostname))
server_cert = "ssl/{}/server".format(hostname)
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile="{}.crt".format(server_cert),
keyfile="{}.key".format(server_cert))
contexts[hostname] = context
def servername_callback(sock, req_hostname, cb_context, as_callback=True):
context = contexts.get(req_hostname)
if context is not None:
sock.context = context
else:
pass # handle unknown hostname case
def run_server(hostname, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((hostname, port))
s.listen(8)
print("Serving on {}:{}".format(hostname, port))
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
context.set_servername_callback(servername_callback)
default_cert = "ssl/3.1/server"
context.load_cert_chain(certfile="{}.crt".format(default_cert),
keyfile="{}.key".format(default_cert))
try:
while True:
(c, a) = s.accept()
ssl_sock = context.wrap_socket(c, server_side=True)
try:
handle_client(ssl_sock, a)
finally:
c.close()
except KeyboardInterrupt:
s.close()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3374 次 |
| 最近记录: |