我希望使用Spark结构化流从Kafka读取数据并处理它并写入Hive表。
val spark = SparkSession
.builder
.appName("Kafka Test")
.config("spark.sql.streaming.metricsEnabled", true)
.config("spark.streaming.backpressure.enabled", "true")
.enableHiveSupport()
.getOrCreate()
val events = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "xxxxxxx")
.option("startingOffsets", "latest")
.option("subscribe", "yyyyyy")
.load
val data = events.select(.....some columns...)
data.writeStream
.format("parquet")
.option("compression", "snappy")
.outputMode("append")
.partitionBy("ds")
.option("path", "maprfs:/xxxxxxx")
.start()
.awaitTermination()
这确实会创建拼花文件,但是我如何更改它以模拟类似的内容,以便它写入到可以使用(select*from)从配置单元或spark sql读取的表格式中
data.write.format("parquet").option("compression", "snappy").mode("append").partitionBy("ds").saveAsTable("xxxxxx")
我建议使用Kafka Connect将数据写入HDFS。它是开源的,可独立使用或作为ConFluent Platform的一部分使用。
对于过滤和转换数据,您可以使用Kafka Streams或KSQL。KSQL在Kafka Streams之上运行,为您提供了一种非常简单的方法来连接数据、过滤数据和构建聚合。
下面是一个在KSQL中聚合数据流的示例
SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID
请参阅本博客中的KSQL操作。您可能还对关于使用这些组件构建流数据管道的讨论感兴趣
问题内容: 我试图找出一种在Hive中从平面源中选择数据并将其输出到一个名为struct的数组中的方法。这是我正在寻找的示例… 样本数据: 所需的输出: 我尝试了collect_list和collect_set,但是它们仅允许原始数据类型。关于如何在Hive中进行此操作有任何想法吗? 问题答案: 我会使用这个jar,它是的更好的实现(并需要复杂的数据类型)。 查询 : 输出 :
我试图从Spark Sql将数据插入到Hive外部表中。我通过以下命令创建了hive外部表 在spark工作中,我编写了以下代码Dataset df=session。read()。选项(“标题”、“真”)。csv(csvInput); 每次运行这段代码时,我都会遇到以下异常
这是我运行strm.py文件的终端命令 $spark_home/bin/spark-submit--主本地--驱动程序-内存4G--num-executors 2--executor-memory 4G--包org.apache.spark:spark-sql-kafka-0-102.11:2.4.0 org.apache.spark:spark-cassandra-connector2.11:2
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
我正在使用Spark SQL读取一个配置单元表,并将其分配给一个scala val 有什么方法可以绕过这个错误吗?我需要将记录插入到同一个表中。 嗨,我试着按建议做,但仍然得到同样的错误。