当前位置: 首页 > 知识库问答 >
问题:

相当于Scala集合分区的Spark RDD

韩宏朗
2023-03-14
val myStuff = List(Try(2/2), Try(2/0))
val (successes, failures) =  myStuff.partition(_.isSuccess)
val myStuff: RDD[Try[???]] = sourceRDD.map(someOperationThatMayFail)
val successes: RDD[???] = myStuff.collect { case Success(v) => v }
val failures: RDD[Throwable] = myStuff.collect { case Failure(ex) => ex }
val (successes: RDD[Try[???]], failures: RDD[Try[???]]) = myStuff.partition(_.isSuccess)

作为参考,我以前使用了下面这样的方法来解决这个问题。可能失败的操作是从二进制格式反序列化一些数据,这些失败已经变得足够有趣,需要将它们处理并保存为RDD而不是日志。

def someOperationThatMayFail(data: Array[Byte]): Option[MyDataType] = {
   try {
      Some(deserialize(data))
   } catch {
      case e: MyDesrializationError => {
         logger.error(e)
         None
      }
   }
}

共有1个答案

尉迟卓
2023-03-14

也许还有其他的解决方案,但现在你来了:

设置:

import scala.util._
val myStuff = List(Try(2/2), Try(2/0))
val myStuffInSpark = sc.parallelize(myStuff)

执行:

val myStuffInSparkPartitioned = myStuffInSpark.aggregate((List[Try[Int]](),List[Try[Int]]()))(
  (accum, curr)=>if(curr.isSuccess) (curr :: accum._1,accum._2) else (accum._1, curr :: accum._2), 
  (first, second)=> (first._1 ++ second._1,first._2 ++ second._2))
 类似资料:
  • 我目前正在使用firestore的子集合模式,它允许我将集合保存在文档中。 这意味着对于我的组织集合,我有组织文档,每个文档都有子集合,比如:项目、成员等等。。。通过这种方式,我可以始终保证,对于一个特定的文档,我以一种可扩展的方式将其所有特定的数据都限定在它的范围内。这解决了在文档属性中嵌入数据(比如数组)的问题,这些数据可能会失控。 现在,我在mongoDB中没有找到这样的东西。我确实发现Do

  • 问题内容: 我想知道Scala或其著名的数学库之一(例如Spire)是否具有与Go 函数相同的功能 (来自http://tour.golang.org/#4) 如果没有,那么获得相同功能的最“ Scala”方法是什么? 问题答案: 它是Java 库的一部分:

  • 在Java8中,Scala伟大的的等价物是什么? 我很想知道它是,但是reduce必须返回与它所还原的内容类型相同的内容。 例子: 上面代码中的问题是umulator: 因此,有人能给我指出与“foldLeft/修复我的代码”对应的正确方法吗?

  • Scala提供了一套很好的集合实现,提供了一些集合类型的抽象。 Scala 集合分为可变的和不可变的集合。 可变集合可以在适当的地方被更新或扩展。这意味着你可以修改,添加,移除一个集合的元素。 而不可变集合类,相比之下,永远不会改变。不过,你仍然可以模拟添加,移除或更新操作。但是这些操作将在每一种情况下都返回一个新的集合,同时使原来的集合不发生改变。 接下来我们将为大家介绍几种常用集合类型的应用:

  • Scala 集合 Scala Set(集合)是没有重复的对象集合,所有的元素都是唯一的。 Scala 集合分为可变的和不可变的集合。 默认情况下,Scala 使用的是不可变集合,如果你想使用可变集合,需要引用 scala.collection.mutable.Set 包。 默认引用 scala.collection.immutable.Set,不可变集合实例如下: val set = Set(1

  • Scala提供了一套很好的集合实现,提供了一些集合类型的抽象。 Scala 集合分为可变的和不可变的集合。 可变集合可以在适当的地方被更新或扩展。这意味着你可以修改,添加,移除一个集合的元素。 而不可变集合类,相比之下,永远不会改变。不过,你仍然可以模拟添加,移除或更新操作。但是这些操作将在每一种情况下都返回一个新的集合,同时使原来的集合不发生改变。 接下来我们将为大家介绍几种常用集合类型的应用:

  • 为了学习,我开发了一个Scala和JSF应用程序。在这个应用程序中,在JSF中呈现之前,我必须将所有Scala集合对象转换为Java cllectios。有没有什么简单的方法可以通过ScalaElResolver这样的东西来实现,如果有,任何人都有ScalaElResolver的示例代码。提前谢谢菲利普

  • 问题内容: 问题 与新HashSet(Collection)等效的Scala)相关,如何将Java集合(例如)转换为Scala集合? 我实际上是在尝试将Java API调用转换为Spring的 (返回a )成Scala不可变。因此,例如: 这似乎有效。欢迎批评! 问题答案: 您的最后一条建议有效,但您也可以避免使用: 请注意,默认情况下,由于提供了此功能。