我想出一个例外:
// @transient
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// @transient
val sc = new SparkContext(sparkConf)
val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")
// @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")
// @transient
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
requestSet.foreachPartition((partisions: Iterator[String]) => {
partisions.foreach((line: String) => {
try {
producer.send(new ProducerRecord[String, String]("testtopic", line))
} catch {
case ex: Exception => {
log.warn(ex.getMessage, ex)
}
}
})
})
producer.close()
在这个程序中,我尝试从hdfs路径读取记录,并将它们保存到Kafka中。问题是当我移除关于向Kafka发送记录的代码时,它运行得很好。我错过了什么?
kafkaproducer
不可序列化。您需要将实例的创建移到foreachpartition
内部:
requestSet.foreachPartition((partitions: Iterator[String]) => {
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
partitions.foreach((line: String) => {
try {
producer.send(new ProducerRecord[String, String]("testtopic", line))
} catch {
case ex: Exception => {
log.warn(ex.getMessage, ex)
}
}
})
})
请注意,KafkaProducer.send
返回一个future[RecordMetadata]
,如果键或值不能序列化,则唯一可以从它传播的异常是SerializationException
。
我们在Spark 2.1中使用Kafka0.10,我发现我们的制作人发布消息总是很慢。在给Spark executors提供8个内核后,我只能达到1k/s左右,而另一篇帖子则说它们很容易达到百万/秒。我试着调一下玲珑的曲调。ms和batch。大小来找出答案。然而我发现了玲儿。ms=0对我和这批人来说似乎是最佳选择。大小没有多大影响。我每次迭代发送160k个事件。看来我得让Kafka制作人知道到底发
我有一门课: 它运行得很好,但抛出了一个例外:在我对RDD的映射做了一个小更改之后: 我以为这两个功能应该是一样的,但似乎不是。为什么它们不同?
一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod
Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?
Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用
这是我的密码。