当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流-如何将字节值排队到Kafka?

温嘉赐
2023-03-14

我正在编写一个使用结构化流的 Spark 应用程序。应用从 Kafka 主题主题 1 读取消息,构造新消息,将其序列化为 Array[Byte],并将其发布到另一个 Kafka 主题主题 2

字节数组的序列化很重要,因为我使用了topic2的下游消费者也使用的特定序列化器/反序列化器。

不过,我很难向Kafka生产。我甚至不知道该怎么做。网上只有很多关于排队JSON数据的例子。

代码-

case class OutputMessage(id: String, bytes: Array[Byte])

implicit val encoder: Encoder[OutputMessage] = org.apache.spark.sql.Encoders.kryo

val outputMessagesDataSet: DataSet[OutputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "topic1")
  .load()
  .select($"value")
  .mapPartitions{r =>
     val messages: Iterator[OutputMessage] = createMessages(r)
     messages
  }

outputMessagesDataSet
  .writeStream
  .selectExpr("CAST(id AS String) AS key", "bytes AS value")
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("topic", "topic2")
  .option("checkpointLocation", loc)
  .trigger(trigger)
  .start
  .awaitTermination

但是,这会引发异常org.apache.spark.sql.AnalysisException:无法解析给定输入列的“id”:[value];第1行pos 5;

如何以id作为键并以bytes作为值排队到Kafka?

共有2个答案

甄成弘
2023-03-14

正如@ EmiCareOfCell44所建议的,我打印出了这个模式

如果我做消息数据集.printSchema(), 那么我只得到一个二进制类型的值。但是如果我这样做

val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.load()

df.printSchema()

然后打印

 root
  |-- key: binary (nullable = true)
  |-- value: binary (nullable = true)
  |-- topic: string (nullable = true)
  |-- partition: integer (nullable = true)
  |-- offset: long (nullable = true)
  |-- timestamp: timestamp (nullable = true)
  |-- timestampType: integer (nullable = true)

但是数据帧没有经历所需的转换,这是在

.mapPartitions{r =>
 val messages: Iterator[OutputMessage] = createMessages(r)
 messages
}

看起来数据集的值只有一个二进制值。

我在这里搜索了一些答案,然后发现Spark Dataset mapGroups操作后这个后值类型是二进制的,甚至在函数中返回一个字符串

我设置了一个编码器-

implicit val encoder: Encoder[OutputMessage] = org.apache.spark.sql.Encoders.kryo

这导致该值转换为二进制。由于输出消息是一个 scala 类,编码器不是必需的,所以我删除了它。之后,打印出架构显示两个字段(字符串和字节,这是我想要的)。之后,行 .selectExpr(“CAST(ID 为字符串)为键”、“字节为值”)运行良好。

谷梁英资
2023-03-14

您可以检查“收集”消息的数据帧的架构。由于您仅收集“值”字段,因此传入事件将以以下形式到达:

    +-------------------+
    | value             |
    +-------------------+
    | field1,field2,..  |
    +-------------------+
  

Yo 还需要查询密钥,就像在 Spark 文档中一样:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

df.select(col("key").cast(StringType), col("value").cast(StringType))
 类似资料:
  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢

  • 场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON… 注:题目以JSON格式写入Kafka。 下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配... 有什么建议吗? [更新]示例工作:https://github.com/katsou55/kafka-s

  • 我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,