如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取
消费者阅读一段时间,然后由于任何原因关闭
不知道你到底指的是什么,因为消费者应该无限运行,除非它被明确停止。
现在假设您使用的是Storm的KafkaSpout实现,有一个名为forceStartOffsetTime
的配置用于强制spout倒带到以前的偏移
。使用方法如下
spoutConfig.forceStartOffsetTime(-2);
如文档页面所示
它将选择围绕该时间戳写入的最新偏移量以开始使用。您可以通过传入 -1 来强制 Spout 始终从最新的偏移量开始,也可以通过传入 -2 来强制它从最早的偏移量开始。
因此,将其设置为-2将始终强制它从一开始就读取您正在使用的配置,如果您可以发布一些代码,那就太好了。
我是Storm世界的新手。在我的拓扑中,我使用Kafka的数据,并使用。 通过一些测试,我得到了以下警告消息: 2015-10-01 23:31:51.753 s.k.KafkaUtils[警告]获取了偏移量超出范围的获取请求:[85970]2015-10-01 23:31:51.755 s.k.PartitionManager[警告]使用新偏移量:0 我的\\\\\\\\\\\\\\\\\\\\
我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf
我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup
我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码: 请建议任何更好的选择来获得并行性。
我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测
我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题