当前位置: 首页 > 知识库问答 >
问题:

Kafka消费品属性从一开始就是一个话题

龙智
2023-03-14

我试图写一个Kafka消费者从一开始就消费这些信息。我可以从控制台消费者开始使用同样的方法

但是我在JAVA API中找不到相应的属性。

 def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }

还有一个问题是什么应该是价值。Avro消息的反序列化程序?

共有1个答案

王航
2023-03-14

可通过设置auto来实现Kafka控制台消费者中使用的影响--从一开始。抵消将重置为最早的。与独特的/新的组组合使用。id它具有相同的效果。

基本上,您想要创建一个新的消费者组(通过group.id),由于Kafka Broker不知道这个消费者组,它会根据配置自动重置这个消费者组的偏移量auto.offset.reset.当设置为最早时,它将从头开始。当设置为最新的时,它等待新的传入数据。

关于Avro反序列化,这里可能会有所帮助。

 类似资料:
  • 我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。

  • 我看了Kafka的文件,还不知道如何消费一个话题平行? 假设:我有一个像“发生了一些事情”这样的话题(不要拆分这个话题),我有很多想消费它的客户。那么该怎么办,让多个客户并行消费呢?我应该使用分区和客户群吗?

  • 我正在使用spring boot构建一个web应用程序,现在我需要接收实时通知。我正计划使用apache kafka作为这方面的消息代理。要求用户具有不同的角色,并且根据角色,他们应该接收其他用户正在执行的操作的通知。 我设置了一个生产者和消费者,作为消费者,我可以接收发布到一个主题的信息,比如说topic1。 我遇到的问题是,我可以让多个用户收听同一个主题,而每个用户都应该得到发布到该主题的消息

  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者