当前位置: 首页 > 知识库问答 >
问题:

在使用Spark时,如何确保属于一个用户的所有数据都到同一个文件中?

赫连秦迟
2023-03-14

我们有一个用例来准备一个spark作业,该作业将从多个提供者读取数据,其中包含关于以任意顺序出现的用户的信息,并将它们写回S3中的文件。现在,条件是,用户的所有数据必须存在于一个文件中。大约有100万个唯一的用户,每个用户最多拥有大约10KB的数据。我们想到最多创建1000个文件,并让每个文件包含大约1000个用户的记录。

我们使用java dataframe API在spark 2.4.0上创建作业。我不知道做这件事最合乎逻辑的方法是什么?我应该在用户ID上执行group by操作,然后以某种方式收集行,除非我达到1000个用户,然后翻转(如果可能的话),还是有更好的方法。任何正确方向的帮助或提示都非常感谢。

Properties props = PropLoader.getProps("PrepareData.properties");
SparkSession spark = SparkSession.builder().appName("prepareData").master("local[*]")
    .config("fs.s3n.awsAccessKeyId", props.getProperty(Constants.S3_KEY_ID_KEY))
    .config("fs.s3n.awsSecretAccessKey", props.getProperty(Constants.S3_SECERET_ACCESS_KEY)).getOrCreate();

Dataset<Row> dataSet = spark.read().option("header", true).csv(pathToRead);
dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);

spark.close();
dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);

共有1个答案

陆卓
2023-03-14

可以使用重新分区然后合并函数。

 Df.repartion(user_id).coalese(1000)

 Df.repartion(user_id,1000)

第一种解决方案保证不会有任何空分区,而在第二种解决方案中,有些分区可能是空的。

参考:Spark SQL-df.repartition和DataFrameWriter Partitionby之间的差异?

dataSet.repartition(dataSet.col("idvalue")).coalesce(1000).write().parquet(pathToWrite);
dataSet.repartition(1000, dataSet.col("idvalue")).write().partitionBy("idvalue").parquet(pathToWrite);
dataSet.repartition(1000, dataSet.col("idvalue")).write().parquet(pathToWrite);

它将创建1000个文件,但是在完成文件编写后,您必须通过读取每个文件来创建ids和文件之间的映射。

 类似资料:
  • 本文向大家介绍数据字典属于哪一个用户的?相关面试题,主要包含被问及数据字典属于哪一个用户的?时的应答技巧和注意事项,需要的朋友参考一下 答案: 数据字典是属于’SYS’用户的,用户‘SYS’ 和 ’SYSEM’是由系统默认自动创建的

  • 我有两个数据帧,它们的列名相同,但行数不同。第一个数据帧(a)看起来与此类似: 注:站点5、6、8和12故意丢失。 第二个数据帧(b)看起来像这样: 我想要实现的是: 在那里我注入(我肯定有一个更好的术语)数据帧b到数据帧a的数据,但是我想用零替换b中的任何NAs,并保持a中的NAs不变。 我发现并尝试了这个代码: 但它会带来NAs。我考虑先将NAs替换为零,但即使如此,它也会抹去我目前在数据帧a

  • 问题内容: 可以将 Spark RDD 通过管道传输到Python吗? 因为我需要一个python库来对数据进行一些计算,但是我的主要Spark项目基于Scala。有没有办法将两者混合使用或让python访问相同的spark上下文? 问题答案: 实际上,您可以使用Scala和Spark以及常规Python脚本来传递到python脚本。 test.py 火花壳(scala) 输出量 你好约翰 你好林

  • 我们的Java应用程序在Logback上使用SLF4J来记录错误消息。在我们的回复中。xml,我们为错误日志定义了一个appender,以及一个指定包层次结构顶层的记录器。 我们正在将记录数据的功能添加到不同的日志文件中。我创建了一个类来处理这个日志记录,我向logback.xml添加了一个新的appender和一个新的logger。新的logger指定了我创建的新类的完全限定包名称(以及addt

  • 我使用SSH SFTP采样器在jmeter中进行SFTP测试。我可以将文件从Ftp位置获取/Put到本地位置,反之亦然。但我无法将文件从同一FTP位置的一个目录移动到另一个目录。 请建议。