在 Python 中使用 ssl context.set_servername_callback

cal*_*leb 5 python ssl pyopenssl

我的目标是允许 ssl 客户端从服务器的许多有效证书对中进行选择。客户端有一个 CA 证书,它将用来验证来自服务器的证书。

因此,为了尝试实现这一点,我ssl.SSLContext.set_servername_callback()将服务器上的 与ssl.SSLSocket.wrap_socket's parameter:server_hostname`结合使用,以尝试允许客户端指定要使用的密钥对。代码如下所示:

服务器代码:

import sys
import pickle
import ssl
import socket
import select

request = {'msgtype': 0, 'value': 'Ping', 'test': [chr(i) for i in range(256)]}
response = {'msgtype': 1, 'value': 'Pong'}

def handle_client(c, a):
    print("Connection from {}:{}".format(*a))
    req_raw = c.recv(10000)
    req = pickle.loads(req_raw)
    print("Received message: {}".format(req))
    res = pickle.dumps(response)
    print("Sending message: {}".format(response))
    c.send(res)

def run_server(hostname, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((hostname, port))
    s.listen(8)
    print("Serving on {}:{}".format(hostname, port))

    try:
        while True:
            (c, a) = s.accept()

            def servername_callback(sock, req_hostname, cb_context, as_callback=True):
                print('Loading certs for {}'.format(req_hostname))
                server_cert = "ssl/{}/server".format(req_hostname)  # NOTE: This use of socket input is INSECURE
                cb_context.load_cert_chain(certfile="{}.crt".format(server_cert), keyfile="{}.key".format(server_cert))

                # Seems like this is designed usage: https://github.com/python/cpython/blob/3.4/Modules/_ssl.c#L1469
                sock.context = cb_context
                return None

            context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
            context.set_servername_callback(servername_callback)
            default_cert = "ssl/3.1/server"
            context.load_cert_chain(certfile="{}.crt".format(default_cert), keyfile="{}.key".format(default_cert))
            ssl_sock = context.wrap_socket(c, server_side=True)

            try:
                handle_client(ssl_sock, a)
            finally:
                c.close()

    except KeyboardInterrupt:
        s.close()

if __name__ == '__main__':
    hostname = ''
    port = 6789
    run_server(hostname, port)
Run Code Online (Sandbox Code Playgroud)

客户端代码:

import sys
import pickle
import socket
import ssl

request = {'msgtype': 0, 'value': 'Ping', 'test': [chr(i) for i in range(256)]}
response = {'msgtype': 1, 'value': 'Pong'}


def client(hostname, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("Connecting to {}:{}".format(hostname, port))
    s.connect((hostname, port))

    ssl_sock = ssl.SSLSocket(sock=s, ca_certs="server_old.crt", cert_reqs=ssl.CERT_REQUIRED, server_hostname='3.2')

    print("Sending message: {}".format(request))
    req = pickle.dumps(request)
    ssl_sock.send(req)

    resp_raw = ssl_sock.recv(10000)
    resp = pickle.loads(resp_raw)
    print("Received message: {}".format(resp))

    ssl_sock.close()

if __name__ == '__main__':
    hostname = 'localhost'
    port = 6789
    client(hostname, port)
Run Code Online (Sandbox Code Playgroud)

但它不起作用。似乎正在发生的事情是servername_callback被调用,正在获取指定的“主机名”,并且context.load_cert_chain回调中的调用没有失败(尽管如果给定的路径不存在,它确实会失败)。但是,服务器始终返回调用之前加载的证书对context.wrap_socket(c, server_side=True)。所以我的问题是:在 , 中是否有某种方法servername_callback可以修改 ssl 上下文使用的密钥对,并获取该密钥对的证书以用于连接?

我还应该注意,我检查了流量,并且在servername_callback函数返回之前不会发送服务器的证书(如果它无法成功完成,则永远不会发送,或返回“失败”值)。

mat*_*ata 3

在您的回调中,cb_context是被调用的相同上下文wrap_socket(),并且与 相同socket.context,因此socket.context = cb_context将上下文设置为与之前相同。

更改上下文的证书链不会影响当前wrap_socket()操作使用的证书。对此的解释在于 openssl 如何创建其底层对象,在这种情况下,底层 SSL 结构已经创建并使用链的副本

笔记

当调用 SSL_new() 时,与 SSL_CTX 结构关联的链将被复制到任何 SSL 结构。SSL 结构不会受到父级 SSL_CTX 中随后更改的任何链的影响。

设置新上下文时,SSL 结构会更新,但当新上下文等于旧上下文时,不会执行该更新。

您需要设置sock.context不同上下文才能使其工作。当前,您在每个新传入连接上实例化一个新上下文,但这是不需要的。相反,您应该仅实例化您的标准上下文一次并重用它。动态加载的上下文也是如此,您可以在启动时创建它们并将它们放入字典中,这样您就可以进行查找,例如:

...

contexts = {}

for hostname in os.listdir("ssl"):
    print('Loading certs for {}'.format(hostname))
    server_cert = "ssl/{}/server".format(hostname)
    context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile="{}.crt".format(server_cert),
                            keyfile="{}.key".format(server_cert))
    contexts[hostname] = context

def servername_callback(sock, req_hostname, cb_context, as_callback=True):
    context = contexts.get(req_hostname)
    if context is not None:
        sock.context = context
    else:
        pass  # handle unknown hostname case

def run_server(hostname, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((hostname, port))
    s.listen(8)
    print("Serving on {}:{}".format(hostname, port))

    context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    context.set_servername_callback(servername_callback)
    default_cert = "ssl/3.1/server"
    context.load_cert_chain(certfile="{}.crt".format(default_cert),
                            keyfile="{}.key".format(default_cert))

    try:
        while True:
            (c, a) = s.accept()
            ssl_sock = context.wrap_socket(c, server_side=True)
            try:
                handle_client(ssl_sock, a)
            finally:
                c.close()

    except KeyboardInterrupt:
        s.close()
Run Code Online (Sandbox Code Playgroud)