我是火花的新手。我使用结构化流从Kafka读取数据。
我可以在Scala中使用此代码读取数据:
val data = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.option("startingOffsets", startingOffsets)
.load()
我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。
我如何才能实现这一点?
我在数据砖网站上找到了这个博客。它展示了如何利用 Spark SQL 的 API 来使用和转换来自Kafka的复杂数据流。
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
有一节解释了如何使用UDF来反序列化行:
object MyDeserializerWrapper {
val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
我正在使用java,因此必须编写以下示例UDF,以检查如何在java中调用它:
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
@Override
public String call(byte[] bytes) throws Exception {
String s = new String(bytes);
return "_" + s;
}
};
现在,我可以在结构化流字计数示例中使用此UDF,如下所示:
Dataset<String> words = df
//converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
// .selectExpr("CAST(value AS STRING)")
.select( callUDF("mode", col("value")) )
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
我的下一步是为节俭反序列化编写一个UDF。我会在完成后立即发布。
好吧,这是后续解决方案。我不能发布我自己的代码,但这里是你可以使用的公共代码,这是给所有者/编码者的信用。
https://github.com/airbnb/airbnb-spark-thrift/blob/master/src/main/scala/com/airbnb/spark/thrift/
首先你需要通过调用< code>convertObject函数将数组[byte]/value转换为Row,我们姑且称之为makeRow
其次,您需要通过调用转换
函数来获取您的节俭类结构类型/模式,让我们调用最终结果模式
然后,您需要注册一个 UDF,如下所示 val 反序列化程序 = udf((字节: 数组[字节]) =
注意:您不能在不传递模式的情况下使用makeRow,否则Spark会抱怨:
不支持类型org.apache.spark.sql.Row的模式
然后,您可以通过以下方式使用它:
val东西=kafkaStuff.withColumn("data",反序列化器(kafkaStuff("value")))val finalStuff=stuff.select("data.*")
而且…你完了!希望这有帮助。
给这篇文章另一个荣誉Spark UDF for结构类型/行,当我之前的解决方案如此接近时,它给了我最终的想法。
我用的是Spark 2.1。 我正在尝试使用 Spark 结构化流从 Kafka 读取记录,反序列化它们并在之后应用聚合。 我有以下代码: 我想要的是将字段反序列化到我的对象中,而不是转换为。 我有一个自定义的反序列化程序。 我如何在Java中做到这一点? 我找到的唯一相关链接是这个 https://databricks.com/blog/2017/04/26/processing-data-in
我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,
我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。
我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之