当前位置: 首页 > 知识库问答 >
问题:

SparkJava:java.util.并发修改异常广播对象类型GenericObjectPool

晋承嗣
2023-03-14

我正在java中做一个火花流项目。我试图发送一些消息从火花到阿帕奇Kafka使用Kafka-生产者java api。由于为每个元素创建KafkaProducer实例将非常昂贵,所以我试图使用一个使用apache公共池框架的生产者池。如下面的代码片段所示,我正在创建GenericObjectpool实例,并将其广播如下所示:-

GenericObjectPool<KafkaProducer<String, String>> producerPool = new GenericObjectPool<KafkaProducer<String, String>>(
                new KafkaProducerFactory(prop));
final Broadcast<GenericObjectPool<KafkaProducer<String, String>>> pool = ssc.sparkContext() .broadcast(producerPool);  //**Causing exception**

KafkaProducerFactory类的代码粘贴在下面:-

import java.io.Serializable;
import java.util.Map;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaProducerFactory<K,V> extends BasePooledObjectFactory<KafkaProducer<K, V>> 
implements Serializable{
    private Map<String,Object> configs;
    public KafkaProducerFactory(Map<String, Object> configs) {
        this.configs=configs;
    }

    @Override
    public KafkaProducer<K, V> create() {
        return new KafkaProducer<K, V>(this.configs);
    }

    @Override
    public PooledObject<KafkaProducer<K,V>> wrap(KafkaProducer<K,V> producer) {
        return new DefaultPooledObject<KafkaProducer<K,V>>(producer);
    }

    @Override
    public void destroyObject(PooledObject<KafkaProducer<K,V>>obj){
        obj.getObject().close();
    }
}

但上面的行给了我下面粘贴的例外:-

通用域名格式。深奥的软件。克鲁约。例外:java。util。ConcurrentModificationException

完整StackTrace粘贴在下面:-

Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
referent (java.lang.ref.WeakReference)
factoryClassLoader (org.apache.commons.pool2.impl.GenericObjectPool)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1291)
    at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
    at com.veda.txt.spark.Engine.start(Engine.java:63)
    at com.veda.txt.spark.Engine.main(Engine.java:126)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:622)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1127)
    at java.util.Vector$Itr.next(Vector.java:1104)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    ... 39 more
15/08/26 20:38:14 INFO SparkContext: Invoking stop() from shutdown hook

请告诉我哪里出了问题。

塔克斯

共有1个答案

龙新荣
2023-03-14

KafkaProducer不可序列化,您无法播放它。

一般来说,对于这类问题,您可以使用foreachPartition并为每个分区创建一次昂贵的资源,而不是为每个元素创建一次。如果这仍然不能满足您的性能需求,您可以使用单例(假设对象是线程安全的,在kafka生产者的情况下应该是线程安全的)。

最近有一篇关于这个主题的博客文章在Spark用户邮件列表中分享:

http://allegro.tech/spark-kafka-integration.html

 类似资料:
  • 问题内容: 我有这段代码,它给了我并发修改异常。即使看不到任何并发修改,我也无法理解为什么继续得到它。 问题答案: 为了避免,你应该这样编写代码: 允许你在迭代期间修改列表,但不能在创建和使用列表之间进行修改。

  • 问题内容: 在此python文档页面上,它说: 像其身份一样,对象的类型也是不可更改的。 我尝试这个脚本, 结果显示: 我想我更改了的。 怎么了,我想念什么? 问题答案: 该页面的脚注说: [1]在某些情况下,在某些受控条件下可以更改对象的类型。但是,通常这不是一个好主意,因为如果处理不当,它可能会导致某些非常奇怪的行为。 如果您尝试将f2的更改为: 引发TypeError:

  • 问题内容: 您能否告诉我在单线程环境中是否有可能发生并发修改异常的方法,我下面发布的以下应用程序由两个线程组成,请告诉我我也可以在单个线程中看到相同的异常..请劝告 是的,我知道,在单线程环境中,此错误可能会出现..如下面的代码所示。 请告知解决该问题的方法是什么..这样就不会出现此错误.. !! 问题答案: 可以在单线程环境中引发A。只要在不应该​​在上下文中修改对象的情况下使用它,就不必在另一

  • 当Maven构建我的项目并运行单元测试时,有时会抛出一个并发修改异常(大约5次中有1次会失败,其他时间会成功构建)。但是当我以单元测试的形式在本地运行测试时,它们都会毫无例外地通过。 在我的pom.xml文件我有Surefire插件配置为: 然而,我得到的stackTrace没有提到是什么导致了并发修改异常。 我注意到所有的测试都通过了构建,但是出于某种原因,Maven重新打印了已经通过但现在有测

  • 下面我需要帮助:我有两种方法: 第二种方法 在for循环中的方法calculatime中,我只得到第一个项目的结果,然后得到与标题相同的错误。请帮帮我,为这个案子多解释一下。

  • 问题内容: 我正在为我的大学课程使用一些代码,并从 至: 但是新方法不断给出并发修改错误。我如何解决这个问题,为什么会发生呢? 问题答案: 这是因为执行后您继续遍历该列表。 您正在同时读取和写入列表,这破坏了foreach循环下面的迭代器协定。 用 描述如下: 返回迭代中的下一个元素。 如果迭代没有更多元素,则抛出该异常。 您可以用来检查下一个元素是否可用。