我已经用Storm构建了一个示例拓扑,使用Kafka作为源。这是一个我需要解决的问题。
每次我杀死一个拓扑并重新启动它时,该拓扑都从一开始就开始处理。
假设Topology处理了主题X中的消息A,然后我终止了该拓扑。
现在,当我再次提交拓扑时,消息A仍然存在,主题X再次被处理。
有没有一个解决方案,也许某种偏移管理来处理这种情况。
在创建spoutconfig时,请确保它有一个固定的spout id,在重新启动后可以通过它来标识自己。
来自官方风暴站点:
重要提示:在重新部署拓扑时,确保未修改spoutconfig.zkroot和spoutconfig.id的设置,否则spout将无法从ZooKeeper读取其先前的使用者状态信息(即偏移量)--这可能导致意外行为和/或数据丢失,这取决于您的用例。
对于新代码,您不应该使用storm-kafka
,它是不推荐使用的,因为底层客户端API在Kafka中是不推荐使用的,并且从2.0.0开始被删除。相反,使用storm-kafka-client
。
使用storm-kafka-client
您希望设置组id和第一轮询偏移策略。
KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
.build();
以上将使您的喷口开始在最早的偏移第一次你启动它,然后它将拾起它离开的地方,如果你重新启动它。组id被Kafka用来在喷口重新启动时识别,因此它可以拿回存储的偏移量检查点。其他偏移量策略的行为会有所不同,您可以检查javadoc中的FirstPollOffsetStrategy枚举。
spout会定期检查它到达多远,在config中也有一个设置来控制这一点。检查点由配置中的setprocessingguranure
设置控制,可以设置为至少一次(只有检查点加密的偏移量)、最多一次(spout发出消息之前的检查点)和“任何时间”(定期检查点,忽略ACK)。
看看Storm https://github.com/apache/Storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/Java/org/apache/Storm/kafka/spout/kafkaPoutTopologyMainNamedTopics.Java#l93中包含的一个示例拓扑。
在我的例子中,我们有auto.commit.enable=false,以便在处理消息后将offset提交给Zookeeper。如果处理失败,那么offset将不会被提交,我们应该尝试在某个配置的时间内再次处理相同的消息,从zookeeper的offset开始。但它不起作用,因为,我假设,Apache Kafka客户端在内存中保留了偏移量。 我发现kafka.consumer.consumerite
我对使用Kafka和动物园管理员时偏移量的存储位置有点困惑。在某些情况下,偏移量似乎存储在动物园管理员中,而在其他情况下,它们存储在Kafka中。 什么决定了偏移量是存储在Kafka中还是存储在Zookeeper中?有哪些利弊? 注意:当然,我也可以将偏移量存储在不同的数据存储中,但这不是这篇文章的内容。 有关我的设置的更多详细信息: 我运行这些版本:KAFKA_VERSION=“0.10.1.0
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。
我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。
我用Kafka和spring-布特: Kafka制作人班: Kafka-配置: 问题: 我有一个主题的5个分区,比方说。 发生的情况是,我获得成功(即消息成功发送到Kafka)日志,但是topic的无分区的偏移量增加。 正如您在上面看到的,我添加了日志和。我所期望的是,当Kafka不能发送消息给Kafka时,我应该得到一个错误,但在这种情况下,我没有收到任何错误消息。 Kafka的上述行为以的比例
为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?