嗨,我正在阅读kafka主题,我想处理从kafka接收到的数据,例如tockenize,过滤掉不必要的数据,删除停用词,最后我想写回另一个kafka主题
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
val df = readStream.selectExpr("CAST(value AS STRING)" )
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))
// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result
// write back to kafka
val writeStream = cleanedDataframe
.writeStream
.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "writing.val")
.start()
writeStream.awaitTermination()
然后我得到以下错误
线程"main"中的异常org.apache.spark.sql.Analysis Exception:具有流源的查询必须使用WriteStream.start()执行;
然后,我对代码进行了如下编辑,以从Kafka中读取并写入控制台
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
val query = df.writeStream
.outputMode("append")
.format("console")
.start().awaitTermination();
// then perform the data processing part as mentioned in the first half
对于第二种方法,在控制台中连续显示数据,但它从未穿过数据处理部分。我是否知道如何读取Kafka主题,然后对接收到的数据执行一些操作(标记化、删除停止词),最后写回新的Kafka主题?
编辑
Stack Trace在错误期间指向上面代码中的df.show(false)
当前实现中有两个常见问题:
show
到1。
show方法是对数据帧执行的操作(与转换相对)。当您处理流式数据帧时,这将导致错误,因为流式查询需要使用开始
(正如expetion文本所告诉您的那样)。
至2。
方法waittermination是一种阻塞方法,这意味着后续代码不会在每个微批中执行。
总体解决方案
如果您想对Kafka进行读写,并且想要通过在控制台中显示数据来了解正在处理哪些数据,您可以执行以下操作:
// read from kafka
val readStream = existingSparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
.load()
// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
df.writeStream
.outputMode("append")
.format("console")
.start()
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))
// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result
// write back to kafka
// the columns `key` and `value` of the DataFrame `cleanedDataframe` will be used for producing the message into the Kafka topic.
val writeStreamKafka = cleanedDataframe
.writeStream
.outputMode("append")
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("topic", "writing.val")
.start()
existingSparkSession.awaitAnyTermination()
请注意现有SparkSession。waitanytermination()
位于代码的最末尾,而不直接在启动后使用waitanytermination。此外,请记住,数据框的键和值列将用于将消息生成到Kafka主题中。但是,不需要列键,另请参见此处
此外,如果您使用检查点(推荐),那么您需要设置两个不同的位置:一个用于控制台流,另一个用于kafka输出流。重要的是要记住这些流查询独立运行。
下面的json数据示例 下面的错误消息 线程“main”org.apache.spark.sql.analysisException中出现异常:未能找到数据源:Kafka。请按照“结构化流+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookupdatasource(datasourc
我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-
我们需要从Kafka主题导出生产数据以用于测试目的:数据用Avro编写,模式放在模式注册表中。 我们尝试了以下策略: 使用和或。我们无法获得可以用Java解析的文件:解析时总是出现异常,这表明文件格式错误。 使用:它生成一个还包括一些字节的json,例如在反序列化BigDecimal时。我们甚至不知道要选择哪个解析选项(不是avro,也不是json) null 使用Kafka连接接收器。我们没有找
我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,
我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时, 连接的结果被产生以输出Kafka主题(如果发生超时字段)。 (独立部署中的火花2.1.1,Kafka 10) Kafka在主题:X,Y,...输出主题结果将如下所示: 我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsre
有什么方法可以让我的Kafka Stream应用程序自动从新创建的主题中读取? 即使主题是在流应用程序已经运行时创建的? 类似于在主题名称中使用通配符,如下所示: 现在,我有多个客户端将数据(都使用相同的模式)发送到它们自己的主题,我的流应用程序从这些主题中读取数据。然后,我的应用程序进行一些转换,并将结果写入单个主题。 虽然所有的客户都可以写同一个主题,但一个没有偏见的客户也可以代表其他人写。所