当前位置: 首页 > 知识库问答 >
问题:

按键Spark写入多个输出-一个Spark作业

孙永嘉
2023-03-14

如何在单个作业中使用Spark根据密钥写入多个输出。

相关:按键写入多个输出扩展Hadoop,一个MapRe员作业

例如。

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

将确保cat前缀/1

a
b

cat prefix/2将是

c

编辑:我最近添加了一个新的答案,包括完整的导入,皮条客和压缩编解码器,请参阅https://stackoverflow.com/a/46118044/1586965,这可能是有帮助的,除了早期的答案。

共有3个答案

松桐
2023-03-14

如果一个给定的密钥可能有很多值,我认为可伸缩的解决方案是每个分区为每个密钥写一个文件。不幸的是,Spark中没有内置的支持,但我们可以激发一些东西。

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}

(用您选择的分布式文件系统操作替换PrintWriter。)

这使得RDD单次通过,并且不执行洗牌。它为每个键提供一个目录,每个目录中包含许多文件。

须新
2023-03-14

我会这样做,这是可扩展的

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num))
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    spark.stop()
  }
}

刚刚看到上面类似的回答,但实际上我们不需要定制分区。MultipleTextOutputFormat将为每个键创建文件。相同键的多个记录落入同一分区是可以的。

新建Hash分区(num),其中num是所需的分区号。如果您有大量不同的密钥,您可以将数字设置为大。在这种情况下,每个分区不会打开太多的hdfs文件处理程序。

司空凌
2023-03-14

如果使用Spark 1.4,由于DataFrame API,这将变得非常容易。(数据帧是在Spark 1.3中引入的,但我们需要的partitionBy()是在1.4中引入的。)

如果您开始使用RDD,首先需要将其转换为DataFrame:

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

在Python中,同样的代码是:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

一旦有了一个数据帧,就可以根据一个特定的键写入多个输出。更重要的是——这就是DataFrame API的美妙之处——Python、Scala、Java和R的代码几乎相同:

people_df.write.partitionBy("number").text("people")

如果需要,您可以轻松使用其他输出格式:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

在这些示例中,Spark将为我们已在其中分区DataFrame的每个键创建一个子目录:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
 类似资料:
  • 所以我一个月前开始学习spark和cassandra。我遇到了这样一个问题,我必须使用spark预先聚合来自传感器的数据,然后将其放入cassandra表。 这是我的应用程序流程 问题是,我需要将数据按秒、分、时、日、月聚合到每年。这导致我在cassandra中创建了90多个聚合表。 就我的进展而言,我发现我必须使用每个聚合的一个写流查询将每个聚合下沉到每个cassandra表,这导致我创建了这个

  • 想象一下以下过程:Spark应用程序(Java实现)正在使用Cassandra数据库加载、转换为RDD并处理数据。该应用程序还从数据库中传输新数据,这些数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用与数据库集成的Spring Data Cassandra。 CassandraConfig: 数据处理器。主要方法: 预计初始加载会有大量数据。因此,数据会在rddBuffer中分页、

  • -第一种方法 所有的查询都可以存储在一个配置单元表中,我可以编写一个Spark驱动程序来一次读取所有查询,并使用java多线程并行运行所有查询(与HiveContext一起 优点:易于维护 缺点:可能会占用所有资源,并且对每个查询进行性能优化可能会很困难。 使用oozie spark操作单独运行每个查询 优点:可以在查询级别进行优化 缺点:难以维护。 我找不到任何关于第一种方法的文档,说明Spar

  • 问题内容: 请看下面的场景:一个Spark应用程序(Java实现)正在使用Cassandra数据库加载,转换为RDD并处理数据。该应用程序还从数据库中提取新数据,这些新数据也由自定义接收器处理。流处理的输出存储在数据库中。该实现使用了与数据库集成中的Spring Data Cassandra。 CassandraConfig: DataProcessor.main方法: 预计在初始加载时会有大量数

  • 上周,我很难从Spark获得数据,最后我不得不接受 出了这个答案。 我测试了更本土的 对于Spark 2.0,但根据下面的注释,它会删除一组csv文件,而不是需要连接的文件,无论这在上下文中意味着什么。它还将一个空文件放入名为“成功”的目录中。目录名是 /mycsv/,但csv本身有一个由一长串字符组成的难以理解的名称。 这是我第一次听说这样的事情。Excel有多个选项卡,这些选项卡必须以某种方式

  • 问题内容: 我正在尝试用Java创建一个简单的Pong游戏,但我不知道如何让两个玩家同时使用键盘。游戏尚不完整,我目前正在为两位玩家进行划桨动作。问题是,当一个玩家按下自己的向上键并向上移动其拨片时,但是如果其他玩家按下其任意键,则会取消先前的玩家动作并导致拨片停止。我想我需要一种同时处理多个键输入的方法。这是我的代码,底部的KeyListeners是我需要帮助的地方。我只有1年的Java程序员,