分布式锁

优质
小牛编辑
140浏览
2023-12-01

本系统中的分布式锁设计用于Storm多个线程实例抢占Redis缓存资源时出现的事务性问题,这个事务性问题是由客户端本身业务逻辑需求产生的,无法在服务端进行有效处理,需给出一个分布式资源同步的方案,此处我们采用了分布式锁来完成这项设计。 锁是编程中非常常见的概念。在维基百科上对锁有个相当精确的定义:在计算机科学中,锁是一种在多线程环境中用于强行限制资源访问的同步机制。锁被设计用于执行一个互斥的并发控制策略。 简单的说,锁是一个单一的参考点,多个线程基于它来检查是否允许访问资源。例如,一个想写数据的线程,它必须先检查是否存在一个写锁。如果写锁存在,需要等待直到锁释放后它才能获取到属于它的锁并执行写操作。这样,通过锁就可以避免多个线程的同时写造成的数据冲突。

但是如果多线程的程序运行在多台机器上呢?如何在分布式系统下控制对资源的访问呢?这在分布式领域中是一个永恒的难题。

1. 使用一个中心化的锁服务

首先,我们需要一个所有线程都可以访问到的地方来存储锁。这个锁只能存在于一个地方,从而保证只有一个权威的地方可以定义锁的建立和释放。Redis 是实现锁的一个理想的候选方案。作为一个轻量级的内存数据库,快速,事务性和一致性是选择Redis所为锁服务的主要原因。

2. 设计锁

锁本身是很简单的,就是Redis数据库中一个简单的key。建立和释放锁,并保证绝对的安全,是这个锁的设计比较棘手的地方。有两个潜在的陷阱:

  • 应用程序通过网络和Redis交互,这意味着从应用程序发出命令到Redis结果返回之间会有延迟。这段时间内,Redis可能正在运行其他的命令,而Redis内数据的状态可能不是你的程序所期待的。如何保证程序中获取锁的线程和其他线程不发生冲突?
  • 如果程序在获取锁后突然crash,而无法释放它?这个锁会一直存在而导致程序进入“死锁”。

    3. 建立锁

    可能想到的最简单的方法是“用GET方法检查锁,如果锁不存在,就用SET方式设置一个值”。 这个方法虽然简单,但是不能保证独占锁。回顾前面所说的第1个陷阱:因为在GET和SET操作之间有延迟,我们没法知道从“发送命令”到“Redis服务器返回结果”之间的这段时间内是否有其他线程也去建立锁。当然,这些都在几毫秒之内,发生的可能性相当低。但是如果在一个繁忙的环境中运行着大量的并发线程和命令,重叠的可能性并不是微不足道的。 为了解决这个问题,应该用SETNX命令。SETNX消除了GET命令需要等待返回值的问题,SETNX只有在key不存在时才返回成功。这意味着只有一个线程可以成功运行SETNX命令,而其他线程会失败,然后不断重试,直到它们能建立锁。

    4. 释放锁

    一旦线程成功执行了SETNX命令,它就建立了锁并且可以基于资源进行工作。工作完成后,线程需要通过删除Redis的key来释放这个锁,从而允许其他线程能尽快的获取锁。 尽管如此,也有需要小心的地方!回顾前面说的第2个陷阱:如果线程crash了,它永远都不会删除Redis的key,所以这个锁会一直存在,从而导致“饿死”现象。那么如何避免这个问题呢?

    5. 锁的存活时间

    我们可以给锁加一个存活时间(TTL),这样一旦TTL超时,这个锁的key会被Redis自动删除。任何由于线程错误而遗留下来的锁在一个合适的时间之后都会被释放,从而避免了“饿死”。这纯粹是一个安全特性,更有效的方式仍然是确保尽量在线程里面释放锁。 可以通过PEXPIRE命令为Redis的key设置TTL,而且线程里可以通过MULTI/EXEC事务的方式在SETNX命令后立即执行,例如:
    MULTI
    SETNX lock-key
    PEXPIRE 10000 lock-key
    EXEC
    
    尽管如此,这会产生另外一个问题。PEXPIRE命令没有判断SETNX命令的返回结果,无论如何都会设置key的TTL。如果这个地方无法获取到锁或有异常,那么多个线程每次想获取锁时,都会频繁更新key的TTL,这样会一直延长key的TTL,导致key永远都不会过期。为了解决这个问题,我们需要Redis在一个命令里面处理这个逻辑。我们可以通过Redis 2.6.12之后版本SET命令的PX和NX参数来实现。 1.1.1.6 分布式锁服务实现 // 加锁函数
    public synchronized boolean lock(long timeout) {
      this.jedis = pool.getResource();
      long nano = System.nanoTime();
      timeout *= ONE_MILLI_NANOS;
      try {
          while ((System.nanoTime() - nano) < timeout) {
              if (jedis.setnx(key, LOCKED) == 1) {
                  jedis.expire(key, EXPIRE);
                  locked = true;
                  return locked;
              }
              // 短暂休眠,nano避免出现活锁
              Thread.sleep(3, r.nextInt(500));
           }
      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          pool.returnResource(jedis);
      }
      return false;
    }
    // 去锁函数
    
    public synchronized void unlock() { this.jedis = pool.getResource(); try {
      if (locked)
    
    jedis.del(key); } finally { pool.returnResource(jedis); } }
    客户端调用
    
    RedisLock lock = new RedisLock("lockTest"); //加锁 lock.lock(); //去锁 lock.unlock(); ```