当前位置: 首页 > 知识库问答 >
问题:

如何在一段时间间隔后从Kafka消费者处读取消息

法景明
2023-03-14

在我的Spring启动应用程序中,我有kafka消费者类,每当主题中有可用消息时,它会频繁读取消息。我想限制消费者每隔2小时消费一次消息。就像阅读完一条消息后,消费者将暂停2小时,然后再消费另一条消息。这是我的消费者配置方法:-

@Bean
public Map<String, Object> scnConsumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();

    // common props
    logger.info("KM Dataloader :: Kafka Brokers for Software topic: {}", bootstrapServersscn);
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersscn);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 7200000);

    // ssl props
    propsMap.put("security.protocol", mpaasSecurityProtocol);
    propsMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    propsMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
    propsMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystorePath);
    propsMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
    return propsMap;
}

然后我创建了这个容器方法,在其中我设置了kafka配置的其余部分

    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    LOGGER.info("Setting concurrency to {} for {}", config.getConcurrency(), topicName);
    factory.setConcurrency(config.getConcurrency());
    factory.setConsumerFactory(cFactory);
    factory.setRetryTemplate(retryTemplate);
    factory.getContainerProperties().setIdleBetweenPolls(7200000);
    return factory;

使用此代码分区每2小时重新平衡一次,但它根本没有读取消息。我的kafka消费者方法:-

@Bean
public KmKafkaListener softwareKafkaListener(KmSoftwareService softwareService) {
    return new KmKafkaListener(softwareService) {
        @KafkaListener(topics = SOFTWARE_TOPIC, containerFactory = "softwareMessageContainer", groupId = SOFTWARE_CONSUMER_GROUP)
        public void onscnMessageforSA20(@Payload ConsumerRecord<String, Object> record)
                throws InterruptedException {
            this.onMessage(record);
        }
    };
}

共有1个答案

王云
2023-03-14

尝试在<code>KmKafkaListener</code>中添加<code>@KafkaListener</code>带注释的方法,这样Spring kafka将负责调用它。

public class KmKafkaListener{

  @KafkaListener(topics = SOFTWARE_TOPIC, containerFactory = "softwareMessageContainer", groupId = SOFTWARE_CONSUMER_GROUP)
  public void onscnMessageforSA20(@Payload ConsumerRecord<String, Object> record)
                throws InterruptedException {
            this.onMessage(record);
        }
}

并以这种方式初始化bean

@Bean
public KmKafkaListener softwareKafkaListener(KmSoftwareService softwareService) {
    return new KmKafkaListener(softwareService);
}
 类似资料:
  • 我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题

  • 我有一种在shell中执行此任务的方法:如何使kafka消费者从上次消耗的偏移量读取,而不是从开始读取 但是,我愿意在Python中这样做,使用 我找不到任何关于这种情况的api。 http://kafka-python.readthedocs.io/en/latest/apidoc/KafkaConsumer.html

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 问题内容: 这可能是一个重复的问题,但我没有找到想要的东西。我在UI活动中调用AsyncTask, 在doInBackground中调用需要时间的方法。如果一段时间后没有返回数据,我想中断该线程。以下是我尝试执行此操作的代码。 但这并不能在30秒后停止任务,事实上,这花费了更多时间。我也尝试过,但这也不起作用。 谁能告诉我该怎么做或如何在doInBackground中使用isCancelled()

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我们有一个具有Ha all策略的2节点RabbitMQ集群。我们在应用程序中使用Spring AMQP与RabbitMQ对话。生产者部分工作正常,但消费者工作了一段时间并暂停。生产者和消费者作为不同的应用程序运行。更多关于消费者部分的信息。 我们将与一起使用,使用手动模式和默认 在我们的应用程序中,我们创建队列(按需)并将其添加到侦听器中 当我们从10个和20个开始时,消费大约持续15个小时并暂停