sql >> Base de Datos >  >> NoSQL >> Redis

Incremento distribuido Redis con bloqueo

De hecho, su código no es seguro alrededor del límite de rollover, porque está haciendo un "obtener", (latencia y pensamiento), "establecer", sin verificar que las condiciones en su "obtener" todavía se aplican. Si el servidor está ocupado con el elemento 1000, sería posible obtener todo tipo de salidas locas, incluidas cosas como:

1
2
...
999
1000 // when "get" returns 998, so you do an incr
1001 // ditto
1002 // ditto
0 // when "get" returns 999 or above, so you do a set
0 // ditto
0 // ditto
1

Opciones:

  1. utilice las API de transacciones y restricciones para que su lógica sea segura para la concurrencia
  2. reescribe tu lógica como un script Lua a través de ScriptEvaluate

Ahora, las transacciones redis (según la opción 1) son difíciles. Personalmente, usaría "2"; además de ser más simple de codificar y depurar, significa que solo tiene 1 ida y vuelta y operación, a diferencia de "get, watch, get, multi, incr/set, exec/ descartar" y un bucle de "reintentar desde el inicio" para tener en cuenta el escenario de aborto. Puedo tratar de escribirlo como Lua si lo desea; debe tener unas 4 líneas.

Aquí está la implementación de Lua:

string key = ...
for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc
{
    int result = (int) db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > 999 then
    result = 0
    redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key });
    Console.WriteLine(result);
}

Nota:si necesita parametrizar el máximo, usaría:

if result > tonumber(ARGV[1]) then

y:

int result = (int)db.ScriptEvaluate(...,
    new RedisKey[] { key }, new RedisValue[] { max });

(entonces ARGV[1] toma el valor de max )

Es necesario entender que eval /evalsha (que es lo que ScriptEvaluate llamadas) no compiten con otras solicitudes del servidor , por lo que nada cambia entre incr y el posible set . Esto significa que no necesitamos un watch complejo etc lógica.

Aquí está lo mismo (¡creo!) a través de la API de transacción/restricción:

static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max)
{
    int result;
    bool success;
    do
    {
        RedisValue current = db.StringGet(key);
        var tran = db.CreateTransaction();
        // assert hasn't changed - note this handles "not exists" correctly
        tran.AddCondition(Condition.StringEqual(key, current));
        if(((int)current) > max)
        {
            result = 0;
            tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget);
        }
        else
        {
            result = ((int)current) + 1;
            tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
        }
        success = tran.Execute(); // if assertion fails, returns false and aborts
    } while (!success); // and if it aborts, we need to redo
    return result;
}

Complicado, ¿eh? El caso de éxito sencillo aquí está entonces:

GET {key}    # get the current value
WATCH {key}  # assertion stating that {key} should be guarded
GET {key}    # used by the assertion to check the value
MULTI        # begin a block
INCR {key}   # increment {key}
EXEC         # execute the block *if WATCH is happy*

lo cual es... un poco de trabajo, e implica un bloqueo de tubería en el multiplexor. Los casos más complicados (fallos de aserción, fallos de observación, reinicios) tendrían un resultado ligeramente diferente, pero deberían funcionar.