我是Kafka的新手,正在开发一个原型,将专有的流媒体服务连接到Kafka中。
我希望得到一个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要用连接时收到的最后一条消息的ID登录。
我尝试使用使用者执行以下操作,但当同时运行控制台使用者时,我看到消息被重播。
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
这是意料之中的行为还是我走错了路?
问题出在final long offset=consumer.committe(partition).offset()
上,因为链接api引用committe
方法是获取给定分区的最后提交的偏移量
,即使用者告诉kafka服务器它已经读取的最后一个偏移量。因此,肯定会重播消息
,因为您总是从特定的偏移量读取。因为我想我只需要移除第一个块。
我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测
是否可以验证/筛选发送到Kafka主题的消息?
我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。
我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)
我以前是学习Kafka的传统ActiveMQ用户。我有一个问题。 使用Active MQ,您可以执行以下操作: 将100条消息提交到队列中 我试着在Kafka做同样的事情 如果不启动Consumer,等待它启动,然后运行producer,则此示例不起作用。 谁能告诉我如何修改我的示例程序,以在消息等待被消费的地方执行操作?
我正在使用kafka java客户端和kafka服务器。 我的代码: Kafka马纳格 当我的循环长度如果在1000左右(在类)时,我就能成功地向Kafka主题发送数据。 但当我的循环长度为1或小于10时,我无法向Kafka主题发送数据。注意我没有得到任何错误。 根据我的发现,如果我想发送一个单一的消息到Kafka主题,根据这个程序我得到了成功的消息,但从来没有得到一个关于我的主题的消息。 但是如