我正在编写一个使用结构化流的 Spark 应用程序。应用从 Kafka 主题主题 1
读取消息,构造新消息,将其序列化为 Array[Byte]
,并将其发布到另一个 Kafka 主题主题 2
。
字节数组的序列化很重要,因为我使用了topic2
的下游消费者也使用的特定序列化器/反序列化器。
不过,我很难向Kafka生产。我甚至不知道该怎么做。网上只有很多关于排队JSON数据的例子。
代码-
case class OutputMessage(id: String, bytes: Array[Byte])
implicit val encoder: Encoder[OutputMessage] = org.apache.spark.sql.Encoders.kryo
val outputMessagesDataSet: DataSet[OutputMessage] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.load()
.select($"value")
.mapPartitions{r =>
val messages: Iterator[OutputMessage] = createMessages(r)
messages
}
outputMessagesDataSet
.writeStream
.selectExpr("CAST(id AS String) AS key", "bytes AS value")
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("topic", "topic2")
.option("checkpointLocation", loc)
.trigger(trigger)
.start
.awaitTermination
但是,这会引发异常org.apache.spark.sql.AnalysisException:无法解析给定输入列的“id”:[value];第1行pos 5;
如何以id
作为键并以bytes
作为值排队到Kafka?
正如@ EmiCareOfCell44所建议的,我打印出了这个模式
如果我做消息数据集.printSchema(),
那么我只得到一个二进制
类型的值。但是如果我这样做
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.load()
df.printSchema()
然后打印
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
但是数据帧没有经历所需的转换,这是在
.mapPartitions{r =>
val messages: Iterator[OutputMessage] = createMessages(r)
messages
}
看起来数据集的值只有一个二进制值。
我在这里搜索了一些答案,然后发现Spark Dataset mapGroups操作后这个后值类型是二进制的,甚至在函数中返回一个字符串
我设置了一个编码器-
implicit val encoder: Encoder[OutputMessage] = org.apache.spark.sql.Encoders.kryo
这导致该值转换为二进制。由于输出消息
是一个 scala 类,编码器不是必需的,所以我删除了它。之后,打印出架构显示两个字段(字符串和字节,这是我想要的)。之后,行 .selectExpr(“CAST(ID 为字符串)为键”、“字节为值”)
运行良好。
您可以检查“收集”消息的数据帧的架构。由于您仅收集“值”字段,因此传入事件将以以下形式到达:
+-------------------+
| value |
+-------------------+
| field1,field2,.. |
+-------------------+
Yo 还需要查询密钥,就像在 Spark 文档中一样:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
或
df.select(col("key").cast(StringType), col("value").cast(StringType))
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢
场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?
我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。
我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON… 注:题目以JSON格式写入Kafka。 下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配... 有什么建议吗? [更新]示例工作:https://github.com/katsou55/kafka-s
我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,