当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者-暂停特定Kafka主题分区的事件轮询,以将其用作延迟队列

巫煌
2023-03-14

在我们的系统中有一个场景,kafka主题XYZ用户详细信息由某个其他生产应用程序A(不同的系统)发布,而我的应用程序B正在使用该主题。

要求是应用程序B需要消耗该事件45分钟后(或任何可配置的时间),它被放在kafka主题XYZ由A(这种延迟的原因是,一些系统C的另一个REST api需要根据特定用户的此用户详细信息事件触发,以确认它是否为该用户设置了一些标志,并且该标志可以在45分钟持续时间内的任何点设置,尽管如果C没有能力发布到kafka或以任何方式通知我们,它可能已经解决)。

我们的应用程序B是在Spring编写的。

我尝试的解决方案是从Kafka获取事件,并检查队列中第一个事件的时间戳,如果该事件已经45分钟,则处理该事件,如果该时间小于45分钟,那么使用MessageListnerContainer pause()方法暂停轮询Kafka容器一段时间,直到达到45分钟。类似以下内容-

@KafkaListener(id = "delayed_listener", topics = "test_topic", groupId = "test_group")
        public void delayedConsumer(@Payload  String message,
                                    Acknowledgment acknowledgment) {

            UserDataEvent userDataEvent = null;
            try {
                 userDataEvent = this.mapper.readValue(message, TopicRequest.class);
            } catch (JsonProcessingException e) {
                logger.error("error while parsing message");
            }
            MessageListenerContainer delayedContainer = this.kafkaListenerEndpointRegistry.getListenerContainer("delayed_listener");
            if (userDataEvent.getPublishTime() > 45 minutes) // this will be some configured value
 {
                long sleepTimeForPolling = userDataEvent.getPublishTime() - System.currentTimeMillis();
                // give negative ack to put already polled messages back to kafka topic
                acknowledgment.nack(1000);
                // pause container, and later resume it  
                delayedContainer.pause();
                ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
                scheduledExecutorService.schedule(() -> {
                    delayedContainer.resume();
                }, sleepTimeForPolling, TimeUnit.MILLISECONDS);
                return;
            }
            // if message was already 45 minutes old then process it
            this.service.processMessage(userDataEvent);
            acknowledgment.acknowledge();
        }

虽然它适用于单个分区,但我不确定这是否是一个正确的方法,对此有什么评论吗?我还看到多个分区,它会导致问题,因为上面的暂停方法调用将暂停整个容器,如果其中一个分区有旧消息,如果容器因其他分区中的新消息而暂停,它将不会被消费。我可以在分区级别以某种方式使用此暂停逻辑吗?

在一定数量的可配置时间后,有什么更好/推荐的解决方案来实现这种延迟处理,在这种情况下我可以采用这些时间,而不是做我上面所做的?

共有1个答案

柯捷
2023-03-14

Kafka并不是真的为这样的场景设计的。

我可以看到这种技术工作的一种方式是将容器并发设置为与主题中的分区数量相同,以便每个分区由不同线程上的不同消费者处理;然后暂停/恢复单个消费者

为此,请添加 消费者

 类似资料:
  • 假设我有一个名为“MyTopic”的主题,它有3个分区P0、P1和P2。这些分区中的每一个都有一个leader,并且本主题的数据(消息)分布在这些分区中。 1.Producer将始终根据代理上的负载以循环方式写到分区的领导者。对吗? 2.制作人如何认识隔断的首领?

  • 我正在使用Spring Kafka1.0.3来消费kafka消息。Kafka的2个主题,每个主题有1个分区。在java代码中,有2@KafKalistener来消费每个主题消息。ConcurrentKafkaListenerContainerFactory的并发设置为1。但消息有时会延迟20秒以上。 有人知道为什么吗? 添加调试日志,并且延迟不是每次都可以,有时也可以:

  • 我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??

  • 我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • Avro对单个Kafka主题的信息进行编码,单个分区。这些消息中的每一条都只能由特定的消费者使用。对于ex,关于这个主题的消息a1、a2、b1和c1,有3个消费者,分别名为A、B和C,每个消费者将获得所有消息,但最终A将使用a1和a2、b1上的B和c1上的C。 我想知道当在Kafka上使用avro时,这是如何典型地解决的: 让使用者反序列化消息,然后由某个应用程序逻辑决定使用消息还是删除消息 使用