当前位置: 首页 > 知识库问答 >
问题:

java - redis断连重启后,stream流监听会失效?

慕宜民
2023-11-15

公司项目使用了StreamListener进行监听redis stream流消息数据;但每隔十几二十天的就会失效监听不到数据;
初步判断:应该是网络或者连接数等问题导致程序与redis服务断开连接,但问题还是无法定位。
以下是代码,有大佬知道或者遇到过这类问题嘛,还请指教如何解决!

@Bean    public List<Subscription> subscription(RedisConnectionFactory factory){        List<Subscription> resultList = new ArrayList<>();        var options = StreamMessageListenerContainer                .StreamMessageListenerContainerOptions                .builder()                .pollTimeout(Duration.ofSeconds(1))                .build();        for (String redisStreamName : redisStreamNames) {            initStream(redisStreamName,groups[0]);            var listenerContainer = StreamMessageListenerContainer.create(factory,options);            Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groups[0], this.getClass().getName()),                    StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()), streamListener);            resultList.add(subscription);            listenerContainer.start();        }        return resultList;    }
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {    RedisCache redisCache;    public ListenerMessage(RedisCache redisCache){        this.redisCache = redisCache;    }    @Override    public void onMessage(MapRecord<String, String, String> entries) {        try{            Map<String, String> map = entries.getValue();            String private_chat = map.get("private_chat");            MessageSave messageSave = JSON.toJavaObject(JSON.parseObject(private_chat),MessageSave.class);                       log.info("当前正在处理:{}",messageSave.getMsgtime());            QyTagService qyTagService = SpringUtils.getBean(QyTagService.class);            qyTagService.auditPrivateMessage(messageSave);            //check用于验证key和对应消息是否一直            log.info("stream name :{}, body:{}, check:{}",entries.getStream(), map,(entries.getStream().equals(map.get("name"))));            redisCache.ack(entries.getStream(),"group2",entries.getId().getValue());            redisCache.delField(entries.getStream(),entries.getId().getValue());        }catch (Exception e){            log.error("error message:{}",e.getMessage());        }    }}

redis配置:

  redis:    expire: 60000 # 过期时间    database: 0 # Redis使用的库    host:     port: 6379 #端口号    timeout: 100000        #  连接超时时间(毫秒)    cache:      type: redis           #使用redis做缓存

共有2个答案

扈韬
2023-11-15

使用cancelOnError方法,返回false

//注册            var options = StreamMessageListenerContainer.StreamReadRequest                    .builder(StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()))                    .cancelOnError(throwable -> {                        System.out.println("这是一个错误"+throwable);                        // 不能取消                        return !(throwable instanceof RuntimeException);                    })                    .consumer(Consumer.from(groups[0], this.getClass().getName()))                    .autoAck(true)                    .build();
燕青青
2023-11-15

这种问题可能是由于以下原因导致的:

  1. Redis连接超时:在你的代码中,可以看到设置了Redis连接的超时时间,但是这个超时时间可能不足以应对长时间的网络不稳定或者Redis服务器的延迟。这可能导致程序与Redis服务断开连接,从而使得stream流监听失效。可以尝试增加超时时间的设置,例如设置为60秒或者更长。
  2. Redis服务器重启:如果Redis服务器在程序断开连接后重启,那么监听可能会失效。这种情况下,可能需要重新连接Redis服务器。
  3. 网络问题:网络问题也可能导致连接丢失。可以尝试检查应用程序和Redis服务器之间的网络连接是否稳定。
  4. 订阅数量限制:Redis Streams对每个连接的订阅数量有限制,如果超过这个限制,可能会导致一些订阅被丢弃。
  5. Redis配置问题:检查Redis配置,确保Redis服务器的配置参数(如超时时间、最大客户端连接数等)与应用程序的需求相匹配。

如果以上方法都不能解决问题,可能需要进一步检查应用程序的日志和监控数据,以了解更多关于问题的详细信息。例如,可以查看应用程序是否有任何与Redis连接或操作相关的错误或异常,以及Redis服务器的状态和性能等。

 类似资料:
  • 我们创建的后门使用反向有效负载。为了处理反向有效负载,我们需要在Kali机器中打开一个端口,以便目标机器可以连接到它。当我们创建后门时将端口设置为,因此我们需要在Kali机器上打开8080端口。在此示例中,我们选择的有效负载的名称是。 现在,我们使用Metasploit框架拆分屏幕并侦听传入连接。我们使用命令来运行Metasploit,它应该生成类似于以下屏幕截图的输出: 要监听传入的连接,需要在

  • 问题内容: 我有一个页面,其中通过ajax引入了内容。我遇到的问题是在内容加载后添加相关的事件侦听器。有没有办法告诉浏览器再次从头开始运行所有脚本? 以下是从页面顶部开始运行的简单代码示例,显然,通过AJAX引入的与’.RRCustomizeBox .customize’匹配的任何新html元素都不会具有以下click事件。 例如: 谢谢你的提示 约翰 问题答案: 您可以将所有初始化代码放在函数中

  • 如题所示,有没有办法找回丢失的数据

  • 这只是一个例子。我有一个输入流,我想为它设置一个侦听器。我怎么能做到。第一种方法是创建一个后台线程,反复检查它。 但我认为没有线。睡眠会浪费cpu周期。和它一起;它会降低获取事件的准确性。假设输入流是一个无法访问的输出流正在写入的文件。这只是一个例子,说明我不知道这种后台线程的运行时成本。请解释一下。

  • 本文向大家介绍Android中监听判断网络连接状态的方法,包括了Android中监听判断网络连接状态的方法的使用技巧和注意事项,需要的朋友参考一下 在无网或网速差的状态下,没必要去连接服务器。 你可以使用 ConnectivityManager 来判断是否连到网络,以及网络类型。 判断是否有网络连接 下面的代码用ConnectivityManager查询是活动网络连接判断是否有Internet连接

  • Listener架构概述 Listener Listener.DrainType (Enum) Filter FilterChainMatch FilterChain Listener Listener proto { "name": "...", "address": "{...}", "filter_chains": [], "use_original_dst": "{...}