当前位置: 首页 > 知识库问答 >
问题:

使用者读取__consumer_offsets时会传递无法读取的消息

封昊天
2023-03-14

我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的

重复的consumer_offset

没有帮助,因为它没有引用我的问题,即在Java中以字符串形式读取消息。还更新了代码,使用kafka.client消费者尝试consumerRecord。

consumerProps.put("exclude.internal.topics",  false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);


consumerProps.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");  
consumerProps.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");

ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(
       consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
   consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream stream : streams) {

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

    while (it.hasNext()) {
         try {

             String mesg = new String(it.next().message());
             System.out.println( mesg);

代码更改:

try {       
    // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");   
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics",  false);
    consumerProps.put("group.id" , "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
    consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    //       consumerConfig);

    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps); 
    kconsumer.subscribe(Arrays.asList(topic)); 

    try {
        while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);

            for (ConsumerRecord<String, String> record : records)

                System.out.println(record.offset() + ": " + record.value());
        }
    } finally {
          kconsumer.close();
    }    

以及以下消息的快照;在图像的底部:

共有1个答案

钦耀
2023-03-14

虽然可以直接读取__consumer_offsets主题,但这不是推荐的或最简单的方法。

如果您可以使用Kafka 2.0,最好的方法是使用AdminClient API来描述组:

  • ListConsumerGroupOffSets():查找特定组的所有偏移量
  • descripeConsumerGroups():查找组成员的详细信息

ReadoffsetMessageValue()可用于解码消息值(用于offsetKey的键)并查找偏移量信息。

您链接的问题的答案包含执行所有这些操作的框架代码。

还要注意,您不应该将记录反序列化为string,而是将它们保留为原始字节,以便这些方法能够正确解码它们。

 类似资料:
  • 我目前正在为我正在制作的一款游戏进行单元测试,但在maven身上遇到了一个奇怪的错误,我无法理解。 我已经运行了,现在我的测试失败了。 以下是我在运行mvn测试-X时遇到的错误: 这是我的Maven文件: 这是JUnit测试的代码: 下面是我得到错误的代码: 我很确定它与maven文件有关,但我不确定它是什么。也许是我正在使用的图像阅读库?任何帮助都将不胜感激。

  • 因此,根据我对Apache Kafka中事务的理解,read_committed消费者不会返回作为正在进行的事务一部分的消息。因此,我猜想,消费者可以选择将其偏移量提交给那些正在进行的事务消息(例如,读取非事务消息),或者可以选择在提交/中止遇到的事务之前不进一步推进。我只是假设(Kafka)允许跳过那些挂起的交易记录,但考虑到它的抵消可能已经很远了,那么消费者在提交时将如何读取它们呢? 更新 考

  • 问题内容: 尝试通过ServiceStack.Redis读取Redis列表时,间歇性出现以下错误:“无法从传输连接读取数据:已建立的连接被主机中的软件中止了”。我想知道我如何使用ServiceStack可靠地连接和池化Redis的整个概念是否是错误的。这是我使用密封类和单例模式进行连接的代码: 然后,我实例化另一个使用单例的类: 这又是从“服务” DTO回调中实例化和调用的: 然后,我使用“邮递员

  • 我们计划有多个Kafka消费者(Java),它们具有相同的组ID..所以如果它从分配的分区中顺序读取,那么我们如何实现高吞吐量..例如,生产者每秒发布40条消息...消费者每秒处理1条消息...虽然我们可以有多个消费者,但不能有40条RT???如果我错了就纠正我... 在我们的情况下,使用者必须提交偏移量,只有在消息处理成功后...否则消息将被重新处理...有没有更好的解决方法???

  • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:

  • 我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka