当前位置: 首页 > 知识库问答 >
问题:

使用ServiceStack Redis实现分布式锁定的互斥体违规

舒博雅
2023-03-14

我试图使用ServiceStack-Redis库提供的锁定机制实现DLM,并在此进行了描述,但我发现API似乎呈现了一种竞争条件,有时会将相同的锁授予多个客户端。

BasicRedisClientManager mgr = new BasicRedisClientManager(redisConnStr);

using(var client = mgr.GetClient())
{
    client.Remove("touchcount");
    client.Increment("touchcount", 0);
}

Random rng = new Random();

Action<object> simulatedDistributedClientCode = (clientId) => {

    using(var redisClient = mgr.GetClient())
    {
        using(var mylock = redisClient.AcquireLock("mutex", TimeSpan.FromSeconds(2)))
        {
            long touches = redisClient.Get<long>("touchcount");
            Debug.WriteLine("client{0}: I acquired the lock! (touched: {1}x)", clientId, touches);
            if(touches > 0) {
                Debug.WriteLine("client{0}: Oh, but I see you've already been here. I'll release it.", clientId);
                return;
            }
            int arbitraryDurationOfExecutingCode = rng.Next(100, 2500);
            Thread.Sleep(arbitraryDurationOfExecutingCode); // do some work of arbitrary duration
            redisClient.Increment("touchcount", 1);
        }
        Debug.WriteLine("client{0}: Okay, I released my lock, your turn now.", clientId);
    }
};
Action<Task> exceptionWriter = (t) => {if(t.IsFaulted) Debug.WriteLine(t.Exception.InnerExceptions.First());};

int arbitraryDelayBetweenClients = rng.Next(5, 500);
var clientWorker1 = new Task(simulatedDistributedClientCode, 1);
var clientWorker2 = new Task(simulatedDistributedClientCode, 2);

clientWorker1.Start();
Thread.Sleep(arbitraryDelayBetweenClients);
clientWorker2.Start();

Task.WaitAll(
    clientWorker1.ContinueWith(exceptionWriter),
    clientWorker2.ContinueWith(exceptionWriter)
    );

using(var client = mgr.GetClient())
{
    var finaltouch = client.Get<long>("touchcount");
    Console.WriteLine("Touched a total of {0}x.", finaltouch);
}

mgr.Dispose();

当运行上述代码来模拟两个客户端在短时间内相继尝试相同的操作时,有三个可能的输出。第一种是互斥体正常工作、客户端按正确顺序进行的最佳情况。第二种情况是当第二个客户端在等待获取锁时超时;也是一个可以接受的结果。然而,问题是,当arbitrydurationofexecutingcode接近或超过获取锁的超时时间时,很容易再现这样的情况:在第一个客户端释放锁之前,第二个客户端获得锁,产生如下输出:

客户1:我买了锁!(触摸:0x)
客户端2:我获得了锁!(触摸:0x)
客户端1:好的,我释放了我的锁,现在轮到您。
客户端2:好的,我释放了我的锁,现在轮到您。
总共触摸了2x。

我对API及其文档的理解是,获取锁时的timeout参数仅仅意味着--获取锁的超时。如果我必须猜测timeout值足够高,以至于总是长于我执行代码的持续时间,以防止出现这种情况,那么这似乎很容易出错。除了传递null以永远等待锁之外,有人有其他工作吗?我绝对不想这样做,否则我知道我会得到死机工人的鬼锁。

共有1个答案

岳晟
2023-03-14

来自mythz的答案(感谢及时回复!)确认ServiceStack.Redis中内置的AcquireLock方法不区分锁获取周期和锁到期周期。出于我们的目的,我们已有的代码预期分布式锁定机制在锁定被占用时会迅速失败,但允许在锁定范围内长时间运行的进程。为了适应这些需求,我在ServiceStack重新解锁上派生了这个变体,它允许区分两者。

// based on ServiceStack.Redis.RedisLock
// https://github.com/ServiceStack/ServiceStack.Redis/blob/master/src/ServiceStack.Redis/RedisLock.cs
internal class RedisDlmLock : IDisposable
{
    public static readonly TimeSpan DefaultLockAcquisitionTimeout = TimeSpan.FromSeconds(30);
    public static readonly TimeSpan DefaultLockMaxAge = TimeSpan.FromHours(2);
    public const string LockPrefix = "";    // namespace lock keys if desired

    private readonly IRedisClient _client; // note that the held reference to client means lock scope should always be within client scope

    private readonly string _lockKey;
    private string _lockValue;

