我用的是Spark 2.1。
我正在尝试使用 Spark 结构化流从 Kafka 读取记录,反序列化它们并在之后应用聚合。
我有以下代码:
SparkSession spark = SparkSession
.builder()
.appName("Statistics")
.getOrCreate();
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load();
df.selectExpr("CAST(value AS STRING)")
我想要的是将value
字段反序列化到我的对象中,而不是转换为String
。
我有一个自定义的反序列化程序。
public StatisticsRecord deserialize(String s, byte[] bytes)
我如何在Java中做到这一点?
我找到的唯一相关链接是这个 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html,但这是针对Scala的。
如果您的数据有Java中的自定义反序列化器,请在<code>加载<code>后从Kafka获得的字节上使用它。
df.select("value")
该行为您提供数据集
我专门使用Scala的Spark API,因此我将在Scala中执行以下操作来处理“反序列化”情况:
import org.apache.spark.sql.Encoders
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord]
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) }
df.select(myDeserializerUDF($"value") as "value_des")
这应该给你你想要的...在斯卡拉。将其转换为Java是您的家庭练习:)
请注意,自定义对象必须有可用的编码器,否则Spark SQL将拒绝将其对象放入数据集中。
为JSON消息定义模式。
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("Id", DataTypes.IntegerType, false),
DataTypes.createStructField("Name", DataTypes.StringType, false),
DataTypes.createStructField("DOB", DataTypes.DateType, false) });
现在阅读下面的消息。MessageData是JSON消息的JavaBean。
Dataset<MessageData> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaUri)
.option("subscribe", "Statistics")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(MessageData.class));
我是火花的新手。我使用结构化流从Kafka读取数据。 我可以在Scala中使用此代码读取数据: 我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。 我如何才能实现这一点?
批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)
我对rust是完全陌生的,我正试图找出如何从URLendpoint反序列化任意JSON结构。 reqwest自述文件中的相应示例如下: 因此,在本例中,目标结构--即以字符串作为键、以字符串作为值的HashMap对象--显然是已知的。 但是,如果我不知道在请求endpoint上接收到的结构是什么样子呢?
我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。
我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,