当前位置: 首页 > 工具软件 > Keyspace > 使用案例 >

Redis的发布订阅与keyspaces-notification实现定时通知

萧韬
2023-12-01

Redis的发布订阅与MQ消息队列是一样的,客户端需要首先订阅消息,此事件是阻塞的,发布者发布消息后,订阅了此消息的所有订阅者都会收到进行消费。

1.自定义发布订阅

#同时订阅了三个频道,一旦其中一个接收到消息就会进行数据的展示
subscribe java php c++
#发布消息
publish java springboot
#订阅所有通配的频道
psubscribe java* php*

2.内部事件发布订阅

2.1 业务场景分析

业务场景举例:

订单超时未支付,实现自动关闭,如何实现?

  1. 使用Quartz任务调度框架,定时调度方式来检查,如果使用5分钟为单位轮询一次,这样的话假如订单60分钟超时,那么我们可能到(60+5)分钟-1秒才检测到。Quartz更适合周期性的任务,所以在这里显然不适用。
  2. Timer Java原生的定时器,以秒为单位进行检测,可以少量使用,如果数据量大,性能是瓶颈。
  3. Quartz+Timer形式来作:5分钟做一次大循环,在5分钟内查出超时的订单,然后创建多线程通过timer完成订单定时检查,实现比较复杂。

所以如果能够主动的通知的形式来做过期提醒就好了。
恰好redis给我们提供了这样的一个监听机制,当键过期了给我们一个通知。Redis 2.8.X版本后,推出了Keyspace Notifications特性,类似数据库的trigger触发器。
keyspace notifications是基于sub/pub发布订阅机制的,可以接收对数据库中影响key操作的所有事件:比如del、set、expire(过期事件)。。。

2.2 接收的事件类型

有两种:keyspace,keyevent
keyspace: 是key触发的事件的具体操作
keyevent: 是事件影响的键名

#pub这个动作是系统自动发布的
127.0.0.1:6379>del mykey
#数据库0会发布以下两个信息,__这是两个英文状态下的下划线
publish __keyspace@0__:mykey del
publish __keyevent@0__:del mykey
2.3 开启系统通知

redis.conf配置文件里

# 这都是配置项
#  K     Keyspace events, published with __keyspace@<db>__ prefix.
#  E     Keyevent events, published with __keyevent@<db>__ prefix.
#  g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
#  $     String commands
#  l     List commands
#  s     Set commands
#  h     Hash commands
#  z     Sorted set commands
#  x     Expired events (events generated every time a key expires)
#  e     Evicted events (events generated when a key is evicted for maxmemory)
#  A     Alias for g$lshzxe, so that the "AKE" string means all the events.

开启事件通知

#开关在redis.conf配置里
notify-keyspace-events ""  #默认是空字符串,是关闭状态
notify-keyspace-events "KEx"  #配置文件里只配置了space和event的expried事件,就只自动发布这个事件。
2.4 订阅key的编写

通知事件的发生是由Redis自动触发的,不需要手动pub,只需要进行sub订阅即可

订阅语法

#主动订阅key的过期通知
subscribe __keyspace@0__:keyname   #具体的key对应的影响
subscribe __keyevent@0__:expired    #具体的事件影响的key
#如果我们有很多数据库,可以通配订阅
psubscribe __key*@*__:* 

3 springboot集成

POM依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

yaml配置

spring:
  redis:
	 host: 127.0.0.1
 	 port: 6379

第一种方式:

service

@Service
public class MessageReceiver {
    //接收消息的方法
    public void receiveMessage(String message){
        //message接收到的过期key
        System.out.println("Redis 监听:"+message);
    }
}

配置类configuration

@Configuration
public class RedisMessageConfig {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅触发的通道
        container.addMessageListener(listenerAdapter,
                new PatternTopic("__keyevent@0__:expired"));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver receiver){
        return new MessageListenerAdapter(receiver,"receiveMessage");
    }
}

第二种方式:继承MessageListener
service
继承KeyspaceEventMessageListener可以订阅到所有的事件,set,del,expire

@Service
public class RedisNormalCmd extends KeyspaceEventMessageListener {

	public RedisNormalCmd(RedisMessageListenerContainer listenerContainer) {
		super(listenerContainer);
	}

	@Override
	protected void doHandleMessage(Message message) {
		System.out.println(message);
	}
}

继承KeyExpirationEventMessageListener只能订阅到键过期的事件

@Service
public class RedisTask  extends KeyExpirationEventMessageListener {

	private Logger logger = LoggerFactory.getLogger(RedisTask.class);

	public RedisTask(RedisMessageListenerContainer listenerContainer) {
		super(listenerContainer);
	}

	@Override
	public void onMessage(Message message, byte[] pattern) {
		String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
		//过期的key
		String key = new String(message.getBody(),StandardCharsets.UTF_8);
		logger.info("redis key 过期:pattern={},channel={},key={}",new String(pattern),channel,key);

	}

}

配置类configuration

@Configuration
public class RedisTimerConfiguration {
	private Logger logger = LoggerFactory.getLogger(RedisTimerConfiguration.class);
	@Autowired
	private RedisConnectionFactory redisConnectionFactory;
	@Bean
	public RedisMessageListenerContainer redisMessageListenerContainer() {
		RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
		redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
		return redisMessageListenerContainer;
	}
}
 类似资料: