当前位置: 首页 > 知识库问答 >
问题:

无法获取Kafka主题中的邮件数

景翰音
2023-03-14

我对Kafka还很陌生。我已经用Java创建了一个示例生产者和消费者。使用producer,我可以将数据发送到kafka主题,但我无法使用以下消费者代码获取主题中的记录数。

    public class ConsumerTests {
        public static void main(String[] args) throws Exception {
            BasicConfigurator.configure();

            String topicName = "MobileData";
            String groupId = "TestGroup";
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", groupId);
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.subscribe(Arrays.asList(topicName));


try {
  while (true) {
    ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
    System.out.println("Record count is " + records.count());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  consumer.close();
}

}
}

共有1个答案

邹丰羽
2023-03-14

轮询(...)调用通常应处于循环中。在分区分配过程中,初始轮询(...)始终可能不返回数据(取决于超时时间)。这里有一个例子:

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    System.out.println("Record count is " + records.count());
  }
} catch (WakeupException e) {
  // ignore for shutdown
} finally {
  consumer.close();
}

欲知更多信息,请参阅相关文章:

 类似资料:
  • 下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc

  • 在我们的docker-swarm中运行kafka connect,使用以下撰写文件: kafka connect节点成功启动,我可以设置任务并查看这些任务的状态······ 我是否在撰写文件或任务配置中缺少某些配置?

  • 问题内容: 如何从代码中获取任何kafka主题的分区数。我研究了许多链接,但似乎没有一个起作用。 提及一些: http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count- through-simpleclient-api http://grokbase.com/t/kafka/users/151cv3htga/ge

  • 我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht

  • 我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中消费消息 路由启动正常,但没有读取任何消息。 当我尝试使用camel kafka,路线如下时,我能够阅读信息 由于依赖关系,我只能使用camel kafka。我该如何解决这个问题?

  • 我正在运行ansible在一台服务器上安装Kafka。(在Dev env上,独立设置)但我的剧本在列出Kafka主题的任务列表时失败了。下面是日志。 Journal alctl-fu动物园管理员 /opt/kafka/kafka\u 2.12-2.2.2/config/zookeeper。属性 动物园管理员。服务