我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的
重复的consumer_offset
没有帮助,因为它没有引用我的问题,即在Java中以字符串形式读取消息。还更新了代码,使用kafka.client消费者尝试consumerRecord。
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");
while (it.hasNext()) {
try {
String mesg = new String(it.next().message());
System.out.println( mesg);
代码更改:
try {
// errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
Properties consumerProps = new Properties();
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , "test");
consumerProps.put("bootstrap.servers", servers);
consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
//ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
//ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
// consumerConfig);
//Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//topicCountMap.put(topic, new Integer(1));
//Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
kconsumer.subscribe(Arrays.asList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = kconsumer.poll(10);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
}
} finally {
kconsumer.close();
}
以及以下消息的快照;在图像的底部:
虽然可以直接读取__consumer_offsets
主题,但这不是推荐的或最简单的方法。
如果您可以使用Kafka 2.0,最好的方法是使用AdminClient API来描述组:
ReadoffsetMessageValue()可用于解码消息值(用于offsetKey
的键)并查找偏移量信息。
您链接的问题的答案包含执行所有这些操作的框架代码。
还要注意,您不应该将记录反序列化为string,而是将它们保留为原始字节,以便这些方法能够正确解码它们。
我目前正在为我正在制作的一款游戏进行单元测试,但在maven身上遇到了一个奇怪的错误,我无法理解。 我已经运行了,现在我的测试失败了。 以下是我在运行mvn测试-X时遇到的错误: 这是我的Maven文件: 这是JUnit测试的代码: 下面是我得到错误的代码: 我很确定它与maven文件有关,但我不确定它是什么。也许是我正在使用的图像阅读库?任何帮助都将不胜感激。
因此,根据我对Apache Kafka中事务的理解,read_committed消费者不会返回作为正在进行的事务一部分的消息。因此,我猜想,消费者可以选择将其偏移量提交给那些正在进行的事务消息(例如,读取非事务消息),或者可以选择在提交/中止遇到的事务之前不进一步推进。我只是假设(Kafka)允许跳过那些挂起的交易记录,但考虑到它的抵消可能已经很远了,那么消费者在提交时将如何读取它们呢? 更新 考
问题内容: 尝试通过ServiceStack.Redis读取Redis列表时,间歇性出现以下错误:“无法从传输连接读取数据:已建立的连接被主机中的软件中止了”。我想知道我如何使用ServiceStack可靠地连接和池化Redis的整个概念是否是错误的。这是我使用密封类和单例模式进行连接的代码: 然后,我实例化另一个使用单例的类: 这又是从“服务” DTO回调中实例化和调用的: 然后,我使用“邮递员
我们计划有多个Kafka消费者(Java),它们具有相同的组ID..所以如果它从分配的分区中顺序读取,那么我们如何实现高吞吐量..例如,生产者每秒发布40条消息...消费者每秒处理1条消息...虽然我们可以有多个消费者,但不能有40条RT???如果我错了就纠正我... 在我们的情况下,使用者必须提交偏移量,只有在消息处理成功后...否则消息将被重新处理...有没有更好的解决方法???
有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:
我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka