Python的分布式锁管理器

Jan*_*nis 11 python python-2.7

我有一堆服务器,其中有多个实例访问对每秒请求有硬限制的资源.

我需要一种机制来锁定所有正在运行的服务器和实例对此资源的访问.

我在github上找到了一个安静的分布式锁管理器:https://github.com/thefab/restful-distributed-lock-manager

不幸的是,似乎有一分钟.锁定时间为1秒,相对不可靠.在几次测试中,解锁1秒锁需要1到3秒.

有没有经过python接口测试的东西我可以用于此目的?

编辑:我需要在1秒内自动解锁的东西.锁永远不会在我的代码中释放.

Jan*_*sky 17

我的第一个想法是使用Redis.但是有更多伟大的工具,有些甚至更轻,所以我的解决方案建立在zmq上.因此,您不必运行Redis,只需运行小型Python脚本即可.

要求审查

在描述解决方案之前,让我先回顾一下您

  • 在固定的时间段内限制某些资源对多个请求的请求数.

  • 自动解锁

  • 资源(自动)解锁应在短于1秒的时间内完成.

  • 它应该分发.我假设,你的意思是消耗一些资源的多个分布式服务器应该能够并且只有一个更衣室服务(在结论上更多)

概念

限制时间段内的请求数

时间段可以是秒,秒或更短的时间.唯一的限制是Python中的时间测量精度.

如果您的资源每秒定义了硬限制,则应使用时间段1.0

监控每个时隙的请求数,直到下一个时隙开始

首次请求访问您的资源时,请设置下一个时间段的开始时间并初始化请求计数器.

对于每个请求,增加请求计数器(对于当前时隙)并允许请求,除非您已达到当前时隙中允许的最大请求数.

使用带有REQ/REP的zmq服务

您的消费服务器可以分布在更多计算机上.要提供对LockerServer的访问,您将使用zmq.

示例代码

zmqlocker.py:

import time
import zmq

class Locker():
    def __init__(self, max_requests=1, in_seconds=1.0):
        self.max_requests = max_requests
        self.in_seconds = in_seconds
        self.requests = 0
        now = time.time()
        self.next_slot = now + in_seconds

    def __iter__(self):
        return self

    def next(self):
        now = time.time()
        if now > self.next_slot:
            self.requests = 0
            self.next_slot = now + self.in_seconds
        if self.requests < self.max_requests:
            self.requests += 1
            return "go"
        else:
            return "sorry"


class LockerServer():
    def __init__(self, max_requests=1, in_seconds=1.0, url="tcp://*:7777"):
        locker=Locker(max_requests, in_seconds)
        cnt = zmq.Context()
        sck = cnt.socket(zmq.REP)
        sck.bind(url)
        while True:
            msg = sck.recv()
            sck.send(locker.next())

class LockerClient():
    def __init__(self, url="tcp://localhost:7777"):
        cnt = zmq.Context()
        self.sck = cnt.socket(zmq.REQ)
        self.sck.connect(url)
    def next(self):
        self.sck.send("let me go")
        return self.sck.recv()
Run Code Online (Sandbox Code Playgroud)

运行你的服务器:

run_server.py:

from zmqlocker import LockerServer

svr = LockerServer(max_requests=5, in_seconds=0.8)
Run Code Online (Sandbox Code Playgroud)

从命令行:

$ python run_server.py
Run Code Online (Sandbox Code Playgroud)

这将开始在localhost上的默认端口7777上提供更衣室服务.

运行您的客户

run_client.py:

from zmqlocker import LockerClient
import time

locker_cli = LockerClient()

for i in xrange(100):
    print time.time(), locker_cli.next()
    time.sleep(0.1)
Run Code Online (Sandbox Code Playgroud)

从命令行:

$ python run_client.py
Run Code Online (Sandbox Code Playgroud)

你应该看到"go","go","sorry"......回复打印.

尝试运行更多客户端.

一点压力测试

您可以先启动客户端,然后再启动服务器.客户端将阻塞,直到服务器启动,然后将愉快地运行.

结论

  • 描述的要求得到满足
    • 请求数量有限
    • 无需解锁,只要下一个时间段可用,它就会允许更多请求
    • LockerService可通过网络或本地套接字使用.
  • 它应该是可靠的,zmq是成熟的解决方案,python代码相当简单
  • 它不需要所有参与者的时间同步
  • 表现会很好

另一方面,您可能会发现,资源的限制并不像您假设的那样可预测,因此请准备好使用参数来找到适当的平衡,并始终为此方面的异常做好准备.

还有一些空间可以优化提供"锁定" - 例如,如果锁定器超出允许的请求,但当前时间段已经基本完成,您可能会考虑等待"抱歉",并在一小段时间后提供"go" ".

将其扩展到真正的分布式锁管理器

通过"分布式",我们也可以了解一起运行的多个更衣室服务器.这样做更难,但也有可能.zmq允许非常容易地连接到多个URL,因此客户端可以非常轻松地连接到多个更衣室服务器.有一个问题,如何协调更衣室服务器不允许对您的资源提出太多请求.zmq允许服务器间通信.一个模型可能是,每个更衣室服务器将发布PUB/SUB上提供的每个"go".将订阅所有其他更衣室服务器,并使用每个"go"来增加其本地请求计数器(具有位修改逻辑).