我有一个Kafka Streams应用程序版本-0.11,它从很少的主题中获取数据,并将数据连接到另一个主题中。
5 kafka brokers - version 0.11
Kafka Topics - 15 partitions and 3 replication factor.
org.apache.kafka.streams.errors.LockException: task [4_10] Failed to lock the state directory for task 4_10
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:99)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:80)
at org.apache.kafka.streams.processor.internals.StandbyTask.<init>(StandbyTask.java:62)
at org.apache.kafka.streams.processor.internals.StreamThread.createStandbyTask(StreamThread.java:1325)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2400(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$StandbyTaskCreator.createTask(StreamThread.java:313)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
at org.apache.kafka.streams.processor.internals.StreamThread.addStandbyTasks(StreamThread.java:1366)
at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:185)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
我在一些jira问题上读到过,清理流可能有助于修复问题。但是每次启动Kafka流应用程序时清理流是正确的解决方案还是补丁?此外,流清理会延迟应用程序的启动,对吗?
注意:每次启动Kafka Streams应用程序时,在调用Streams.start()之前是否需要调用Streams.cleanup()
看到org.apache.kafka.streams.errors.lockException:task[4_10]未能锁定Task4_10
的状态目录,这实际上是预期的,应该自行解决。该线程将后退以便等待,直到另一个线程释放锁并稍后重试。因此,您甚至可能多次看到这个警告消息是日志,以防在第二个线程释放锁之前发生重试。
但是,最终锁应该由第二个线程释放,第一个线程将能够获得锁。之后,溪流应该只是向前移动。注意,这是一条警告消息,而不是错误。
我在windows中启动Kafka服务器时遇到问题 命令\bin\windows\kafka服务器启动。球棒\配置\服务器。属性 错误消息: 该命令的语法不正确。错误:无法找到或加载主类文件\IBM\WebSphere 知道吗?
问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp
使用的build.sbt文件如下: Scala中以下2行给出了以下异常 线程“main”java.lang.noClassDeffounder异常错误:org/apache/kafka/streams/streamsbuilder at tradesapp$.main(tradesapp.scala:21)at tradesapp.main(tradesapp.scala)at java.base
它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka
我是Kafka和spark的初学者。我想通过spark streaming对我从Kafka收到的特定主题的数据进行实时处理。我无法使用createStream函数返回的JavaPairReceiverInputStream。 下面的代码给出了一个错误: 方法图(Function, R 我使用的spark版本是1.2.0。我找不到处理Kafka消息的java api示例。有人能告诉我我需要改变什么吗
问题内容: 我为ubuntu下载apache-cassandra-0.8.5并将其解压缩,并阅读了自述文件。我在外壳中尝试下面的命令: 但是它说:错误:代理抛出的异常:java.net.MalformedURLException:本地主机名未知:java.net.UnknownHostException:node24.nise.local:node24.nise.local 我该怎么办? 问题答案