当前位置: 首页 > 知识库问答 >
问题:

用Scala for Kafka API 0.10编写的Spark的Kafka消费者:自定义AVRO反序列化器

越源
2023-03-14

我正在将Spark Scala应用程序Kafka API升级到0.10版。我曾经创建自定义方法来反序列化字节字符串格式的消息。

我已经意识到有一种方法可以将StringDeserializer或ByteArrayDeserializer作为参数传递给键或值。

但是,我找不到有关如何创建自定义Avro模式反序列化器的任何信息,以便我的kafkaStream在创建DirectStream和使用Kafka中的数据时可以使用它。

有可能吗?

共有1个答案

翟浩穰
2023-03-14

这是可能的。您需要重写反序列化器

import org.apache.kafka.common.serialization.Deserializer

class AvroDeserializer extends Deserializer[Array[Byte]] {
  override def configure(map: util.Map[String, _], b: Boolean): Unit = ???
  override def close(): Unit = ???
  override def deserialize(s: String, bytes: Array[Byte]): Array[Byte] = ???
}

然后:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.with.AvroDeserializer

val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[AvroDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sometopic")
val stream = KafkaUtils.createDirectStream[String, MyTypeWithAvroDeserializer](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
 类似资料:
  • 我创建了一个简单的生产者-消费者应用程序,使用自定义序列化器和反序列化器。 在我生成的Message类中添加了一个新方法之后,使用者开始在反序列化时被堆栈。我的生产者使用的是新类(有新方法),消费者使用的是旧类(没有方法)。 JSONSerializer/Deserializer可以处理这些类型的修复吗?如果我要使用JSONSerialzier,它应该只关心类的模式,对吗?

  • 我在一个Kafka消息中使用了我自己的类,它有一堆字符串数据类型。 我想我需要编写自己的序列化器并将其提供给生产者属性?

  • 我们正在考虑在我们的for消息传递中使用Kafka,我们的应用程序是使用Spring开发的。所以,我们已经计划用Spring-Kafka。 生产者将消息作为HashMap对象放入队列。我们有JSON序列化器,并且假设映射将被序列化并放入队列。这是生产者配置。 我们看到的文章很少,建议是这样做: 我们不想为创建反序列化程序编写一些代码。有没有我们缺少的样板?任何帮助都将不胜感激!!

  • 我有一个具体的类,我正在序列化字节数组,以发送到一个Kafka主题。对于序列化,我使用ReflectDatumWriter。在发送bytes[]之前,我在查看了一些在线教程之后,将模式ID放在前4个字节中。 ./bin/kafka-avro-console-consumer--bootstrap-server 0:9092--property schema.stry.url=http://0:80

  • 我正在使用Spring Kafka consumer和Avro模式构建我的应用程序。 但是,如果消息无法反序列化到我构建的指定Avro特定记录,消费者将不断地反复尝试相同的消息(无限重试)。 在这种情况下,如果我的使用者出现反序列化程序异常,我如何配置使用者应用程序以跳过当前消息并移动到下一个偏移量。 我已经研究了Spring Kafka错误句柄,它只能处理侦听器中的异常,而不是在反序列化阶段。

  • 我正在构建一个简单的项目与Spring boot和sping-kafka,我不能配置它,使其工作,它是一个简单的应用程序,生成笔记(作者,内容,createddatetime,lastmodefieddatetime)和发送基于笔记的事件,当他们被创建。 我已经玩了两天了,但我想我还没学会。 这是我的配置,我很确定它有很多锅炉板,但我已经用了几个例子来使我的工作。 我有2个生产者和消费者工厂,因为