当前位置: 首页 > 知识库问答 >
问题:

Storm-Kafka喷口失效

乌和畅
2023-03-14

我使用storm0.9.4和storm-kafka:0.9.0-wip16a-scala292作为从kafka0.7读取的依赖项。

  • 我们的Kafka保留政策是7天。
  • 我从经纪人的最新偏移量开始读取。
kafka.common.OffsetOutOfRangeException: null
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_75]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_75]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_75]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_75]
        at java.lang.Class.newInstance(Class.java:379) ~[na:1.7.0_75]
        at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) ~[stormjar.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) ~[stormjar.jar:na]
        at kafka.message.MessageSet.foreach(MessageSet.scala:87) ~[stormjar.jar:na]
        at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104) ~[stormjar.jar:na]
        at kafka.message.MessageSet.size(MessageSet.scala:87) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:113) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.next(PartitionManager.java:83) ~[stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] 2015-04-30T01:49:15.118-0500 backtype.storm.daemon.executor [ERROR] kafka.common.OffsetOutOfRangeException: null
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_75]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_75]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_75]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_75]
        at java.lang.Class.newInstance(Class.java:379) ~[na:1.7.0_75]
        at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) ~[stormjar.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) ~[stormjar.jar:na]
        at kafka.message.MessageSet.foreach(MessageSet.scala:87) ~[stormjar.jar:na]
        at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104) ~[stormjar.jar:na]
        at kafka.message.MessageSet.size(MessageSet.scala:87) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:113) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.next(PartitionManager.java:83) ~[stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75] 2015-04-30T01:49:15.129-0500 backtype.storm.util [ERROR] Halting process: ("Worker died") java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__5102$fn__5103.invoke(worker.clj:495) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_executor_data$fn__4555$fn__4556.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]

共有1个答案

彭浩穰
2023-03-14

喷口的配置方式有问题。我们有用于初始化SpoutConfig对象的自定义属性,我们始终将forceFromStart和startOffsetTime(设置为latest)。问题是,与后者相关的属性配置了错误的键,因此有时一个spout的zk条目具有一个在kafka中不再存在的较早的偏移量条目,或者引用了storm拓扑启动时存在的条目,但在storm完成积压之前从kafka中删除了该条目。由于我们无论如何都不想满足这个场景,我们只是修正了配置及其工作。

 类似资料:
  • 这里可能发生了同样的事情:错误backtype.storm.util-Async循环死亡!BufferUnderFlowException:null,但我将添加一个完整的堆栈跟踪和一些更多的上下文。 Storm版本-9.3 Storm-Kafka版本-9.3 Kafka版本-0.8.2-beta 堆栈跟踪: Spout代码(注意,出于调试目的,我使用的是一个静态定义的分区映射,只有一个代理):

  • 我想知道是否有任何Kafka喷口支持安全的Kafka经纪人。apache storm的KafkaSpout不支持SSL Kafka。 下面提到的Kafka不接受SSL Kafka生产者/消费者支持的任何参数。 请让我知道有没有任何方法,我们可以实现安全的Kafka消息流处理与Storm拓扑。

  • 我只是在试用这里提到的kafka-storm喷口https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka,我使用的配置如下所述。 但是,上面的喷子从Kafka主题中获取消息的速度大约是每秒7000条,但我预计每秒大约有50000条消息。我尝试了在spoutConfig中增加提取缓冲区大小的各种选项,但没有看到任何结果。

  • 我用的是Kafka。请在下面找到测试程序。 我正在使用Storm 0.8.1。在Storm 0.8.2中存在多方案类。我会用那个。我只想知道早期版本是如何通过实例化String计划()类来工作的?我在哪里可以下载早期版本的Kafka喷口?但是我怀疑这是一个正确的选择,而不是在Storm 0.8.2上工作。???(困惑) 当我在暴风集群上运行代码(如下所示)时(即当我推我的拓扑时),我得到以下错误(

  • 我对Apache Storm有一个奇怪的问题。我有一个Kafka连接到Kafka集群,里面有10条消息。 螺栓接收每条消息并正确处理,因为在Storm UI中,螺栓被列为“已确认”。然而,storm UI下面列出的喷口表示所有元组都失败了。 我相信这会导致喷口再次发出所有的信息。。。因此,我看到一个Storm螺栓打印出消息1-10,然后以相同的顺序一次又一次地打印出来。 我适当地调用了和方法,我只