当前位置: 首页 > 知识库问答 >
问题:

Spark(2.2):使用结构化流反序列化来自Kafka的Thrift记录

薛祯
2023-03-14

我是火花的新手。我使用结构化流从Kafka读取数据。

我可以在Scala中使用此代码读取数据:

val data = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .option("startingOffsets", startingOffsets) 
      .load()

我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。

我如何才能实现这一点?

共有2个答案

臧弘和
2023-03-14

我在数据砖网站上找到了这个博客。它展示了如何利用 Spark SQL 的 API 来使用和转换来自Kafka的复杂数据流。

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

有一节解释了如何使用UDF来反序列化行:

object MyDeserializerWrapper {
  val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
  MyDeserializerWrapper.deser.deserialize(topic, bytes)
)

df.selectExpr("""deserialize("topic1", value) AS message""")

我正在使用java,因此必须编写以下示例UDF,以检查如何在java中调用它:

UDF1<byte[], String> mode = new UDF1<byte[], String>() {
            @Override
            public String call(byte[] bytes) throws Exception {
                String s = new String(bytes);
                return "_" + s;
            }
        };

现在,我可以在结构化流字计数示例中使用此UDF,如下所示:

Dataset<String> words = df
                //converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
//                .selectExpr("CAST(value AS STRING)")
                .select( callUDF("mode", col("value")) )
                .as(Encoders.STRING())
                .flatMap(
                        new FlatMapFunction<String, String>() {
                            @Override
                            public Iterator<String> call(String x) {
                                return Arrays.asList(x.split(" ")).iterator();
                            }
                        }, Encoders.STRING());

我的下一步是为节俭反序列化编写一个UDF。我会在完成后立即发布。

松元明
2023-03-14

好吧,这是后续解决方案。我不能发布我自己的代码,但这里是你可以使用的公共代码,这是给所有者/编码者的信用。

https://github.com/airbnb/airbnb-spark-thrift/blob/master/src/main/scala/com/airbnb/spark/thrift/

首先你需要通过调用< code>convertObject函数将数组[byte]/value转换为Row,我们姑且称之为makeRow

其次,您需要通过调用转换函数来获取您的节俭类结构类型/模式,让我们调用最终结果模式

然后,您需要注册一个 UDF,如下所示 val 反序列化程序 = udf((字节: 数组[字节]) =

注意:您不能在不传递模式的情况下使用makeRow,否则Spark会抱怨:不支持类型org.apache.spark.sql.Row的模式

然后,您可以通过以下方式使用它:

val东西=kafkaStuff.withColumn("data",反序列化器(kafkaStuff("value")))val finalStuff=stuff.select("data.*")

而且…你完了!希望这有帮助。

给这篇文章另一个荣誉Spark UDF for结构类型/行,当我之前的解决方案如此接近时,它给了我最终的想法。

 类似资料:
  • 我用的是Spark 2.1。 我正在尝试使用 Spark 结构化流从 Kafka 读取记录,反序列化它们并在之后应用聚合。 我有以下代码: 我想要的是将字段反序列化到我的对象中,而不是转换为。 我有一个自定义的反序列化程序。 我如何在Java中做到这一点? 我找到的唯一相关链接是这个 https://databricks.com/blog/2017/04/26/processing-data-in

  • 我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,

  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。

  • 我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep

  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之