Version Info:
"org.apache.storm" % "storm-core" % "1.2.1"
"org.apache.storm" % "storm-kafka-client" % "1.2.1"
我有一个storm拓扑,如下所示:
螺栓ta->螺栓b->螺栓tc->螺栓d
bolta
只是对请求进行一些格式化,并发出另一个元组。boltb
执行一些处理,并为接受的每个元组发出大约100个元组。boltc
和boltd
处理这些元组。所有的bolts都实现了basebasicbolt
。
2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
case class Abc(index: Int, rand: Boolean)
class BoltA extends BaseBasicBolt {
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
val inp = input.getBinaryByField("value").getObj[someObj]
val randomGenerator = new Random()
var i = 0
val rand = randomGenerator.nextBoolean()
1 to 100 foreach {
collector.emit(new Values(Abc(i, rand).getJsonBytes))
i += 1
}
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
declarer.declare(new Fields("boltAout"))
}
}
class BoltB extends BaseBasicBolt {
override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
val abc = input.getBinaryByField("boltAout").getObj[Abc]
println(s"Received ${abc.index}th tuple in BoltB")
if(abc.index >= 97 && abc.rand){
println(s"throwing FailedException for ${abc.index}th tuple for")
throw new FailedException()
}
}
override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
}
}
private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
.setOffsetCommitPeriodMs(100)
.setRetry(new KafkaSpoutRetryExponentialBackoff(
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
10,
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
))
.setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
.setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
.build()
MessageTimeoutInsecons:300
这是由@Stig Rohde Døssing提供的修复方法。问题的确切原因如下:
在STORM-2666和followups的修复中,我们添加了逻辑来处理在下列偏移量已经被加密之后,spout接收到偏移量的ack的情况。问题是,喷口可能会提交所有被抵消的偏移,但不能向前调整消费者位置,或在适当发出之前清除。如果被加密的偏移量远远落后于日志结束的偏移量,则喷口可能会最终轮询它已经提交的偏移量。
修复方法有点错误。当使用者位置落后于提交的偏移量时,我们确保向前调整位置,并清除提交的偏移量之后的任何waitingToEmit消息。我们不清除waitingToEmit,除非我们调整了消费者的位置,结果这是一个问题。
问题内容: 我想从字符串中删除最后一次出现的“ \”这个特殊字符。我尝试了像这样的字符串函数 但是每次遇到错误时,我都要求加一个额外的报价。同时我发现(“ \”“)用来传递”这个特殊字符。我该如何进行? 问题答案: 您需要使用 字符串中的字符转义特殊字符(依此类推)。因此,在它们之前使用a 会使它成为 文字 ,这意味着java会将其后的内容视为常规字符。 你可以测试看看 将打印。它会打印。 所以:
问题内容: 我在我的应用程序的servlet中使用以下代码 当我运行应用程序并调用servlet时,出现以下错误 我已经在Java版本为JDK 1.6.20的Linux机器上托管了该应用程序。 是什么原因引起的问题… 是编写代码的类,是在上述类中调用方法的servlet … 问题答案: 要在服务器端应用程序中使用AWT类,我相信您需要在“无头”模式下运行。将servlet容器的启动更改为包括: (
来自servlet com.google.gson的未捕获异常。JsonSyntaxException:java.lang.IllegalStateException:应为BEGIN_OBJECT,但在com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(Reflective TypeAdapter Fact
问题内容: 我创建了一些小的方法作为Util类,例如以下代码: 并且,进行了非常简单的运行,如下所示: 结果是一个很奇怪的日期-> 2017/07/187 搜索了类似的问题,并尝试了TimeZone,Locale,但没有帮助。有什么建议 ? 顺便说一句,这是我的环境: Windows7 x86 JDK 1.8.0.131 x86 Oxygen corrosion of x86 问题答案: 您格式化
我从Spring Boot开始,并尝试提供Rest服务。我正在编写一个控制器,其中有3个方法的RequestMappings。其中两个工作正常,而第三个注释在编写代码时给出了这个错误。 此行有多个标记 - 语法错误,插入“枚举标识符”以完成枚举标头 - 语法错误,插入“枚举正文”以完成枚举声明 我尝试了其他答案,但似乎找不到问题所在。这是我的控制器代码- } 错误出现在最后一行,即最后一个请求映射
第一个64字符行的CRC(“这是教训:永不屈服,永不屈服,永不,永不,)应该是000015FA,我得到的是BFE6EC00。 我的逻辑是: > 在CRCCalculation中,我将每个字符添加到一个32位无符号整数中,并在64(一行长度)之后将其发送到XOR函数中。 代码: