当前位置: 首页 > 知识库问答 >
问题:

使用mapPartitionsToPair/PairFlatMapFunction时如何返回迭代器

施利
2023-03-14

在spark中使用mapPartitionsToPair/PairFlatMapFunction时,我在Internet上找到了一个类似的例子

spark.read ().textFile (hdfsPath).javaRDD ()
.mapPartitionsToPair (new PairFlatMapFunction <Iterator <String>, String, String> () {
  public Iterable <Tuple2 <String, String> > call (Iterator <String> input) {
    List <String> result = new ArrayList <String> ();
    while (input.hasNext ()) result.add (doSomeThing (input.next ()));
    return result;
  }
});

但当康普利

return type Iterable<Tuple2<String,String>> is not compatible with Iterator<Tuple2<String,String>>

我找到了call的声明

java.util.Iterator<scala.Tuple2<K,V>> call(T t) 

所以调用应该是返回一个迭代器。

因此,有人能帮助我如何返回在javaRDD api火花迭代器?谢谢

PS:我试过下面这样的代码,但在集群上不起作用:

public Iterator <Tuple2 <String, Strng> > call (Iterator <String> input) {
  List <String> result = new ArrayList <String> ();
  while (input.hasNext ()) result.add (doSomeThing (input.next ()));
  return result.iterator;
}

共有1个答案

柯河
2023-03-14

看起来您的开发环境和集群之间的spark版本不匹配。

从Spark-2.0.0,JavaRDD的平面图和映射分区函数返回Java迭代器,不可迭代。

因此,如果你的集群小于Spark-2.0.0,那么在开发时也使用相同的Spark版本。

对于Spark-2.0.0或更高版本,

public Iterator <Tuple2 <String, Strng> > call (Iterator <String> input) {
List <String> result = new ArrayList <String> ();
while (input.hasNext ()) result.add (doSomeThing (input.next ()));
return result.iterator;
}

应该行得通。

对于小于2.0.0的spark版本,

public Iterable <Tuple2 <String, String> > call (Iterator <String> input) {
List <String> result = new ArrayList <String> ();
while (input.hasNext ()) result.add (doSomeThing (input.next ()));
return result;
}

应该行得通。

 类似资料:
  • 我有一个返回: 然后另一个用户这样使用它: 如何处理任何迭代中的失败情况? 我知道我可以使用,在这种情况下,错误结果将被忽略: 的迭代器根据成功状态具有0或1项,如果为0,将过滤掉它。 但是,我不想忽略错误,而是想让整个代码块停止并返回一个新错误(基于映射中出现的错误,或者只是转发现有错误)。 在Rust中如何最好地处理此问题?

  • 问题内容: 介绍 我正在建立一个合并一些大的csv文件的过程。我目前正在研究使用Univocity进行此操作。我设置合并的方法是使用实​​现可比接口的bean。 给定 简化的文件如下所示: Bean看起来像这样(省略了getter和setter的方法): 比较器如下所示: 由于我不想读取内存中的所有数据,因此我想读取每个文件的最高记录并执行一些比较逻辑。这是我的简化示例: 题 给出上面的示例,我该

  • 将这些视为对象: 查看java文档,对于LinkedList类,LinkedList类中没有迭代器方法的实现,但是,实现是在AbstractSequentialList类中。 listIterator()方法在AbstractList类中实现,AbstractSequentialList的父类,总结一下,如果我没弄错的话,它返回一个不使用节点概念的迭代器对象。 但是方法是在LinkedList类中

  • 本文向大家介绍Python使用迭代器捕获Generator返回值的方法,包括了Python使用迭代器捕获Generator返回值的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python使用迭代器捕获Generator返回值的方法。分享给大家供大家参考,具体如下: 用for循环调用generator时,发现拿不到generator的return语句的返回值。如果想要拿到返回值,必须

  • 我可以看到返回。但是现在已经添加到C++20标准中,为什么返回?cppreference指定: 返回值 等于last的迭代器。 这个选择背后的理性是什么? 与相比,用例的优势是什么?

  • 从这个问题跟进。如何修改此代码以允许多行倒卷?