我有一个关于Kafka流上的Spark结构化流的问题。
我有一个模式类型:
StructType schema = new StructType()
.add("field1", StringType)
.add("field2", StringType)
.add("field3", StringType)
.add("field4", StringType)
.add("field5", StringType);
我从Kafka主题引导我的流,如下所示:
Dataset<Row> ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "brokerlist")
.option("zookeeper.connect", "zk_url")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load();
接下来转换为字符串,字符串类型:
Dataset<Row> df1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
现在我想将value字段(这是一个JSON)转换为之前转换的模式,这将使SQL查询更容易:
Dataset<Row> df2 = df1.select(from_json("value", schema=schema).as("data").select("single_column_field");
看来Spark 2.3.1不知道from_json
函数?
这是我的进口:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
有没有办法解决这个问题?请注意,我不是在寻找Scala解决方案,而是一个纯粹的基于Java的解决方案!
这段代码对我有用。希望能有所帮助
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.34.216:9092")
.option("subscribe", "topicName")
.load()
//df.show();
import spark.implicits._
val comingXDR = df.select("value").as[String].withColumn("_tmp", split($"value", "\\,")).withColumn("MyNewColumnName1", $"_tmp".getItem(0)).withColumn("MyNewColumnName2", $"_tmp".getItem(1)).withColumn("MyNewColumnName3", $"_tmp".getItem(2)).withColumn("MyNewColumnName4", $"_tmp".getItem(3)).drop("value").drop("_tmp")
我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\
我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。
我试图从kafka主题获取数据并将其推送到hdfs位置。我面临以下问题。 在每条消息(kafka)之后,hdfs位置都会更新为带有.c000.csv格式的部分文件。我已经在HDFS位置的顶部创建了一个hive表,但是HIVE无法读取从火花结构化流写入的任何数据。 以下是spark结构化流媒体之后的文件格式 以下是我要插入的代码: 谁能帮帮我,为什么要创建这样的文件? 如果我执行dfs-cat/pa
我有一个用于结构化流媒体的Kafka和Spark应用程序。特别是我的KafkaProducer具有以下配置: 然后我创建了一个ProducerRecord,如下所示: 其中,json。toString()表示一个JSON格式的字符串,这是我想在Spark中处理的值。现在,我主要做的是将Spark与Kafka主题联系起来,正如官方Spark结构化流媒体指南中所报道的那样: 然后 我有以下输出和异常:
这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合
《编程指南》说,结构化流媒体保证使用适当的源/汇实现端到端的一次语义。 然而,我不明白当工作崩溃,我们应用了水印时,这是如何工作的。 下面是一个例子,我目前想象它是如何工作的,请纠正我对任何问题的误解。提前谢谢! 例子: Spark Job:在每个1小时窗口中统计#个事件,带有1小时的水印。 信息: A-时间戳上午10点 B-时间戳上午10:10 C-时间戳上午10:20 X-时间戳12pm Y-