当前位置: 首页 > 知识库问答 >
问题:

从Kafka主题读取数据并使用scala和火花写回Kafka主题

李云
2023-03-14

嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

val df = readStream.selectExpr("CAST(value AS STRING)" )
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result

// write back to kafka
val writeStream = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()
    writeStream.awaitTermination()

然后我得到以下错误

线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行;

然后,我对代码进行了如下编辑,以从Kafka中读取并写入控制台

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination();

// then perform the data processing part as mentioned in the first half

对于第二种方法,在控制台中连续显示数据,但它从未穿过数据处理部分。我是否知道如何读取Kafka主题,然后对接收到的数据执行一些操作(标记化、删除停止词),最后写回新的Kafka主题?

编辑

Stack Trace在错误期间指向上面代码中的df.show(false)

共有1个答案

元阳荣
2023-03-14

当前实现中有两个常见问题:

  1. 在流上下文中应用show
  2. 等待终止后的代码将不会被执行

到1。

show方法是对数据帧执行的操作(与转换相对)。当您处理流式数据帧时,这将导致错误,因为流式查询需要使用开始(正如expetion文本所告诉您的那样)。

至2。

方法waittermination是一种阻塞方法,这意味着后续代码不会在每个微批中执行。

总体解决方案

如果您想对Kafka进行读写,并且想要通过在控制台中显示数据来了解正在处理哪些数据,您可以执行以下操作:

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
df.writeStream
      .outputMode("append")
      .format("console")
      .start()

val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result

// write back to kafka
// the columns `key` and `value` of the DataFrame `cleanedDataframe` will be used for producing the message into the Kafka topic.
val writeStreamKafka = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()

existingSparkSession.awaitAnyTermination()

请注意现有SparkSession。waitanytermination()位于代码的最末尾,而不直接在启动后使用waitanytermination。此外,请记住,数据框的键和值列将用于将消息生成到Kafka主题中。但是,不需要列键,另请参见此处

此外,如果您使用检查点(推荐),那么您需要设置两个不同的位置:一个用于控制台流,另一个用于kafka输出流。重要的是要记住这些流查询独立运行。

 类似资料:
  • 下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc

  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-

  • 我们需要从Kafka主题导出生产数据以用于测试目的:数据用Avro编写,模式放在模式注册表中。 我们尝试了以下策略: 使用和或。我们无法获得可以用Java解析的文件:解析时总是出现异常,这表明文件格式错误。 使用:它生成一个还包括一些字节的json,例如在反序列化BigDecimal时。我们甚至不知道要选择哪个解析选项(不是avro,也不是json) null 使用Kafka连接接收器。我们没有找

  • 我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,

  • 我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时, 连接的结果被产生以输出Kafka主题(如果发生超时字段)。 (独立部署中的火花2.1.1,Kafka 10) Kafka在主题:X,Y,...输出主题结果将如下所示: 我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsre

  • 有什么方法可以让我的Kafka Stream应用程序自动从新创建的主题中读取? 即使主题是在流应用程序已经运行时创建的? 类似于在主题名称中使用通配符,如下所示: 现在,我有多个客户端将数据(都使用相同的模式)发送到它们自己的主题,我的流应用程序从这些主题中读取数据。然后,我的应用程序进行一些转换,并将结果写入单个主题。 虽然所有的客户都可以写同一个主题,但一个没有偏见的客户也可以代表其他人写。所