当前位置: 首页 > 知识库问答 >
问题:

Apache Spark结构化流(DataStreamWriter)写入配置单元表

黄沈浪
2023-03-14

我希望使用Spark结构化流从Kafka读取数据并处理它并写入Hive表。

 val spark = SparkSession
   .builder
   .appName("Kafka Test")
   .config("spark.sql.streaming.metricsEnabled", true)
   .config("spark.streaming.backpressure.enabled", "true")
   .enableHiveSupport()
   .getOrCreate()

val events = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxxxxxx")
  .option("startingOffsets", "latest")
  .option("subscribe", "yyyyyy")
  .load


val data = events.select(.....some columns...)

data.writeStream
  .format("parquet")
  .option("compression", "snappy")
  .outputMode("append")
  .partitionBy("ds")
  .option("path", "maprfs:/xxxxxxx")
  .start()
  .awaitTermination()

这确实会创建拼花文件,但是我如何更改它以模拟类似的内容,以便它写入到可以使用(select*from)从配置单元或spark sql读取的表格式中

data.write.format("parquet").option("compression", "snappy").mode("append").partitionBy("ds").saveAsTable("xxxxxx")

共有1个答案

轩辕翰
2023-03-14

我建议使用Kafka Connect将数据写入HDFS。它是开源的,可独立使用或作为ConFluent Platform的一部分使用。

对于过滤和转换数据,您可以使用Kafka Streams或KSQL。KSQL在Kafka Streams之上运行,为您提供了一种非常简单的方法来连接数据、过滤数据和构建聚合。

下面是一个在KSQL中聚合数据流的示例

SELECT PAGE_ID,COUNT(*) FROM PAGE_CLICKS WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY PAGE_ID

请参阅本博客中的KSQL操作。您可能还对关于使用这些组件构建流数据管道的讨论感兴趣

 类似资料:
  • 问题内容: 我试图找出一种在Hive中从平面源中选择数据并将其输出到一个名为struct的数组中的方法。这是我正在寻找的示例… 样本数据: 所需的输出: 我尝试了collect_list和collect_set,但是它们仅允许原始数据类型。关于如何在Hive中进行此操作有任何想法吗? 问题答案: 我会使用这个jar,它是的更好的实现(并需要复杂的数据类型)。 查询 : 输出 :

  • 我试图从Spark Sql将数据插入到Hive外部表中。我通过以下命令创建了hive外部表 在spark工作中,我编写了以下代码Dataset df=session。read()。选项(“标题”、“真”)。csv(csvInput); 每次运行这段代码时,我都会遇到以下异常

  • 这是我运行strm.py文件的终端命令 $spark_home/bin/spark-submit--主本地--驱动程序-内存4G--num-executors 2--executor-memory 4G--包org.apache.spark:spark-sql-kafka-0-102.11:2.4.0 org.apache.spark:spark-cassandra-connector2.11:2

  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 我正在使用Spark SQL读取一个配置单元表,并将其分配给一个scala val 有什么方法可以绕过这个错误吗?我需要将记录插入到同一个表中。 嗨,我试着按建议做,但仍然得到同样的错误。