在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。
背景
最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)
分析
redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式
代码
直接上代码 其实直接redis的工具类就可以解决了
package com.test import redis.clients.jedis.Jedis; import java.util.Collections; import java.util.List; /** * @desc redis队列实现方式 * @anthor * @date **/ public class RedisUcUitl { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private RedisUcUitl() { } /** * logger **/ /** * 存储redis队列顺序存储 在队列首部存入 * * @param key 字节类型 * @param value 字节类型 */ public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) { return jedis.lpush(key, value); } /** * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时 * * @param srckey * @param dstkey * @param timeout 0 表示永不超时 * @return */ public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) { return jedis.brpoplpush(srckey, dstkey, timeout); } /** * 返回制定的key,起始位置的redis数据 * @param redisKey * @param start * @param end -1 表示到最后 * @return */ public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) { return jedis.lrange(redisKey, start, end); } /** * 删除key * @param redisKey */ public static void delete(Jedis jedis, final byte[] redisKey) { return jedis.del(redisKey); } /** * 尝试加锁 * @param lockKey key名称 * @param requestId 身份标识 * @param expireTime 过期时间 * @return */ public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); return LOCK_SUCCESS.equals(result); } /** * 释放锁 * @param lockKey key名称 * @param requestId 身份标识 * @return */ public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) { final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); return RELEASE_SUCCESS.equals(result); } }
业务逻辑主要代码如下
1.先消耗队列中的
while(true){ // 消费队列 try{ // 被放入redis队列的数据 序列化后的 byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1); if(bytes == null || bytes.isEmpty()){ // 队列中没数据时退出 break; } // 反序列化对象 Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes); // 塞入唯一的值 防止被其他线程误解锁 String requestId = UUID.randomUUID().toString(); boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100); if(lockGetFlag){ // 成功获取锁 进行业务处理 //TODO // 处理完毕释放锁 boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId); }else{ // 未能获得锁放入等待队列 RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param)); } }catch(Exception e){ break; } }
2.处理最新接到的数据
同样是走尝试获取锁,获取不到放入队列的流程
一般序列化用 fastJson 之列的就可以了,这里用的是 JDK 自带的,工具类如下
public class ObjectSerialUtil { private ObjectSerialUtil() { // 工具类 } /** * 将Object对象序列化为byte[] * * @param obj 对象 * @return byte数组 * @throws Exception */ public static byte[] objectToBytes(Object obj) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); byte[] bytes = bos.toByteArray(); bos.close(); oos.close(); return bytes; } /** * 将bytes数组还原为对象 * * @param bytes * @return * @throws Exception */ public static Object bytesToObject(byte[] bytes) { try { ByteArrayInputStream bin = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bin); return ois.readObject(); } catch (Exception e) { throw new BaseException("反序列化出错!", e); } } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
背景 在很多互联网产品应用中,有些场景需要加锁处理,比如:秒杀,全局递增ID,楼层生成等等。大部分的解决方案是基于DB实现的,Redis为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对redis的连接并不存在竞争关系。其次Redis提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。 一、使用分布式锁要满足的几个条件: 系统是一个分布式系统(关键是分布式,单机的可以
使用Redis实现分布式锁 redis命令:set users 10 nx ex 12 原子性命令 //使用uuid,解决锁释放的问题 @GetMapping public void testLock() throws InterruptedException { String uuid = UUID.randomUUID().toString(); Boolean b_loc
本文向大家介绍Redis 怎么实现分布式锁?相关面试题,主要包含被问及Redis 怎么实现分布式锁?时的应答技巧和注意事项,需要的朋友参考一下 Redis 分布式锁其实就是在系统里面占一个“坑”,其他程序也要占“坑”的时候,占用成功了就可以继续执行,失败了就只能放弃或稍后重试。 占坑一般使用 setnx(set if not exists)指令,只允许被一个程序占有,使用完调用 del 释放锁。
本文向大家介绍SpringBoot集成Redisson实现分布式锁的方法示例,包括了SpringBoot集成Redisson实现分布式锁的方法示例的使用技巧和注意事项,需要的朋友参考一下 上篇 《SpringBoot 集成 redis 分布式锁优化》对死锁的问题进行了优化,今天介绍的是 redis 官方推荐使用的 Redisson ,Redisson 架设在 redis 基础上的 Java 驻内存
主要内容:Redis分布式锁介绍,Redis分布式锁命令在分布式系统中,当不同进程或线程一起访问共享资源时,会造成资源争抢,如果不加以控制的话,就会引发程序错乱。此时使用分布式锁能够非常有效的解决这个问题,它采用了一种互斥机制来防止线程或进程间相互干扰,从而保证了数据的一致性。 提示:如果对分布式系统这一概念不清楚,可参考百度百科《分布式系统》,简而言之,它是一种架构、一种模式。 Redis分布式锁介绍 分布式锁并非是 Redis 独有,比如 MySQ
一个挺着啤酒肚,身穿格子衫,发际线严重后移的中年男子,手拿着保温杯,胳膊夹着MacBook向你走来,看样子是架构师级别。 面试开始, 直入正题。 面试官: 你有没有参与过秒杀系统的设计? 我: 没有,我平时都是开发后台管理系统、OA办公系统、内部管理系统,从来没有开发过秒杀系统。 面试官: 嗯...,小伙子很实诚。今天就先到这里吧,后面有消息会主动联系你。 后面还可能有消息吗?你们啥时候主动联系过