我使用一个生产者在本地主机上运行的Kafka服务器中输入了一些消息。Zookeeper也在本地主机上。我使用了这里给出的ConsumerGroupExample
。
但是,消费者似乎没有收到任何消息!kafka-console-consumer.sh
脚本可以提取所有这些消息,但代码不能。怎么了?消费者代码与该页面上给出的代码完全相同。
zookeeper="localhost:2181", group id = "test-a", topic = "test".
这是我发布消息的同一主题。以下是制作人的代码:
package test;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
public class KakfaProducer {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("zookeeper.connect", "localhost:2181");
prop.put("metadata.broker.list", "localhost:9092");
prop.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(prop);
Producer<String, String> producer = new<String, String> Producer(producerConfig);
Task t = new Task(producer);
Thread newThread = new Thread(t);
newThread.start();
}
}
class Task implements Runnable {
Producer<String, String> producer;
public Task(Producer producer) {
this.producer = producer;
}
public void run() {
long num = 1;
while (true) {
String topic = "test";
KeyedMessage<String, String> message = new<String, String> KeyedMessage(topic, "Hello Test message " + num);
producer.send(message);
synchronized (this) {
num++;
}
if (num % 1000 == 0) {
System.out.println("Total messages sent by Thread-" + Thread.currentThread().getId() + num);
}
}
}
}
尝试将 group.id
更改为其他内容并进行测试。
当您使用 kafka-console-conumser.sh
测试使用者时,它会使用主题中的数据并更改偏移量。这就是您的使用者代码无法使用相同的 group.id
使用的原因。
此外,当您更改组时。id
,添加到您的消费者:
props.put("auto.offset.reset", "earliest");
这将确保您的新消费者群体从一开始就使用数据。
您的消费者没有收到任何数据:
>
您已使用同一组。id,并且它有一个新的偏移量。它将仅在Broker中有新数据时使用。(更改组.id
并再次测试。)
尝试 auto.offset.reset=最早
。使用者将从一开始就读取消息,您可以使用它来测试您的使用者。还有一些方法可以从某个偏移量读取。seek()
、seekToBeginning
() 和 seekToEnd()
。
我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是
我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有
我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新
Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)
我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试
我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生