当前位置: 首页 > 知识库问答 >
问题:

面向ConcurrentKafkaListenerContainerFactory的Spring-Kafka消费者群体协调

唐伟
2023-03-14

关于斯普林斯-Kafka在某些场景中的行为,我有几个问题。任何答案或指针都将是伟大的。

背景:我正在构建一个与外部API对话并返回确认的kafka消费者。我的配置如下所示:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.configuration.getString("kafka-generic.consumer.group.id"));
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "6000000");

    return props;
}


@Bean
public RetryTemplate retryTemplate() {
    final ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRanhtml" target="_blank">domBackOffPolicy();
    backOffPolicy.setInitialInterval(this.configuration.getLong("retry-exp-backoff-init-interval"));
    final SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(this.configuration.getInt("retry-max-attempts"));
    final RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> retryKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Event> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setConcurrency(this.configuration.getInt("kafka-concurrency"));
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setIdleEventInterval(this.configuration.getLong("kafka-rtm-idle-time"));
    //factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(kafkaConsumerErrorHandler);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"0", "1"}),
            containerFactory = "retryKafkaListenerContainerFactory")
public void receive(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}

@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"2", "3"}),
        containerFactory = "retryKafkaListenerContainerFactory")
public void receive1(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}
  @Named
        public class KafkaConsumerErrorHandler implements ErrorHandler {
          @Inject
          private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

          @Override
          public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
              System.out.println("Shutting down all the containers");
              kafkaListenerEndpointRegistry.stop();
          }
        }

让我们来讨论一个场景,其中调用了消费者的kafkalistener,它调用ServiceInvoker.callService(event);但是服务关闭,然后根据RetryKafkalistenerContainerFactory,它重试3次然后失败,然后调用errorhandler,从而停止KafkalistenerEndpointRegistry。这将关闭具有相同消费者组的所有其他消费者或机器,还是仅此消费者或机器?

让我们谈谈2中的场景。有什么配置我们需要改变,让Kafka知道推迟确认这么长时间吗?

我的Kafka制作人每10分钟产生一次信息。我需要在我的消费者代码中的任何地方配置这10分钟吗?

任何帮助都是非常感谢的。提前谢了。:)

共有1个答案

殷永嘉
2023-03-14
  1. 正确;只有%1将获得它。
  2. 它只会停止本地容器--Spring不知道其他实例的任何情况。
  3. 因为您有ackonerror=false,所以不会提交偏移量。
  4. 使用者不需要知道消息发布的频率。
  5. 不能在运行时更改它们,但可以使用属性占位符${...}或Spel表达式#{...}在应用程序初始化期间设置它们。
 类似资料:
  • 我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群

  • 我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,

  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 我是Kafka的新手。我看了一眼Kafka文档。似乎分派给订阅消费者组的消息是通过将分区与消费者实例绑定来实现的。 在使用Apache Kafka时,我们应该记住一件重要的事情,即同一消费者组中的消费者数量应该小于或等于所使用主题中的分区数量。否则,将不会收到来自主题的任何消息。 在非prod环境中,我没有配置主题分区。在这种情况下,Kafka是否只有一个分区。如果我启动共享同一组的多个消费者并向

  • 我正处于探索Kafka0.8.1.1版本的初始阶段。 使用API进行触发器再平衡 将kafka配置为等待消费者活动一段时间,并假设它被不优雅地关闭,自动重新平衡。 这里的问题是,分配给死亡使用者的分区中的所有消息都保留在队列中,并且在重新平衡发生之前不会被处理。

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了