当前位置: 首页 > 知识库问答 >
问题:

使用Kafka Spout的Apache storm给出错误:IllegalStateException

何峰
2023-03-14
Version Info: 
   "org.apache.storm" % "storm-core" % "1.2.1" 
   "org.apache.storm" % "storm-kafka-client" % "1.2.1" 

我有一个storm拓扑,如下所示:

螺栓ta->螺栓b->螺栓tc->螺栓d

bolta只是对请求进行一些格式化,并发出另一个元组。boltb执行一些处理,并为接受的每个元组发出大约100个元组。boltcboltd处理这些元组。所有的bolts都实现了basebasicbolt

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
case class Abc(index: Int, rand: Boolean)

class BoltA  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val inp = input.getBinaryByField("value").getObj[someObj]
    val randomGenerator = new Random()

    var i = 0
    val rand = randomGenerator.nextBoolean()
    1 to 100 foreach {
      collector.emit(new Values(Abc(i, rand).getJsonBytes))
      i += 1
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("boltAout"))
  }

}
class BoltB  extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val abc = input.getBinaryByField("boltAout").getObj[Abc]
    println(s"Received ${abc.index}th tuple in BoltB")
    if(abc.index >= 97 && abc.rand){
      println(s"throwing FailedException for ${abc.index}th tuple for")
      throw new FailedException()
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  }
}
private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)
    .setRetry(new KafkaSpoutRetryExponentialBackoff(
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      10,
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
    ))
    .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
    .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
    .build()

MessageTimeoutInsecons:300

共有1个答案

乐正光誉
2023-03-14

这是由@Stig Rohde Døssing提供的修复方法。问题的确切原因如下:

在STORM-2666和followups的修复中,我们添加了逻辑来处理在下列偏移量已经被加密之后,spout接收到偏移量的ack的情况。问题是,喷口可能会提交所有被抵消的偏移,但不能向前调整消费者位置,或在适当发出之前清除。如果被加密的偏移量远远落后于日志结束的偏移量,则喷口可能会最终轮询它已经提交的偏移量。

修复方法有点错误。当使用者位置落后于提交的偏移量时,我们确保向前调整位置,并清除提交的偏移量之后的任何waitingToEmit消息。我们不清除waitingToEmit,除非我们调整了消费者的位置,结果这是一个问题。

 类似资料:
  • 问题内容: 我想从字符串中删除最后一次出现的“ \”这个特殊字符。我尝试了像这样的字符串函数 但是每次遇到错误时,我都要求加一个额外的报价。同时我发现(“ \”“)用来传递”这个特殊字符。我该如何进行? 问题答案: 您需要使用 字符串中的字符转义特殊字符(依此类推)。因此,在它们之前使用a 会使它成为 文字 ,这意味着java会将其后的内容视为常规字符。 你可以测试看看 将打印。它会打印。 所以:

  • 问题内容: 我在我的应用程序的servlet中使用以下代码 当我运行应用程序并调用servlet时,出现以下错误 我已经在Java版本为JDK 1.6.20的Linux机器上托管了该应用程序。 是什么原因引起的问题… 是编写代码的类,是在上述类中调用方法的servlet … 问题答案: 要在服务器端应用程序中使用AWT类,我相信您需要在“无头”模式下运行。将servlet容器的启动更改为包括: (

  • 来自servlet com.google.gson的未捕获异常。JsonSyntaxException:java.lang.IllegalStateException:应为BEGIN_OBJECT,但在com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(Reflective TypeAdapter Fact

  • 问题内容: 我创建了一些小的方法作为Util类,例如以下代码: 并且,进行了非常简单的运行,如下所示: 结果是一个很奇怪的日期-> 2017/07/187 搜索了类似的问题,并尝试了TimeZone,Locale,但没有帮助。有什么建议 ? 顺便说一句,这是我的环境: Windows7 x86 JDK 1.8.0.131 x86 Oxygen corrosion of x86 问题答案: 您格式化

  • 我从Spring Boot开始,并尝试提供Rest服务。我正在编写一个控制器,其中有3个方法的RequestMappings。其中两个工作正常,而第三个注释在编写代码时给出了这个错误。 此行有多个标记 - 语法错误,插入“枚举标识符”以完成枚举标头 - 语法错误,插入“枚举正文”以完成枚举声明 我尝试了其他答案,但似乎找不到问题所在。这是我的控制器代码- } 错误出现在最后一行,即最后一个请求映射

  • 第一个64字符行的CRC(“这是教训:永不屈服,永不屈服,永不,永不,)应该是000015FA,我得到的是BFE6EC00。 我的逻辑是: > 在CRCCalculation中,我将每个字符添加到一个32位无符号整数中,并在64(一行长度)之后将其发送到XOR函数中。 代码: