当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者只给出第一个产生的信息

张星洲
2023-03-14

我对Kafka很陌生。我使用的是Kafka0.9.0.0客户端for Java。在使用特定主题的数据时,当我启动生产者-消费者java项目时,我每次都得到相同的消息(这是第一次发布的消息)。

我的要求是生成一些消息并使用它,检查两个消息是否相同。

KafkaConsumer<String, String> newConsumer = new KafkaConsumer<String, String>(properties);
newConsumer.subscribe(Collections.singletonList(props.getProperty("monitoring.topic")));

String consumerRecord = "";
ConsumerRecords<String, String> consumerRecords = newConsumer.poll(120000);

for (ConsumerRecord<String, String> record : consumerRecords) {
    logger.info("Found message  for  {} {} {}", adapter, record.key(), record.value());
    System.out.println("consumerMessage : " + record.value());
    JSONObject jsonConsumerMessage = (JSONObject) (parser.parse(record.value()));

    Long offset = record.offset();
    System.out.println("Offset of this record is " + offset);
    String UUIDProducer = message.get("UUID").toString();

    String UUIDConsumer = jsonConsumerMessage.get("UUID").toString();
    System.out.println("UUIDProducer :  " + UUIDProducer);
    System.out.println("UUIDConsumer :  " + UUIDConsumer);
    if (UUIDProducer.equals(UUIDConsumer)) {
        return true;
    } else
        return false;
}

有谁能给我指点一下吗?

共有1个答案

鞠建安
2023-03-14

在for循环中返回true和false值是我的愚蠢错误。它会导致当第一条消息从主题传来时循环就会出现。

 类似资料:
  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 我正在尝试使用NodeJS从远程机器连接到远程Apache Kafka服务器。我无法从nodejs代码中生成所需的kafka主题的消息。我也无法消费任何数据从主题以及。 我使用的是Apache-kafka版本2.122.2.1和Java8。我也在使用节点版本8.11.0。我还启动了zookeeper服务器和kafka服务器。我在ubuntu机器上本地创建了一个主题和一个生产者和消费者,以检查apa