当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者不接收旧消息

凌通
2023-03-14

Kafka消费者不接收在消费者开始之前产生的消息。

 public class MyKafkaConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final String TOPIC="javaapp";
    private final String BOOTSTRAP_SERVERS="localhost:9092";
    private int receivedCounter=0;
    private ExecutorService executorService=Executors.newFixedThreadPool(1);

    private BlockingQueue<ConsumerRecords<String, String>> queue=new LinkedBlockingQueue<>(500000);
    private MyKafkaConsumer() {
        final Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaGroup6");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumer=new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
    }

    public static void main(String[] args) throws InterruptedException {
        MyKafkaConsumer perfKafkaConsumer=new MyKafkaConsumer();
        perfKafkaConsumer.consumeMessage();
        perfKafkaConsumer.runConsumer();
    }

    private void runConsumer() throws InterruptedException {
        consumer.poll(Duration.ofMillis(1000));
        while (true) {
            final ConsumerRecords<String, String> consumerRecords=consumer.poll(Duration.ofMillis(10000));
            if (!consumerRecords.isEmpty()) {
                System.out.println("Adding result in queue " + queue.size());
                queue.put(consumerRecords);
            }
            consumer.commitAsync();

        }
    }

    private void consumeMessage() {
        System.out.println("Consumer starts at " + Instant.now());
        executorService.submit(() -> {
            while (true) {
                ConsumerRecords<String, String> poll=queue.take();
                poll.forEach(record -> {
                    System.out.println("Received " + ++receivedCounter + " time " + Instant.now(Clock.systemUTC()));
                });

            }
        });
    }
}

ConsumerRecords始终为空

虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)

共有1个答案

鱼安然
2023-03-14

AUTO.OFFSET.RESET使用者设置控制新使用者组将从何处开始使用某个主题。默认情况下,它被设置为“最新”,这会将消费者组的偏移量设置为最新的偏移量。如果所有消费者组都应从主题中最早的偏移量开始,则您希望将此设置为“最早”。

 类似资料:
  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是

  • 我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有

  • 我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生

  • 我可以在命令行上针对Kafka位置安装发送和接收消息。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我还有一个Kafka消费者的Java代码。代码昨天收到了消息。但是今天早上没有收到任何消息。代码没有更改。我想知道属性配置是否不太正确。这是我的配置: 制片人: 生产记录设置为 消费者: 对于Java代码: 少了什么?

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • 我正在为Kafka0.9.0.0做Kafka快速入门。 我让zookeeper在监听,因为我运行了 只有一个代理在处侦听,因为我运行了 我有一个制作人在主题“测试”上发帖,因为我跑了 当我运行旧的API使用者时,它通过运行 但是,当我运行新的API使用者时,我在运行时没有得到任何东西 是否可以使用新的API从控制台使用者订阅主题?我该怎么修好它?