当前位置: 首页 > 工具软件 > Redisson > 使用案例 >

Redisson之几种分布式队列

司空坚
2023-12-01

前言:

   消息队列很好理解,顾明思议就是排队执行。Redisson 队列类似实现Java接口,例如Queue和BlockingQueue基于Redisson Api 来处理一些复杂的业务逻辑。

1.Queue

   Redisson中的RQueue对象实现了java.util.Queue接口。队列用于首先从最旧的元素开始处理元素的情况(也称为“先进先出”或FIFO)。与普通Java一样,RQueue的第一个元素可以使用peek()方法检查,或者使用poll()方法检查和删除。

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");//定义个队列
queue.add(new SomeObject());
SomeObject obj = queue.peek(); //检查
SomeObject someObj = queue.poll();//取值

2.BlockingQueue

Redisson中的RBlockingQueue对象实现了java.util.BlockingQueue接口。BlockingQueues是阻止尝试从空队列轮询或尝试在已满队列中插入元素的线程的队列。线程被阻塞,直到另一个线程将一个元素插入空队列,或者从完整队列中轮询第一个元素。

您可以使用参数调用poll()方法,该参数指定线程等待元素变为可用的时间。

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES); //使用参数调用poll()方法,该参数指定线程等待时间:

备注: 故障转移或重新连接到Redis服务器期间,将自动重新订阅poll(),pollFromAny(),pollLastAndOfferFirstTo()和take()方法.

3.BoundedBlockingQueue

 Redisson中的RBoundedBlockingQueue对象实现了有界阻塞队列结构。有界阻塞队列其容量有限。

下面的代码演示了如何在Redisson中实例化和使用RBoundedBlockingQueue。trySetCapacity()方法用于。:

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
queue.trySetCapacity(2); //尝试设置阻塞队列的容量,trySetCapacity() 返回值: boolean
queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// 将会被阻止直到队列中有可用空间
queue.put(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

//

4.DelayedQueue

  Redisson中的RDelayedQueue对象允许您在Redis中实现延迟队列。在使用指数退避等策略向消费者传递消息,在每次尝试传递消息失败后,重试之间的时间将呈指数级增长。

RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
// 在10秒内将对象移动到DestinationQueue
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 在1分中内将对象移动到DestinationQueue
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

//延迟队列中的每个元素将在与元素一起指定的延迟之后传输到目标队列。此目标队列可以是实现RQueue接口的任何队列,例如RBlockingQueue或RBoundedBlockingQueue。

5.PriorityQueue

Redisson中的RPriorityQueue对象实现了java.util.Queue接口。优先级队列是不按元素的年龄排序的队列,而是按每个元素关联的优先级排序的队列。


RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 使用Comparator对队列中的元素进行排序:
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.poll();

6.PriorityBlockingQueue

 Redisson中的RPriorityBlockingQueue对象结合了RPriorityQueue和RBlockingQueue的功能。与RPriorityQueue一样,RPriorityBlockingQueue使用Comparator对队列中的元素进行排序。

RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 设置比较对象
queue.add(3);
queue.add(1);
queue.add(2);
queue.removeAsync(0);
queue.addAsync(5);
queue.take();

//故障转移或重新连接到Redis服务器期间,将自动重新订阅pol(),pollLastAndOfferFirstTo()和take()

# 根据自己的业务场景选取对应的消息队列。

更多redis,关注专栏:https://blog.csdn.net/u011663149/column/info/35664


 

 类似资料: