当前位置: 首页 > 知识库问答 >
问题:

火花:2.0。2爪哇。util。ConcurrentModificationException:KafkaConsumer对多线程访问不安全

陶沛
2023-03-14

我得到以下错误与Kafka0.10.1.0和火花2.0.2

private val spark = SparkSession.builder()
.master("local[*]")
.appName(job.name)
.config("spark.cassandra.connection.host","localhost"))
.config("spark.cassandra.connection.port","9042")
.config("spark.streaming.receiver.maxRate", 10000)
.config("spark.streaming.kafka.maxRatePerPartition", 10000)
.config("spark.streaming.kafka.consumer.cache.maxCapacity", 1)
.config("spark.streaming.kafka.consumer.cache.initialCapacity", 1)
.getOrCreate()

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> config.getString("kafka.hosts"),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> job.name,
"auto.offset.reset" -> config.getString("kafka.offset"),
"enable.auto.commit" -> (false: java.lang.Boolean)
)`

例外

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

allreade看到了邮件链,但还没有解决方案https://www.mail-archive.com/user@火花。阿帕奇。org/msg566。html

共有1个答案

韶硕
2023-03-14

遇到相同错误,无法找到解决方案。相反,我使用“-executor cores 1”和spark submit来避免这个问题。如果有人找到解决方案,请发布

 类似资料:
  • 我想通过分区迭代一个dataframe,对于每个分区,迭代它的所有行,并创建一个deleteList,它将包含HBase的每一行的delete对象。我将Spark和HBase与Java一起使用,并使用以下代码创建了一个行对象: 但它无法工作,因为我无法正确访问行的值。而df有一个名为“hbase_key”的列。

  • 我在javafx中用鼠标拖动事件操作弧形时遇到了一个问题 我有一个用以下参数定义的Arc: radiusX和radiusY:整个椭圆的水平和垂直半径,该圆弧是其部分截面 centerX,centerY:弧的中心点 startAngle:弧的起始角度(相对于水平轴) 长度:弧的角度范围(度) 所以我需要的是在拖动的时候让弧线的起始角度“跟随”鼠标的移动: 当用户按下Arc的起点(位于起点角上)并在拖

  • 我有一个应用程序,它有一个ConcurrentHashMap本地存储一个存储在外部服务器上的数据副本。地图每隔几秒钟就会更新一次数据的新副本。 我有一个循环,每隔几秒钟运行一次,它可以访问HashMap并按照值的顺序将元素添加到数组中(实际上它做的事情还多一些,但这并不相关)。我的问题是,如果数据在创建数组的过程中发生了变化,您可能会在不同的地方有重复的键,或者完全省略一些键。 示例: 如您所见,

  • 问题内容: 嘿,我只需要回答一个问题…我将如何使以下代码不冻结整个JFrame? 问题答案: 使用其他线程来执行此任务。如果在主UI线程中执行此操作,则它将冻结。例如,您可以执行以下操作 更新 在对Robin和Marko提出明智建议之后,我正在用更好的解决方案来更新答案。

  • 我已经在网站上看到了“解决方案”http://www.rgagnon.com/javadetails/java-0506.html,但它不能正常工作。昨天(六月八日)应该是159,但它说是245。 那么,有没有人用Java解决方案来获取当前日期的三位数朱利安日(不是朱利安日——我需要今年的日期)? 谢谢!马克

  • When using any of the threaded mpms in Apache 2.0 it is important that every function called from Apache be thread safe. When linking in 3rd party extensions it can be difficult to determine whether t