当前位置: 首页 > 知识库问答 >
问题:

如何在Java中使用结构化流反序列化来自Kafka的记录?

双恩
2023-03-14

我用的是Spark 2.1。

我正在尝试使用 Spark 结构化流从 Kafka 读取记录,反序列化它们并在之后应用聚合。

我有以下代码:

SparkSession spark = SparkSession
        .builder()
        .appName("Statistics")
        .getOrCreate();

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUri)
        .option("subscribe", "Statistics")
        .option("startingOffsets", "earliest")
        .load();

df.selectExpr("CAST(value AS STRING)")

我想要的是将value字段反序列化到我的对象中,而不是转换为String

我有一个自定义的反序列化程序

public StatisticsRecord deserialize(String s, byte[] bytes)

我如何在Java中做到这一点?

我找到的唯一相关链接是这个 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html,但这是针对Scala的。

共有2个答案

訾俊名
2023-03-14

如果您的数据有Java中的自定义反序列化器,请在<code>加载<code>后从Kafka获得的字节上使用它。

df.select("value")

该行为您提供数据集

我专门使用Scala的Spark API,因此我将在Scala中执行以下操作来处理“反序列化”情况:

import org.apache.spark.sql.Encoders
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord]
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) }
df.select(myDeserializerUDF($"value") as "value_des")

这应该给你你想要的...在斯卡拉。将其转换为Java是您的家庭练习:)

请注意,自定义对象必须有可用的编码器,否则Spark SQL将拒绝将其对象放入数据集中。

蓝宜
2023-03-14

为JSON消息定义模式。

StructType schema = DataTypes.createStructType(new StructField[] { 
                DataTypes.createStructField("Id", DataTypes.IntegerType, false),
                DataTypes.createStructField("Name", DataTypes.StringType, false),
                DataTypes.createStructField("DOB", DataTypes.DateType, false) });

现在阅读下面的消息。MessageData是JSON消息的JavaBean。

Dataset<MessageData> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"),schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(MessageData.class));  
 类似资料:
  • 我是火花的新手。我使用结构化流从Kafka读取数据。 我可以在Scala中使用此代码读取数据: 我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。 我如何才能实现这一点?

  • 批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)

  • 我对rust是完全陌生的,我正试图找出如何从URLendpoint反序列化任意JSON结构。 reqwest自述文件中的相应示例如下: 因此,在本例中,目标结构--即以字符串作为键、以字符串作为值的HashMap对象--显然是已知的。 但是,如果我不知道在请求endpoint上接收到的结构是什么样子呢?

  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。

  • 我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,