当前位置: 首页 > 知识库问答 >
问题:

Spark Dataset Foreach函数不会迭代

陈季
2023-03-14

上下文

我想迭代一个Spark数据集,并为每一行更新一个HashMap。

以下是我的代码:

// At this point, I have a my_dataset variable containing 300 000 rows and 10 columns
// - my_dataset.count() == 300 000
// - my_dataset.columns().length == 10

// Declare my HashMap
HashMap<String, Vector<String>> my_map = new HashMap<String, Vector<String>>();

// Initialize the map
for(String col : my_dataset.columns())
{
    my_map.put(col, new Vector<String>());
}

// Iterate over the dataset and update the map
my_dataset.foreach( (ForeachFunction<Row>) row -> {
    for(String col : my_map.KeySet())
    {
        my_map.get(col).add(row.get(row.fieldIndex(col)).toString());
    }
});

问题

我的问题是Foreach根本不迭代,lambda从来没有执行过,我不知道为什么。
我在这里实现了它:如何在SparkJava中遍历/迭代数据集?

最后,尽管数据集不是空的,但所有内部向量都保持为空(初始化时)(查看给定代码示例中的第一个注释)。

我知道foreach从不迭代,因为我做了两个测试:

  • 添加一个原子整数来计数迭代,在lambda的开头用增量AndGet()方法递增。

我不习惯Java(更不用说Java lambdas了),所以我可能错过了一个重要的观点,但我找不到什么。

共有1个答案

燕靖
2023-03-14

我可能有点老派,但我从不太喜欢lambdas,因为它可能会变得非常复杂。

下面是一个完整的foreach()示例:

package net.jgp.labs.spark.l240_foreach.l000;

import java.io.Serializable;

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ForEachBookApp implements Serializable {
  private static final long serialVersionUID = -4250231621481140775L;

  private final class BookPrinter implements ForeachFunction<Row> {
    private static final long serialVersionUID = -3680381094052442862L;

    @Override
    public void call(Row r) throws Exception {
      System.out.println(r.getString(2) + " can be bought at " + r.getString(
          4));
    }
  }

  public static void main(String[] args) {
    ForEachBookApp app = new ForEachBookApp();
    app.start();
  }

  private void start() {
    SparkSession spark = SparkSession.builder().appName("For Each Book").master(
        "local").getOrCreate();

    String filename = "data/books.csv";
    Dataset<Row> df = spark.read().format("csv").option("inferSchema", "true")
        .option("header", "true")
        .load(filename);
    df.show();

    df.foreach(new BookPrinter());
  }
}

如您所见,这个例子读取一个CSV文件并从数据中打印一条消息。这相当简单。

Foreach()实例化一个新的类,在那里完成工作。

df.foreach(new BookPrinter());

这项工作是在类的call()方法中完成的:

  private final class BookPrinter implements ForeachFunction<Row> {

    @Override
    public void call(Row r) throws Exception {
...
    }
  }

由于您是Java新手,请确保您拥有正确的签名(对于类和方法)和正确的导入。

您还可以从中克隆示例https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l240_foreach/l000.这将帮助您使用foreach()

 类似资料:
  • 合并和拆分迭代器 # itertools_chain.py from itertools import * for i in chain([1, 2, 3], ['a', 'b', 'c']): print(i, end=' ') print() # itertools_chain_from_iterable.py from itertools import * def make_

  • 所以几天前,我安装了vscode来处理Unity游戏引擎脚本,然而,即使我安装了C#扩展,它也不会显示每个函数的参数,例如,如果我编写一个函数,vscode没有显示我应该把什么样的参数放在括号里。我怎样才能让它这样做? 比如说,在我编写的代码中,它不会显示的参数: 此外,每当我从Unity项目加载脚本时,vscode都会显示此错误: 错误图像

  • 问题内容: 我在body 函数上使用此脚本: 每次我移动鼠标并画一条新线时,都应该清除画布,但是它不能正常工作。我正在尝试不使用jQuery,鼠标侦听器或类似工具来解决它。 问题答案: 您应该使用“ beginPath() ”。这就对了。

  • 问题内容: 您好,我似乎无法使排除_id工作,这是代码 这是控制台日志上的输出 我哪里做错了? 问题答案: 我认为指定投影的正确方法是使用“字段”或“投影”属性,具体取决于驱动程序的版本。 在此处阅读文档。

  • 我正在使用PyThon和熊猫。 在网球比赛中,我想通过以下方式过滤我的数据: 检查获胜者是某个玩家 检查失败者是否在指定的集合内 我试着用下面的方法来做 其中df是存储我的数据的数据帧对象。 我得到以下错误: 我理解错误。但是我还没有想出办法来解决它。 我如何进行过滤,使df.loser根据许多值进行检查,而不是像df.winner==player这样的值?