当前位置: 首页 > 知识库问答 >
问题:

如何配置频率Kafka消费者投票在sping-kafka

云洋
2023-03-14

我试图在我的spring boot项目中使用spring kafka来阅读来自我的kafka的消息。我正在使用@KafkaListener,但问题是我的消费者总是在运行。只要我从控制台生成一条消息,它就会在我的应用程序中弹出。我想定期投票。我怎样才能做到这一点?

@Service
public class KafkaReciever {

private static final Logger LOGGER =
        LoggerFactory.getLogger(KafkaReciever.class);

private CountDownLatch latch = new CountDownLatch(1);

public CountDownLatch getLatch() {
    return latch;
}

@KafkaListener(topics = "test")
public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
    latch.countDown();
}

}

这是我的消费者配置:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // allows a pool of processes to divide the work of consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo1");

    // automatically reset the offset to the earliest offset
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

共有2个答案

高夜洛
2023-03-14

在2.3版本中,添加了一个名为idleBetweenPolls的新属性。

从2.3版开始,ContainerProperties提供了一个idleBetweenPolls选项,让侦听器容器中的主循环在KafkaConsumer.poll()调用之间Hibernate。从提供的选项中选择一个实际的Hibernate间隔作为最小值,以及max.poll.interval.ms消费者配置和当前记录批次处理作业时间之间的差异。

通过这种方式,您可以添加消费者调查之间的间隔。

来源:https://docs.spring.io/spring-kafka/reference/html

吕文林
2023-03-14

这就是它的设计方式;它是一个消息驱动的容器(与其他Spring消息传递技术抽象-RabbitMQ、JMS等一致)。

要仅按需获取消息,您有两种选择:

  • 使用消费者工厂创建消费者,订阅(或分配)主题/分区,并调用poll()

在这两种情况下,如果您使用的是kafka组管理,则需要注意max.poll.interval.ms以避免重新平衡。

您可以使用spring集成入站通道适配器定期轮询消息源。

 类似资料:
  • 我想在特定时间停止对特定主题的轮询。 Spring防尘套2.X Springkafka 2.5.5 Kafka版本2.5.1 比如即使有消息进来测试题目分区,消息也是从00到01堆在分区里,没有消耗。 01点之后,我想再次使用有关TEST主题的消息。 如何暂停和恢复?

  • 我在使用者组中轮询来自 Kafka 的消息时遇到问题。我的使用者对象分配给给定的分区 之后,消费者向该分区分配: 之后,我可以计算分区内的消息 和 ..... 在我的主题中有超过30000条消息。问题是我只收到一条消息。 具有< code > max _ poll _ records = 200 < code > AUTO _ OFFSET _ RESET 的消费者配置是最早的 这是我的函数,我正

  • 假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序

  • 我的问题是,我无法足够快地轮询我的队列,以保持我的队列为空或接近空。我最初的想法是,我可以让使用者以x/s的速率通过Camel从SQS接收消息。从那里,我可以简单地创建更多的消费者,以达到我需要的消息处理速度。 我的消费者: 如图所示,我设置了和以提高消息的速率,但是我无法生成具有相同endpoint的多个使用者。 我在文档中读到,我相信SQSendpoint也是如此,因为生成多个使用者将只给我一

  • 我有几个连接到Kafka集群的消费者,但我无法控制。同时,我想了解这些消费者是如何配置的。 有没有一个API可以列出所有的消费者(如果有发布者的话,这是一个额外的好处),然后读取他们所有的配置?我说的是这些消费者设置: https://docs . confluent . io/current/installation/configuration/consumer-configs . html #

  • Kafka 消费者在每个投票中轮询 500 条消息。我们禁用了, 假设我们已成功处理 100 条消息,偏移量也为 100 现在在第101条消息中,我们遇到了一个错误,我们没有提交偏移量 但是因为我们已经有了500条消息,所以我们处理了第102条消息,我们成功地处理了它,并且我们还提交了第102条消息的偏移量。 雀: 第 101 条消息会发生什么。 如何克服这个问题。