我对Kafka很陌生。我使用的是Kafka0.9.0.0客户端for Java。在使用特定主题的数据时,当我启动生产者-消费者java项目时,我每次都得到相同的消息(这是第一次发布的消息)。
我的要求是生成一些消息并使用它,检查两个消息是否相同。
KafkaConsumer<String, String> newConsumer = new KafkaConsumer<String, String>(properties);
newConsumer.subscribe(Collections.singletonList(props.getProperty("monitoring.topic")));
String consumerRecord = "";
ConsumerRecords<String, String> consumerRecords = newConsumer.poll(120000);
for (ConsumerRecord<String, String> record : consumerRecords) {
logger.info("Found message for {} {} {}", adapter, record.key(), record.value());
System.out.println("consumerMessage : " + record.value());
JSONObject jsonConsumerMessage = (JSONObject) (parser.parse(record.value()));
Long offset = record.offset();
System.out.println("Offset of this record is " + offset);
String UUIDProducer = message.get("UUID").toString();
String UUIDConsumer = jsonConsumerMessage.get("UUID").toString();
System.out.println("UUIDProducer : " + UUIDProducer);
System.out.println("UUIDConsumer : " + UUIDConsumer);
if (UUIDProducer.equals(UUIDConsumer)) {
return true;
} else
return false;
}
有谁能给我指点一下吗?
在for循环中返回true和false值是我的愚蠢错误。它会导致当第一条消息从主题传来时循环就会出现。
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
我正在尝试使用NodeJS从远程机器连接到远程Apache Kafka服务器。我无法从nodejs代码中生成所需的kafka主题的消息。我也无法消费任何数据从主题以及。 我使用的是Apache-kafka版本2.122.2.1和Java8。我也在使用节点版本8.11.0。我还启动了zookeeper服务器和kafka服务器。我在ubuntu机器上本地创建了一个主题和一个生产者和消费者,以检查apa