当前位置: 首页 > 知识库问答 >
问题:

storm kafka喷口的BufferUnderflowException

金秦斩
2023-03-14

这里可能发生了同样的事情:错误backtype.storm.util-Async循环死亡!BufferUnderFlowException:null,但我将添加一个完整的堆栈跟踪和一些更多的上下文。

Storm版本-9.3
Storm-Kafka版本-9.3
Kafka版本-0.8.2-beta

堆栈跟踪:

java.lang.RuntimeException: java.nio.BufferUnderflowException  
  at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-   0.9.3.jar:0.9.3]
  at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
  at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
  at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748) ~[storm-core-0.9.3.jar:0.9.3]
  at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
  at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
  at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]  
Caused by: java.nio.BufferUnderflowException: null
  at java.nio.Buffer.nextGetIndex(Buffer.java:498) ~[na:1.7.0_71]
  at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:355) ~[na:1.7.0_71]
  at kafka.api.OffsetResponse$.readFrom(OffsetResponse.scala:28) ~[kafka_2.10-0.8.2-beta.jar:na]
  at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:128) ~[kafka_2.10-0.8.2-beta.jar:na]
  at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2-beta.jar:na]
  at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:111) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194) ~[storm-kafka-0.9.3.jar:0.9.3]
  at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127) ~[storm-core-0.9.3.jar:0.9.3]
  at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.3.jar:0.9.3]
  at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.3.jar:0.9.3]
  at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633) ~[storm-core-0.9.3.jar:0.9.3]
  at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401) ~[storm-core-0.9.3.jar:0.9.3]
  at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
  at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]

Spout代码(注意,出于调试目的,我使用的是一个静态定义的分区映射,只有一个代理):

Broker broker = new Broker("localhost", 9094);
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, broker);
StaticHosts hosts = new StaticHosts(partitionInfo);
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(hosts, kafkaTopic);

spoutConfig.startOffsetTime = -1L;
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new OpaqueTridentKafkaSpout(spoutConfig);

共有1个答案

欧阳正卿
2023-03-14

当中断队列,特别是spout的send队列发生拥塞时,可能会产生此异常。我建议您增加executor发送缓冲区大小。它可能会解决这个问题。

 类似资料:
  • 在我的storm拓扑(有2个喷口和1个bolt)中,其中一个kafka喷口使用者的偏移量正在前进,但MSG没有通过kafka喷口发送到bolt。我可以在storm ui中看到,对于那个特定的喷口,发出和传送的消息是0。所以,我的问题是为什么消费者在前进,我可以看到消费者从zookeeper客户端的抵消逐渐增加。

  • 一般:我是一个想在Storm/Kafka/Flink/MS Azure SA/Spark上运行一些性能测试(WordCount)的学生。我想使用Kafka经纪人作为输入源。 我使用了Storm-Starter项目中的WordCount示例,并添加了Kafka作为喷口: 我使用kafka-console-producer生成一些消息。我希望有人能帮助我。我是编程Storm的新手...

  • 我正在用Apache Storm 1.1.2和Kafka0.11在Java9中构建一个Spring应用程序 我注意到,在高负载(每秒2500条消息)下,Kafka喷口有一个非常高的滞后。Kafka喷口有一个平行性提示3。滞后几乎等于喷口提交的偏移。 这个滞后设置了拓扑每秒可以摄取的最大消息量的上限,这并不是很大。有人知道解决这个问题的办法吗? 更新:我还注意到,即使有10个工作者和4个并行性提示,

  • 我正在使用Storm 1.1.2和Kafka 0.11构建一个Java Spring应用程序,将在Docker容器中启动。 我的拓扑中的所有东西都按计划工作,但在Kafka的高负载下,Kafka滞后会随着时间的推移越来越大。 我的KafKaspoutConfig: 那么我的拓扑结构如下

  • 刚开始使用Storm,只是了解喷口的概念,以及如何在喷口中实现并行。

  • 我理解是使用实现背压的一种简单方法。我想明白,现在背压已经实现了,我们还需要来节流喷口吗? 谢谢!