Redis通过锁定分布增量

Kam*_*hid 9 c# redis servicestack.redis stackexchange.redis .net-4.5.2

我需要生成一个计数器,它将被发送到一些api调用.我的应用程序在多个节点上运行,所以我想要生成一个独特的计数器.我试过以下代码

 public static long GetTransactionCountForUser(int telcoId)
    {
        long valreturn = 0;
        string key = "TelcoId:" + telcoId + ":Sequence";
        if (Muxer != null && Muxer.IsConnected && (Muxer.GetDatabase()) != null)
        {
            IDatabase db = Muxer.GetDatabase();
            var val = db.StringGet(key);
            int maxVal = 999;
            if (Convert.ToInt32(val) < maxVal)
            {
                valreturn = db.StringIncrement(key);
            }
            else
            {
                bool isdone = db.StringSet(key, valreturn);
                //db.SetAdd(key,new RedisValue) .StringIncrement(key, Convert.ToDouble(val))
            }
        }
        return valreturn;
    }
Run Code Online (Sandbox Code Playgroud)

并通过Task Parallel libray运行测试.当我有边界值时,我看到的是设置了多个时间0条目

请让我知道我需要做哪些更正

更新:我的最终逻辑如下

 public static long GetSequenceNumberForTelcoApiCallViaLuaScript(int telcoId)
    {
        long valreturn = 0;
        int maxIncrement = 9999;//todo via configuration
        if (true)//todo via configuration
        {
            IDatabase db;
            string key = "TelcoId:" + telcoId + ":SequenceNumber";
            if (Muxer != null && Muxer.IsConnected && (db = Muxer.GetDatabase()) != null)
            {
                valreturn = (int)db.ScriptEvaluate(@"
                    local result = redis.call('incr', KEYS[1])
                    if result > tonumber(ARGV[1]) then
                    result = 1
                    redis.call('set', KEYS[1], result)
                    end
                    return result", new RedisKey[] { key }, flags: CommandFlags.HighPriority, values: new RedisValue[] { maxIncrement });
            }
        }
        return valreturn;
    }
Run Code Online (Sandbox Code Playgroud)

Mar*_*ell 14

实际上,您的代码在翻转边界周围并不安全,因为您正在执行"获取",(延迟和思考),"设置" - 而不检查"获取"中的条件是否仍然适用.如果服务器在项目1000周围忙碌,则可能会获得各种疯狂​​的输出,包括:

1
2
...
999
1000 // when "get" returns 998, so you do an incr
1001 // ditto
1002 // ditto
0 // when "get" returns 999 or above, so you do a set
0 // ditto
0 // ditto
1
Run Code Online (Sandbox Code Playgroud)

选项:

  1. 使用事务和约束API使您的逻辑并发安全
  2. 通过改写您的逻辑作为Lua脚本 ScriptEvaluate

现在,redis事务(每个选项1)很难.就个人而言,我使用"2" - 除了更简单的代码和调试,它意味着你只有1次往返和操作,而不是"获取,观看,获取,多,incr /设置,exec /丢弃",并从"开始重试"循环来解释中止场景.如果你愿意,我可以试着把它写成Lua - 它应该是4行左右.


这是Lua的实现:

string key = ...
for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc
{
    int result = (int) db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > 999 then
    result = 0
    redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key });
    Console.WriteLine(result);
}
Run Code Online (Sandbox Code Playgroud)

注意:如果您需要参数化最大值,您可以使用:

if result > tonumber(ARGV[1]) then
Run Code Online (Sandbox Code Playgroud)

和:

int result = (int)db.ScriptEvaluate(...,
    new RedisKey[] { key }, new RedisValue[] { max });
Run Code Online (Sandbox Code Playgroud)

(所以ARGV[1]取值max)

有必要了解eval/ evalsha(这是什么ScriptEvaluate调用)不与其他服务器请求竞争,因此在incr和可能之间没有任何变化set.这意味着我们不需要复杂的watch等逻辑.

通过事务/约束API,这是相同的(我认为!):

static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max)
{
    int result;
    bool success;
    do
    {
        RedisValue current = db.StringGet(key);
        var tran = db.CreateTransaction();
        // assert hasn't changed - note this handles "not exists" correctly
        tran.AddCondition(Condition.StringEqual(key, current));
        if(((int)current) > max)
        {
            result = 0;
            tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget);
        }
        else
        {
            result = ((int)current) + 1;
            tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
        }
        success = tran.Execute(); // if assertion fails, returns false and aborts
    } while (!success); // and if it aborts, we need to redo
    return result;
}
Run Code Online (Sandbox Code Playgroud)

复杂,嗯?这里简单的成功案例是:

GET {key}    # get the current value
WATCH {key}  # assertion stating that {key} should be guarded
GET {key}    # used by the assertion to check the value
MULTI        # begin a block
INCR {key}   # increment {key}
EXEC         # execute the block *if WATCH is happy*
Run Code Online (Sandbox Code Playgroud)

这是......相当多的工作,并涉及多路复用器上的管道停顿.更复杂的情况(断言失败,监视失败,环绕)会产生稍微不同的输出,但应该有效.