    /// <summary>
    /// Acquires a distributed lock on the specified key.
    /// </summary>
    /// <param name="redisClient">The client to use to acquire the lock.</param>
    /// <param name="key">The key to acquire the lock on.</param>
    /// <param name="acquisitionTimeOut">The amount of time to wait while trying to acquire the lock. Defaults to <see cref="DefaultLockAcquisitionTimeout"/>.</param>
    /// <param name="lockMaxAge">After this amount of time expires, the lock will be invalidated and other clients will be allowed to establish a new lock on the same key. Deafults to <see cref="DefaultLockMaxAge"/>.</param>
    public RedisDlmLock(IRedisClient redisClient, string key, TimeSpan? acquisitionTimeOut = null, TimeSpan? lockMaxAge = null)
    {
        _client = redisClient;
        _lockKey = LockPrefix + key;

        ExecExtensions.RetryUntilTrue(
            () =>
            {
                //Modified from ServiceStack.Redis.RedisLock
                //This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx
                //Calculate a unix time for when the lock should expire

                lockMaxAge = lockMaxAge ?? DefaultLockMaxAge; // hold the lock for the default amount of time if not specified.
                DateTime expireTime = DateTime.UtcNow.Add(lockMaxAge.Value);
                _lockValue = (expireTime.ToUnixTimeMs() + 1).ToString(CultureInfo.InvariantCulture);

                //Try to set the lock, if it does not exist this will succeed and the lock is obtained
                var nx = redisClient.SetEntryIfNotExists(_lockKey, _lockValue);
                if (nx)
                    return true;

                //If we've gotten here then a key for the lock is present. This could be because the lock is
                //correctly acquired or it could be because a client that had acquired the lock crashed (or didn't release it properly).
                //Therefore we need to get the value of the lock to see when it should expire
                string existingLockValue = redisClient.Get<string>(_lockKey);
                long lockExpireTime;
                if (!long.TryParse(existingLockValue, out lockExpireTime))
                    return false;
                //If the expire time is greater than the current time then we can't let the lock go yet
                if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
                    return false;

                //If the expire time is less than the current time then it wasn't released properly and we can attempt to 
                //acquire the lock. This is done by setting the lock to our timeout string AND checking to make sure
                //that what is returned is the old timeout string in order to account for a possible race condition.
                return redisClient.GetAndSetEntry(_lockKey, _lockValue) == existingLockValue;
            },
            acquisitionTimeOut ?? DefaultLockAcquisitionTimeout // loop attempting to get the lock for this amount of time.
            );
    }

    public override string ToString()
    {
        return String.Format("RedisDlmLock:{0}:{1}", _lockKey, _lockValue);
    }

    public void Dispose()
    {
        try
        {
            // only remove the entry if it still contains OUR value
            _client.Watch(_lockKey);
            var currentValue = _client.Get<string>(_lockKey);
            if (currentValue != _lockValue)
            {
                _client.UnWatch();
                return;
            }

            using (var tx = _client.CreateTransaction())
            {
                tx.QueueCommand(r => r.Remove(_lockKey));
                tx.Commit();
            }
        }
        catch (Exception ex)
        {
            // log but don't throw
        }
    }
}

为了尽可能简化使用,我还公开了IredisClient的一些扩展方法,以并行AcquireLock方法,如下所示:

internal static class RedisClientLockExtensions
{
    public static IDisposable AcquireDlmLock(this IRedisClient client, string key, TimeSpan timeOut, TimeSpan maxAge)
    {
        return new RedisDlmLock(client, key, timeOut, maxAge);
    }
}
 类似资料:
  • Go语言包中的 sync 包提供了两种锁类型:sync.Mutex 和 sync.RWMutex。 Mutex 是最简单的一种锁类型,同时也比较暴力,当一个 goroutine 获得了 Mutex 后,其他 goroutine 就只能乖乖等到这个 goroutine 释放该 Mutex。 RWMutex 相对友好些,是经典的单写多读模型。在读锁占用的情况下,会阻止写,但不阻止读,也就是多个 gor

  • 问题内容: 帮助客户解决他们遇到的问题。我更多地是sysadmin / DBA的人,所以我在努力帮助他们。他们说这是内核/环境中的错误,在我坚持要在他们的代码中或寻求供应商对OS的支持之前,我试图证明或证明这一点。 发生在Red Hat和Oracle Enterprise Linux 5.7(和5.8)上,应用程序用C ++编写 他们遇到的问题是主线程启动一个单独的线程来执行可能长时间运行的TCP

  • Introduction This is the fourth part of the chapter which describes synchronization primitives in the Linux kernel and in the previous parts we finished to consider different types spinlocks and semap

  • 我已经编写了3个互斥类TMutex、TCondition和TSpinLock,它们都有一个void lock()和一个void unlock()成员。现在我想对它们使用std::lock\u-guard。我在源文件末尾为我的新互斥类安装了lock_guard,如下所示: 如果我使用: 我收到以下编译器错误消息: .../src/inc/threads.cpp:317: 39:错误:没有匹配函数调用

  • 本文向大家介绍互斥锁死锁,包括了互斥锁死锁的使用技巧和注意事项,需要的朋友参考一下 死锁可以在使用互斥锁的多线程Pthread程序中发生。让我们看看它如何发生。未锁定的互斥锁由pthread_mutex_init()函数初始化。 使用pthread_mutex_lock()和pthread_mutex_unlock()获取并释放互斥锁。如果线程尝试获取锁定的互斥锁,则对pthread_mutex_

  • 背景 在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。 一、使用分布式锁要满足的几个条件: 系统是一个分布式系统(关键是分布式,单机的可以