当前位置: 首页 > 知识库问答 >
问题:

火花Kafka生产者可串行化

方焱
2023-03-14

我想出一个例外:

//    @transient
val sparkConf = new SparkConf()

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

//    @transient
val sc = new SparkContext(sparkConf)

val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")

//    @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
//    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
//    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")

//    @transient
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

requestSet.foreachPartition((partisions: Iterator[String]) => {
  partisions.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception => {
        log.warn(ex.getMessage, ex)
      }
    }
  })
})

producer.close()

在这个程序中,我尝试从hdfs路径读取记录,并将它们保存到Kafka中。问题是当我移除关于向Kafka发送记录的代码时,它运行得很好。我错过了什么?

共有1个答案

宰父熙云
2023-03-14

kafkaproducer不可序列化。您需要将实例的创建移到foreachpartition内部:

requestSet.foreachPartition((partitions: Iterator[String]) => {
  val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
  partitions.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception => {
        log.warn(ex.getMessage, ex)
      }
    }
  })
})

请注意,KafkaProducer.send返回一个future[RecordMetadata],如果键或值不能序列化,则唯一可以从它传播的异常是SerializationException

 类似资料:
  • 我们在Spark 2.1中使用Kafka0.10,我发现我们的制作人发布消息总是很慢。在给Spark executors提供8个内核后,我只能达到1k/s左右,而另一篇帖子则说它们很容易达到百万/秒。我试着调一下玲珑的曲调。ms和batch。大小来找出答案。然而我发现了玲儿。ms=0对我和这批人来说似乎是最佳选择。大小没有多大影响。我每次迭代发送160k个事件。看来我得让Kafka制作人知道到底发

  • 我有一门课: 它运行得很好,但抛出了一个例外:在我对RDD的映射做了一个小更改之后: 我以为这两个功能应该是一样的,但似乎不是。为什么它们不同?

  • 一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod

  • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?

  • Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用

  • 这是我的密码。