当前位置: 首页 > 知识库问答 >
问题:

如何一次将每组发送给spark执行者?

严知
2023-03-14

我无法一次将每组数据帧发送给执行器。

我在公司_模型_VAL_df数据框中有如下数据。

 ----------------------------------------------------------------------------------------
 | model_id  |  fiscal_year  | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
 ----------------------------------------------------------------------------------------
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   2             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   2             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 ----------------------------------------------------------------------------------------

我想将每个分组的数据发送给executor,以便一次处理每个数据。

为此,我做了如下工作:

var dist_company_model_vals_df =  company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()

// Want to send each group at a time to write by executors.

dist_company_model_vals_df.foreach(rowDf => {
  writeAsParquet(rowDf , parquet_file)    // this simply writes the data as parquet file
})

错误:

这将抛出一个NullPointerExcure,因为在Execator端找不到rowDf。使用Scala 2.11在Spark-sql中处理此问题的正确方法是什么?

第二部分:问题

当我做公司模型时。groupBy(“model_id”、“fiscal_quarty”、“fiscal_year”)即使在我增加内存后,数据仍大量溢出到磁盘上。也就是说,公司模型是巨大的数据帧。。。做群比时会发生很多溢出。

同样是下面的情况下,即与分区通过

company_model_vals_df.write.partition(model_id、fiscal_quarter、fiscal_year)

PSEDO代码:所以为了避免是第一个,我会做valgroups=company_model_vals_df的元组。groupBy(“型号id”、“会计季度”、“会计年度”)。收集

groups.forEach{ group ->
   // I want to prepare child dataframes for each group from    company_model_vals_df

   val child_df = company_model_vals_df.where(model_id= group.model_id && fiscal_quarter === group.fiscal_quarter && etc)

 this child_df , i want wrote to a file i.e. saveAs(path)
}

无论如何都有办法做到这一点。这里有对我有用的spark函数或API吗?请提出解决方法。

共有2个答案

姜俊友
2023-03-14

如果我正确理解了您的问题,您希望分别处理每个“型号id”、“财政季度”、“财政年度”的数据。

如果这是正确的,您可以使用groupBy(),例如:

company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")

如果要将每个逻辑组写入一个单独的文件夹,可以通过以下方式完成:

scala prettyprint-override">company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")

公羊招
2023-03-14

这里几乎没有选择-

  • 您需要将数据集分成多个数据集,并分别处理它们,就像
var dist_company_model_vals_list =  company_model_vals_df
  .select("model_id","fiscal_quarter","fiscal_year").distinct().collectAsList

然后过滤company_model_vals_df,输出dist_company_model_vals_listlist,它提供了几个可以独立工作的数据集,如

def rowList = {
import org.apache.spark.sql._
var dfList:Seq[DataFrame] = Seq()
for (data <- dist_company_model_vals_list.zipWithIndex) {
val i = data._2
val row = data.-1
val filterCol = col($"model_id").equalTo(row.get(i).getInt(0).and($"fiscal_quarter").equalTo(row.get(i).getInt(1).and($"fiscal_year").equalTo(row.get(i).getInt(2))

   val resultDf = company_model_vals_df.filter(filterCol)    
dfList +: = resultDf
      }
dfList
}
  • 如果您的目标是写入数据,则可以在DataFrameWriter上使用partitionBy(“model_id”、“fiscal_quarter”、“fiscal_year”)方法分别写入数据
 类似资料:
  • 我想每10秒执行一段代码。我在这个论坛上找到了一个例子,但我的实现有一些问题。 我得到这个错误,指向:

  • 问题内容: 我正在cronjob中运行PHP脚本,我想每5分钟发送一封电子邮件 我当前的(crontab)cronjob: cronmail.php如下: 但是我没有在30分钟内收到此配置的电子邮件。 问题答案: 在文件中,这些字段是: 每小时的分钟。 一天中的小时。 一个月中的某天。 一年中的月份。 一周中的天。 所以: 表示每小时10分钟执行一次。 如果您希望每五分钟使用一次,请使用以下任一方

  • 问题内容: 我想通过cron运行一项工作,该工作将在一天中的指定时间每隔第二个星期二执行一次。每个星期二都很容易: 但是,如何在“每隔第二个星期二”(或者,如果您愿意,每隔第二周)进行一次?我不想自己在脚本中实现任何逻辑,而是仅将定义保留在cron中。 问题答案: 怎么样呢,即使没有在前五个字段中完全定义它,它也会保留它:

  • 问题内容: 我有一个ajax调用到一个php文件。我正在收到结果。现在,我正在研究是否有可能使ajax请求每1秒自动执行一次。我将结果发布到名为的输入字段中。如何每三秒钟执行一次ajax调用而不必调用该函数? 问题答案: 您可能要考虑的是服务器发送事件(SSE) 这是一种HTML5技术,JavaScript可以通过该技术“ 长期轮询 ”服务器端点(您的PHP文件)以查看是否发生了任何更改。长轮询基

  • 问题内容: 我有一个javascript函数,我希望在JSF 2中进行每次异步回发后执行。 我已执行以下操作以确保执行此每整页回发: 我需要执行此操作的原因是为了解决第三方JSF组件库中的故障,因此我无法在服务器呈现阶段中进行任何修改来对此组件执行此操作。 我可能找不到有关此问题的信息,可能是因为我使用了不正确的术语。我曾经是ASP.NET开发人员,我将这些术语称为“整页回发”和“部分回发”,而其

  • null 有什么主意吗?我该怎么做?我知道我不能只使用Once Controller,因为每个请求也是由每个线程执行的。