当前位置: 首页 > 知识库问答 >
问题:

读Kafka的书,写拼花地板中的hdfs

轩辕成天
2023-03-14

我是大数据生态系统的新手,有点起步。

我读过几篇关于使用spark流媒体阅读Kafka主题的文章,但我想知道是否可以使用spark作业而不是流媒体阅读Kafka主题?如果是的话,你们能帮我指出一些可以让我开始学习的文章或代码片段吗。

问题的第二部分是以拼花格式向hdfs写信。一旦我读了Kafka的书,我想我会有一个rdd。将此rdd转换为数据帧,然后将数据帧写入拼花文件。这是正确的方法吗。

感谢您的帮助。

谢谢

共有3个答案

孟成文
2023-03-14

使用Kafka流。SparkStreaming用词不当(它是引擎盖下的小批量,至少高达2.2)。

https://eng.verizondigitalmedia.com/2017/04/28/Kafka-to-Hdfs-ParquetSerializer/

司空凌
2023-03-14

关于这个话题,你已经有了几个很好的答案。

只是想释放压力-小心直接流到拼花地板桌上。当拼花地板行组大小足够大(为简单起见,您可以说文件大小应该在64-256Mb左右),以利用字典压缩、bloom过滤器等(一个拼花文件中可以有多个行块,并且通常每个文件中都有多个行块;虽然行块不能跨越多个拼花文件),拼花的性能就会大放异彩

如果您直接流式处理到一个拼花地板表,那么很可能会得到一堆很小的拼花地板文件(取决于Spark流式处理的小批量大小和数据量)。查询此类文件可能非常慢。例如,拼花可能需要读取所有文件的头来协调模式,这是一个很大的开销。如果是这种情况,您将需要一个单独的进程,例如,作为一种解决方法,该进程将读取旧文件,并将其写入“合并”(这不是一个简单的文件级合并,一个进程实际上需要读取所有拼花数据并溢出更大的拼花文件)。

这种变通方法可能会扼杀数据“流”的最初目的。你也可以看看其他技术,比如Apache Kudu、Apache Kafka、Apache Druid、Kinesis等,它们可以在这里更好地工作。

更新:自从我发布了这个答案,现在这里有了一个新的强大玩家——三角洲湖。https://delta.io/如果你习惯拼花,你会发现三角洲非常有吸引力(实际上,三角洲建立在拼花层元数据之上)。三角洲湖提供:

Spark上的ACID事务:

  • 可序列化的隔离级别确保读者永远不会看到不一致的数据
  • 可扩展元数据处理:利用Spark的分布式处理能力,轻松处理具有数十亿文件的PB级表的所有元数据
  • 流和批处理统一:Delta Lake中的一个表既是一个批处理表,也是一个流源和流汇。流式数据摄取、批量历史回填、交互式查询都是现成的
  • 模式强制:自动处理模式变化,以防止在摄取过程中插入错误记录
  • 时间旅行:数据版本控制支持回滚、完整的历史审核跟踪和可复制的机器学习实验
  • 升级和删除:支持合并、更新和删除操作,以支持复杂的用例,如更改数据捕获、缓慢更改维度(SCD)操作、流式升级等
璩慎之
2023-03-14

要以拼花格式从Kafka读取数据并将其写入HDFS,请使用Spark Batch job而不是流式处理,您可以使用Spark结构化流式处理。

结构化流是一种基于Spark SQL引擎构建的可扩展、容错的流处理引擎。您可以使用与对静态数据进行批处理计算相同的方式来表示流计算。Spark SQL引擎将负责以增量方式连续运行它,并在流式数据不断到达时更新最终结果。您可以使用Scala、Java、Python或R中的Dataset/DataFrame API来表示流聚合、事件时间窗口、流到批连接等。计算是在同一个优化的Spark SQL引擎上执行的。最后,该系统通过检查点和预写日志确保端到端的精确一次容错。简而言之,结构化流提供了快速、可扩展、容错、端到端的一次流处理,用户无需对流进行推理。

它作为Kafka的内置源提供,即我们可以从Kafka轮询数据。它与Kafka broker版本0.10.0或更高版本兼容。

为了在批处理模式下从Kafka提取数据,您可以为定义的偏移范围创建数据集/数据帧。

// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

源中的每一行都有以下架构:

| Column           | Type          |
|:-----------------|--------------:|
| key              |        binary |
| value            |        binary |
| topic            |        string |
| partition        |           int |
| offset           |          long |
| timestamp        |          long |
| timestampType    |           int |

现在,要以拼花格式将数据写入HDFS,可以编写以下代码

df.write.parquet("hdfs://data.parquet")

有关Spark结构化流媒体Kafka的更多信息,请参阅以下指南-Kafka集成指南

我希望这有帮助!

 类似资料:
  • 我正在从Impala迁移到SparkSQL,使用以下代码读取一个表: 我如何调用上面的SparkSQL,这样它就可以返回这样的东西:

  • 我能够以拼花格式写入,并通过如下列进行分区: 但我无法用Glue的DynamicFrame做到这一点。 我试图通过作为的一部分,因为AWS文档说拼花胶不支持任何格式选项,但这不起作用。 这有可能吗?怎么可能?至于这样做的原因,我认为工作书签是必要的,因为这对我目前不起作用。

  • 我有以Avro格式存储的Kafka主题。我想使用整个主题(在收到时不会更改任何消息)并将其转换为Parket,直接保存在S3上。 我目前正在这样做,但它要求我每次消费一条来自Kafka的消息,并在本地机器上处理,将其转换为拼花文件,一旦整个主题被消费,拼花文件完全写入,关闭写入过程,然后启动S3多部分文件上传。或《Kafka》中的阿夫罗- 我想做的是《Kafka》中的阿夫罗- 注意事项之一是Kaf

  • 由于,我检查了一个spark作业的输出拼花文件,该作业总是会发出声音。我在Cloudera 5.13.1上使用了 我注意到拼花地板排的大小是不均匀的。第一排和最后一排的人很多。剩下的真的很小。。。 拼花地板工具的缩短输出,: 这是已知的臭虫吗?如何在Spark中设置拼花地板块大小(行组大小)? 编辑: Spark应用程序的作用是:它读取一个大的AVRO文件,然后通过两个分区键(使用

  • 我是Spark的初学者,试图理解Spark数据帧的机制。当从csv和parquet加载数据时,我比较了spark sql dataframe上sql查询的性能。我的理解是,一旦数据加载到spark数据框中,数据的来源(csv或parquet)应该无关紧要。然而,我看到了两者之间的显著性能差异。我使用以下命令加载数据,并对其编写查询。 请解释差异的原因。

  • 如果我写信 临时工。拼花文件夹我得到了和行号相同的文件号 我想我不太了解拼花地板,但它是自然的吗?