当前位置: 首页 > 知识库问答 >
问题:

KafkaStorm喷口改变拓扑和消耗从旧的偏移

冯文彬
2023-03-14

我用Kafka壶嘴来消费信息。但是,如果我必须更改拓扑并上传,那么它将从旧消息恢复还是从新消息开始?Kafka壶嘴给了我们从哪里消费的时间戳,但我怎么知道时间戳呢?

共有3个答案

国景铄
2023-03-14

基本上,事件的顺序将是:

>

  • 第一次从以下属性开始读取拓扑:

    forceFromStart = true
    
    startOffsetTime = -2
    

    以上道具将迫使它从话题的开头开始。请记住具有这两个属性,因为forceFromStart告诉Storm读取starOffsetTime属性,并使用设置的值来确定从哪里开始读取,并忽略zoowatch偏移。

    从现在起,拓扑将运行,zookeeper将保持偏移。如果您的工作人员死亡,它将由主管启动,并从zookeeper中的偏移量开始读取。

    现在,如果要重新启动拓扑,并且要从关机前停止的位置读取拓扑,请使用以下属性并重新启动拓扑:

    forceFromStart = false
    

    通过上面的属性,您告诉Storm不要读取starOffsetTime值,而是使用在关闭拓扑结构之前维护的zoowatch偏移量。

    从现在开始,每次重新启动拓扑,它都将从它离开的地方读取。

    如果要重新启动拓扑,并且要从主题的开头/顶部读取,请使用以下属性并重新启动拓扑:

    forceFromStart = true
    
    startOffsetTime = -1
    

    通过上面的属性,你告诉Storm忽略动物园管理员的偏移量,从主题的最新偏移量开始。

  • 梁丘波鸿
    2023-03-14

    如果您使用KafkaSpout,请确保以下内容:

    1. 在SpoutConfig中,"id"和"zkroot"在重新部署新版本的拓扑后不会更改。Storm使用zkroot、id将主题偏移量存储到zoowatch中
    2. KafkaConfig.forceFromStart设置为false。

    KafkaSpout将偏移量存储到动物园管理员。在重新部署期间要非常小心,如果您在KafkaSpout的KafkaConfig中将forceFromStart设置为true(这可能是您第一次部署拓扑时的情况),它将忽略存储的动物园管理员偏移。确保设置为false。

    考虑写你的拓扑结构,让KAFKACONFIG。当拓扑的main()方法执行时,将从属性文件中读取forceFromStart值。这将允许管理员控制是否重播Kafka消息

    党佐
    2023-03-14

    spoutConfig。forceStartOffsetTime(-1)

    它将选择围绕该时间戳写入的最新偏移量来开始消费。您可以通过传入-1来强制喷口总是从最近的偏移开始,也可以通过传入-2来强制喷口从最早的偏移开始。

    参考文献

     类似资料:
    • 我们有一个不想连续运行storm拓扑的用例。相反,有一组输入(10K+)应该在指定的时间被处理,Spout连续发射这些输入,并得到拓扑中其余螺栓的处理。处理完所有输入后,在我的喷注中就没有任何东西可以从nextTuple发出。 此时,我们希望拓扑进入Hibernate状态,并在每天晚上12:00重新启动进程。 在storm配置中是否有任何属性可以设置为每天运行一次拓扑并在处理完成后Hibernat

    • 我对Apache Storm的性能有一个问题,主要是从喷口出来的。 我有一个从kestrel队列发出项目的拓扑。我获取大约2000个项目,每次在喷注中调用时,我都会发出一个。 我正在使用1个spout任务和1个spout执行器运行。我已将设置为10。 为什么每次调用之间有这么大的时间间隔?outputCollector在发出一个新元组之前是否正在等待听到每个元组的反馈? 我正在运行Java8和st

    • Spout被配置为从zookeeper读取最后的提交偏移量,并且在此场景中,该偏移量大于Kafka中最新的消息偏移量。我们也在研究为什么主题偏移被重置。 目前我们通过观察Storm日志中的范围外警告来解决这个问题,删除zookeeper偏移条目,然后重新部署拓扑。

    • 我创建了一个带有Spout的Storm拓扑,该Spout会发出许多元组用于基准测试。一旦所有的元组都从spout发出或者拓扑中不再有任何元组流动,我就想停止/终止我的拓扑。

    • 我是Storm和Kafka的新手,我可以在一段时间后在本地虚拟机上安装它们。我目前有一个有效的wordCount拓扑,从dropBox文本文件中提取句子: 现在我想升级我的喷口,使用Kafka的文本,以便在拓扑结构中提交到我的下一个螺栓。我试图在git中遵循许多文章和代码,但没有任何成功。例如:这个Kafka喷口。谁能帮助我,给我一些方向,以便实现新的spout.java文件?谢谢你!

    • 本文向大家介绍环形拓扑和网格拓扑之间的区别,包括了环形拓扑和网格拓扑之间的区别的使用技巧和注意事项,需要的朋友参考一下 环形拓扑 在环形拓扑中,每个节点都以环形方式连接到其左节点和右节点,信息可以从一个节点流向另一个方向。如果有n个节点,则存在n个链接。如果要添加一个新节点,则整个连接将被中断。 网格拓扑 在网状拓扑中,每个节点使用其自己的专用链接连接到其他节点,并且信息可以从这些链接传播到任何节