我有一个groupedrdd
,其类型为key=string
和value=iterable
值实际上以字符串
格式保存JSON
数据,而分组键的格式为
示例:如果我的分组rdd中有以下键
tenant1/2016/12/output_data.json
tenant1/2017/01/output_data.json
tenant1/2017/02/output_data.json
那么在我的HDFS中我应该有三个文件
tenant1/2016/12/output_data.json
tenant1/2017/01/output_data.json
tenant1/2017/02/output_data.json
为此,我尝试了以下方法:
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}
groupedRDD.partitionBy(new HashPartitioner(1))
.saveAsHadoopFile("/user/pkhode/output/", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
/user/pkhode/output/tenant1/2016/12/output_data.json
/user/pkhode/output/tenant1/2017/01/output_data.json
/user/pkhode/output/tenant1/2017/02/output_data.json
List({json_object_in_string1}, {json_object_in_string2}, .....)
{json_object_in_string1}
{json_object_in_string2}
.....
groupedRDD.partitionBy(new HashPartitioner(1000)).mapValues(_.mkString("\n")).saveAsHadoopFile(outputPath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:462)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:363)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:191)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:184)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:135)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
当Spark将RDD保存为文本文件时,它只对RDD元素调用tostring
。首先尝试将值映射到字符串
:
rdd.mapValues(_.mkString("\n"))
我有一个带有20个分区的火花数据帧df,每个分区都有一天的数据。这意味着我的输入数据帧已经按天分区。我的目标是编写一个拼花文件,该文件也按天分区。如果我尝试以下命令: df。重新划分(5)。写模式(“覆盖”)。partitionBy(['day'])。拼花地板(“路径”) 当我的输入数据框已经分区时,有很多洗牌正在发生。请注意,这个数据框包含超过10亿行,并且由于洗牌,它正在杀死我的执行器。 有没
我想将一个数据帧保存到两个不同的csv文件中(拆分数据帧)-一个文件只包含标题,另一个文件包含其余行。 我想将这两个文件保存在同一个目录下,这样Spark处理所有逻辑将是最好的选择,如果可能的话,而不是使用pandas分割csv文件。 最有效的方法是什么? 谢谢你的帮助!
我一直在努力想办法创建一个HashMap,将具有相同键的值(放入列表)分组。这就是我的意思: 假设我有以下键和值: 我想把这些值放到一个 这样它就会将值分组到具有相同键的列表整数中,类似于这样: (1, {10, 11, 12}),(2, {20}), (3, {30,31}) 现在,密钥和值存储在 我不知道如何通过这个Hashmap循环创建新的Hashmap,关键是:值对列表。有人对这个话题有好
我正在为工作中的一个项目尝试闪烁。我已经到了通过应用计数窗口等来处理流的地步。然而,我注意到一个特殊的行为,我无法解释。 看起来一个流是由两个线程处理的,输出也是分成两部分的。 首先,我注意到使用将流打印到标准控制台时的行为。 然后,我打印到一个文件,它实际上正在输出文件夹中的两个名为1和2的文件中打印。 有人能解释一下为什么Flink会有这种行为吗?如何配置它?为什么有必要对结果流进行拆分? 并
问题内容: 我有一本字典: 我想以这种方式将数据写入dict.csv文件: 我写: 但是现在我将所有键都放在一行中,而所有值都在下一行中。 当我设法写一个这样的文件时,我也想将其读回到新的字典中。 只是为了解释我的代码,该词典包含来自textctrls和复选框的值和布尔值(使用wxpython)。我要添加“保存设置”和“加载设置”按钮。保存设置应以上述方式将字典写入文件(以使用户更容易直接编辑cs
问题内容: 经过一些帮助MSChart-强制从Origin绘制折线图]后,我设法将以下MSSQL查询放在一起,以用于折线图。 但是,我有一个问题。该查询返回类似以下内容的内容: 这意味着我每天都在阅读,而实际上每个不同的值仅需要一行。在每种情况下,我都希望它第一次出现,因此对于上面的示例,我希望查询返回: 我看了看似是基于类似前提的几个问题,但是所有答复都是针对特定情况而量身定制的(有些甚至使用L