我可以在命令行上针对Kafka位置安装发送和接收消息。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我还有一个Kafka消费者的Java代码。代码昨天收到了消息。但是今天早上没有收到任何消息。代码没有更改。我想知道属性配置是否不太正确。这是我的配置:
制片人:
bootstrap.servers - localhost:9092
group.id - test
key.serializer - StringSerializer.class.getName()
value.serializer - StringSerializer.class.getName()
生产记录设置为
ProducerRecord<String, String>("test", "mykey", "myvalue")
消费者:
zookeeper.connect - "localhost:2181"
group.id - "test"
zookeeper.session.timeout.ms - 500
zookeeper.sync.time.ms - 250
auto.commit.interval.ms - 1000
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer
对于Java代码:
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
少了什么?
可能会发生很多事情。
首先,消费者的ZooKeeper会话超时非常低,这意味着消费者可能会由于垃圾收集暂停而经历许多“软故障”。当这种情况发生时,消费者群体将重新平衡,从而暂停消费。如果这种情况发生得非常频繁,消费者可能会进入一种从不使用消息的状态,因为消息不断被重新平衡。我建议将ZooKeeper会话超时增加到30秒,看看这是否解决了问题。如果是这样,您可以尝试将其设置得更低。
其次,您能否确认正在为“测试”主题生成新消息?您的消费者只会消费它尚未提交的新消息。该主题可能没有任何新消息。
第三,您是否有同一消费者组中的其他消费者可以处理消息?如果一个消费者遇到频繁的软故障,其他消费者将被分配其分区。
最后,您使用的是最终将被删除的“旧”消费者。如果可能的话,我建议转移到Kafka 0.9中提供的“新”消费者(KafkaConsumer.java)。虽然我不能保证这会解决你的问题。
希望这有帮助。
我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是
我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有
Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)
我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生
我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新
我正在为Kafka0.9.0.0做Kafka快速入门。 我让zookeeper在监听,因为我运行了 只有一个代理在处侦听,因为我运行了 我有一个制作人在主题“测试”上发帖,因为我跑了 当我运行旧的API使用者时,它通过运行 但是,当我运行新的API使用者时,我在运行时没有得到任何东西 是否可以使用新的API从控制台使用者订阅主题?我该怎么修好它?