当前位置: 首页 > 知识库问答 >
问题:

获取发送到Kafka主题的最后一条消息

充小云
2023-03-14

我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。

我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。

我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。

    // Poll so we know we're connected
    consumer.poll(100);
    // Get the assigned partitions
    Set<TopicPartition> assignedPartitions = consumer.assignment();
    // Seek to the end of those partitions
    consumer.seekToEnd(assignedPartitions);

    for(TopicPartition partition : assignedPartitions) {
        final long offset = consumer.committed(partition).offset();
        // Seek to the previous message
        consumer.seek(partition,offset - 1);
    }

    // Now get the last message
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        lastKey = record.key();
    }
    consumer.close();

这是意料之中的行为还是我走错了路?

共有1个答案

贺乐意
2023-03-14

问题出在final long offset=consumer.committe(partition).offset()上,因为链接api引用committe方法是获取给定分区的最后提交的偏移量,即使用者告诉kafka服务器它已经读取的最后一个偏移量。因此,肯定会重播消息,因为您总是从特定的偏移量读取。因为我想我只需要移除第一个块。

 类似资料:
  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测

  • 是否可以验证/筛选发送到Kafka主题的消息?

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。

  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 我以前是学习Kafka的传统ActiveMQ用户。我有一个问题。 使用Active MQ,您可以执行以下操作: 将100条消息提交到队列中 我试着在Kafka做同样的事情 如果不启动Consumer,等待它启动,然后运行producer,则此示例不起作用。 谁能告诉我如何修改我的示例程序,以在消息等待被消费的地方执行操作?

  • 我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如