当前位置: 首页 > 工具软件 > Senna > 使用案例 >

senna延时队列

匡安宜
2023-12-01

1.应用场景

目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒发送、token刷新等等。通过延时处理,极大的节省系统的资源,不必轮询数据库处理任务。

大部分功能目前通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,会造成30分钟的误差,例:轮询时3:00执行一次轮询,3.01插入一条数据,正常3:31执行过期,但是3:30执行轮询时,是扫描不到的,需要4:00的时候才能扫描到。

2.延时处理方式调研

1.DelayQueue

  jvm提供的延迟阻塞队列

存在的问题:

  1. 单机运行,系统宕机后,无法进行有效的重试
  2. 没有执行记录和备份
  3. 没有重试机制
  4. 没有监控
  5. 上线时,会将任务清空!

优势:实现简单,无任务时阻塞,节省资源

2.延迟队列mq

依赖mq,通过设置延迟消费时间,达到延迟消费功能。像rabbitMq 通过设置过期时间,放入死信队列进行消费实现。

存在的问题:

  1. 时间设置不灵活,每个queue是固定的到期时间,每次新创建延时队列,需要创建新的消息队列

优点:依靠jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机

3.定时任务

通过定时任务轮询符合条件的数据

缺点: 必须要读业务数据库,对数据库造成一定的压力,并且存在延时

优点:消费失败后,下次还能继续消费,具备重试能力,消费能力稳定

4.redis

任务存储在redis中,使用redis的 zset队列根据score进行排序消费实现延时队列

5.数据库

存储在数据库中,死循环取数据,通过将任务存储在单独的数据库中,异步循环任务,查询符合的记录,再将其异步进行调用

优点:不对业务数据库造成压力,具备重试、记录、等能力

缺点:存在延时,当数据量大时,查询效率降低,不能根据请求压力,灵活扩展能力。

6. 时间轮

通过时间轮实现的延迟任务执行,也是基于jvm单机运行,如kafka、netty都有实现时间轮,redisson的看门狗也是通过netty的时间轮实现的。

缺点:不适合分布式服务的使用,宕机后,会丢失任务。

3.实现目标

兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件

4.架构设计

                                                           基于数据流转的架构图

5.延迟组件实现方式

1.实现原理

目前选择使用jimdb通过zset实现延时功能,将任务id和对应的执行时间作为score存在在zset队列中,默认会按照score排序,每次取0-当前时间内的score的任务id,将任务id插入到发布队列,兼容jimdb分片,开启hashtag功能即可。

监控方通过集成ump,消费记录通过redis备份+数据库持久化完成。

通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过spi自由扩展。

2.流程图

基于redis-disruptor方式进行发布、消费,可以作为消息来进行使用,性能的瓶颈在redis,不在组件本身,全组件只在发布任务需要获取任务id时会加一个基于jvm的锁,不同应用、不同queue之间无锁,在消费阶段无锁。

设计1:支持应用只发布,不消费,达到消息队列的功能。

设计2:支持分桶,针对大key问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大key造成的redis阻塞问题。

设计3:  通过ducc配置,进行性能的扩展,目前只支持开启消费和关闭消费。  后期考虑增加桶的配置

设计4:  支持设置超时时间,防止消费线程执行过久,或者死锁出现(不是组件本身导致的死锁问题),如:分布式锁有了看门狗功能,会出现无限等待问题。

瓶颈1: 消费速度慢,生产速度过快,会导致ringbuffer队列占满,这时候生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控redis队列的长度,若不断增长,可考虑增加消费者,大促时,可暂时编写消费者,直接提高性能。

可能出现的情况: 因一个应用公用一个disruptor,拥有64个消费者线程,如果某一个事件消费过慢,导致64个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。

后期观察是否有这个性能瓶颈,可给每一个queue一个消费者线程池。

瓶颈2:生产慢,消费快,目前的生产睡眠策略,可以通过MaxPopNum参数控制,若每次弹出的任务小于maxPopNum,会导致睡眠,每次睡100毫秒,若无数据,每次还会增加100ms,最多睡眠1秒。若大于配置的参数,不会睡眠。所以性能瓶颈不在于睡眠策略上,在于redis的连接+lua脚本的执行速度。这个性能,也可以通过横向扩展,来增加生产速度。

无论哪一种慢,基于disruptor的策略,都不会导致数据丢失或者不执行的问题,阻塞策略会执行导致生产线程休眠,不去抽取jimdb中的数据。 

6.demo示例

增加配置文件

判断是否开启jd.event.enable:true

1

2

3

jd:

  event:

    enable: true

配置文件形式:

配置

 展开源码

消费代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;

import com.jd.car.senna.event.annotation.SennaEvent;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

/**

 * @author zhangluyao

 * @description

 * @create 2022-02-21-9:54 下午

 */

@Slf4j

@Component("retryQueue")

public class RetryQueueEvent extends EventHandler {

    @Override

    protected void onHandle(String key, String eventType) {

        log.info("Handler开始消费:{}", key);

    }

    @Override

    protected void onDelayHandle(String key, String eventType) {

        log.info("delayHandler开始消费:{}", key);

    }

}

注解形式:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;

import com.jd.car.senna.event.annotation.SennaEvent;

import lombok.extern.slf4j.Slf4j;

/**

 * @author zhangluyao

 * @description

 * @create 2022-02-21-9:54 下午

 */

@Slf4j

@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)

public class TestQueueEvent extends EventHandler {

    @Override

    protected void onHandle(String key, String eventType) {

        log.info("Handler开始消费:{}", key);

    }

    @Override

    protected void onDelayHandle(String key, String eventType) {

        log.info("delayHandler开始消费:{}", key);

    }

}

发送代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;

import lombok.extern.slf4j.Slf4j;

import org.springframework.context.annotation.Lazy;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

import java.util.concurrent.CompletableFuture;

/**

 * @author zly

 */

@RestController

@Slf4j

public class DemoController {

    @Lazy

    @Resource(name = "testQueue")

    private IEventQueue eventQueue;

    @ResponseBody

    @GetMapping("/api/v1/demo")

    public String demo() {

        log.info("发送无延迟消息");

        eventQueue.push("no delay 5000 millseconds message 3");

        return "ok";

    }

   @ResponseBody

    @GetMapping("/api/v1/demo1")

    public String demo1() {

        log.info("发送延迟5秒消息");

        eventQueue.push(" delay 5000 millseconds message,name",1000*5L);

        return "ok";

    }

    @ResponseBody

    @GetMapping("/api/v1/demo2")

    public String demo2() {

        log.info("发送延迟到2022-04-02 00:00:00执行的消息");

        eventQueue.push(" delay message,name to 2022-04-02 00:00:00"new Date(1648828800000));

        return "ok";

    

}

 类似资料: