当前位置: 首页 > 知识库问答 >
问题:

使用spark streaming将每个Kafka消息保存在hdfs中

范浩宕
2023-03-14

我正在使用火花流做分析。分析后,我必须将kafka消息保存在hdfs中。每个kafka消息都是一个xml文件。我不能使用rdd.saveAsTextFile,因为它会保存整个rdd。rdd的每个元素都是kafka消息(xml文件)。如何使用火花在hdfs中保存每个rdd元素(文件)。

共有1个答案

能业
2023-03-14

我会用另一种方式来解决这个问题。将转换后的数据流式传输回Kafka,然后使用用于Kafka Connect的HDFS连接器将数据流式传输到HDFS。Kafka连接是阿帕奇Kafka的一部分。HDFS连接器是开源的,可以单独使用,也可以作为Confluent平台的一部分使用。

这样做可以将处理与将数据写入HDFS解耦,从而更容易管理、故障排除和扩展。

 类似资料:
  • 我有以下Flume代理配置来读取来自kafka源的消息并将它们写回HDFS接收器 如果每个轮询周期只有一条kafka消息到达,则kafka消息内容是avro数据,并且正确地序列化为文件。 当两个kafka消息到达同一批次时,它们被分组在同一个HDFS文件上,因为avro消息包含两个模式数据,结果文件包含模式数据模式数据,导致它是无效的. avro文件。 如何拆分avro事件以将不同的kafka消息

  • 保存/记录在AWS SNS主题上发布的每条消息的最简单方法是什么?我想可能有一个神奇的设置可以自动将它们推送到S3或数据库,或者可能是一个自动支持HTTP目标的数据库服务,但似乎并非如此。也许需要通过Lambda函数来完成? 目的只是为了在设置一些SNS发布时进行基本的诊断和调试。我并不真正关心大规模或快速查询,只想一次记录和执行几分钟对所有活动的基本查询。

  • 我正在为Kafka和SparkStreaming编写一些代码,当我将它们放在Yarn-Cluster上时,它报告了。 但它在我的电脑上运行良好(独立模式) 那它有什么问题呢? //这是代码 这里例外----------------------------------- 19/07/26 18:21:56警告Scheduler.TaskSetManager:在stage 0.0中丢失任务0.0(TI

  • 我是Kafka的新手,一直在尝试实现一个消费者。下面是我的场景 启动消费者应用程序 产生来自生产者的消息。这些消息被消费者消费 停止消费者并再次生成消息。当我启动消费者时,在消费者被停止时发布的消息不会被读取 虽然会消耗消息,但它会消耗发布到主题的所有消息。我想只消耗那些在消费者关闭时发布的消息。

  • 我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的 你知道这个问题吗?

  • 我遇到了两个关于订购的短语, 生产者发送到特定主题分区的消息将按发送顺序追加。也就是说,如果记录M1与记录M2由同一生产者发送,并且M1首先发送,则M1的偏移量将低于M2并出现在日志中的较早位置。 另一个 问题是,如果存在如#2所述的失败发送,那么该顺序是否仍会保留到特定分区?如果一条消息存在潜在问题,将删除每个分区的所有以下消息“以保留顺序”,或者将发送“正确”的消息,并将失败的消息通知应用程序