这不是一个真正的Spark问题,而是一个与Scala类型相关的问题,但我正在做的事情可能会引起Spark粉丝的兴趣,所以我在我的问题框架中保留了“Spark ”,即:
我想以递归方式转换结构类型的 spark sql 架构,其中包含一个列表,其元素可以是结构类型或结构字段。转换的结果应该是原始架构的一个版本,该版本不允许在任何字段中使用 null。不幸的是,结构类型和结构字段不是从共同的标记特征延伸而来的。这导致了我的初始实现,其中方法接受“Any”并将结果显式转换回结构类型。
初始实施
object SchemaTools extends App {
import org.apache.spark.sql.types._
def noNullSchema(schema: StructType): StructType = {
def go(element: Any): Product = element match {
case x: StructField => x.copy(nullable = false)
case x: StructType => StructType(x.fields.map(_.copy(nullable = false)))
case bad => sys.error(s"element of unexpected type: $bad")
}
go(schema).asInstanceOf[StructType]
}
type Rec = (String, Seq[(Int, Int, String)])
val schema: StructType = Encoders.product[Rec].schema
System.out.println("pr:" + schema.prettyJson)
System.out.println("pr:" + noNullSchema(schema).prettyJson)
}
更新
我接受Tim的回答,因为他友好地指出了我的愚蠢错误,即我没有递归到嵌套结构中。我已经在下面包含了一个去无效器的上述“概念证明”的修改版本。这适用于我的示例输入,并说明了我将采用的一般方法。有了这个实现,我就没有与类型相关的问题了。我的错!:我误解了StructType内部的内容(它始终是StructField的数组,而不是StructField或StructType的数组)。数组中的字段本身可能是数据类型“StructType ”,这就需要递归。无论如何...下面是一个修改过的“玩具”实现,它说明了如果我需要一个完整的解决方案(而不是为了学习而实现),我将如何处理这个问题。这段代码肯定还没有准备好投入生产,在更复杂的输入中会失败。不过,它展示了一种可能的方法。
注意:我学到的关于空值和模式的另一件事非常重要,要记住....即使正确实现了模式“去空器”, Spark也不会在解析过程中强制执行可空性检查。这里将详细讨论这一点:Spark sql模式中的可空性在默认情况下是建议性的。严格执行的最佳方法是什么?
*概念证明...不再有类型问题*
object SchemaTools extends App {
import org.apache.spark.sql.types._
def noNullSchema(field: StructField): StructField = {
field.dataType match {
case ArrayType(StructType(fields), containsNull) =>
StructField(
field.name,
ArrayType(noNullSchema(StructType(fields)), containsNull),
nullable = false,
field.metadata)
case _ => field.copy(nullable = false)
}
}
def noNullSchema(schema: StructType): StructType =
StructType (
schema.fields.map { f =>
System.out.println("f:" + f);
noNullSchema(f)
}
)
type Rec = (String, Seq[(Int, String, String)])
val schema: StructType = Encoders.product[Rec].schema
System.out.println("pr:" + schema.prettyJson)
System.out.println("pr:" + noNullSchema(schema).prettyJson)
}
除非我误解了这个问题,否则我认为你只需要调整你原来的解决方案来改变。
go(schema).asInstanceOf[StructType]
到…里面
StructType(schema.fields.map(go))
此外,参数和 go
结果的类型应与 StructType.字段
的元素类型相同。
由于框架对字段
的元素使用泛型类型,因此必须有一些代码来处理该泛型类型。因此,如果该类型为“任何
”,则必须处理“任何
”,而类型类将无济于事。
这个整体模型代表了一个监控信息流的计算机系统,如果它检测到一个问题,它会创建一个任务供某人调查。但是,如果有进一步的信息可用,则任务应该已经终止。 在BPMN和Camunda中,最好的建模方法是什么? 我可以成功地启动/添加一个进程,以便使用curl发布表示启动消息的消息。这将添加一个进程,任务被分配给一个用户。 然而,我似乎无法让接收任务与流程相关联,它只是似乎添加了一个新流程。接收任务应该表示
我正在开发一个应用程序。我正在使用Activemq。有没有什么方法可以做到一个生产者总是向一个经纪人发送消息,但在另一边有3consumers.Each消费者监听经纪人,可以从queue.Is获取任何消息? 我正在使用 activemq 将我的应用程序日志写入数据库。如您所知,将日志写入数据库是一个耗时的过程。这就是为什么消费者比生产者越来越慢。例如。我发送100.000条消息(大对象)。制作者在
问题内容: 因此,使用此链接作为参考,任何人都可以提出更优雅的解决方案来取消定期的ScheduledExecutorService任务吗? 这是我目前正在做的事的一个例子: 问题答案: 我建议您使用int并自己安排任务。
我们已经在 无阻塞调用 一节中看到了取消任务的示例。 在这节,我们将回顾一下,在一些更加详细的情况下取消的语义。 一旦任务被 fork,可以使用 yield cancel(task) 来中止任务执行。取消正在运行的任务,将抛出 SagaCancellationException 错误。 来看看它是如何工作的,让我们先考虑一个简单的例子:一个可通过某些 UI 命令启动或停止的后台同步任务。 在接收到
问题内容: 我已经在vba中与selenium相关联地编写了一个脚本,以开始在某些洪流站点中进行搜索。我的脚本运行良好,但问题是我必须在脚本中使用才能使其成功。我现在想做的是使用一些循环或任何类似的方式通过从脚本中排除硬编码延迟来检查所需元素的可用性。任何帮助,将不胜感激。 到目前为止,这是我的尝试(正在工作): 参考添加: 问题答案: 似乎已经找到了解决方案。有一个指向 github 的链接,其
我不清楚如何在RXJava中实现任务取消。 我对移植使用Guava的ListenableFuture构建的现有API很感兴趣。我的用例如下: 我有一个单独的操作,它由一系列由未来连接的未来组成。transform() RxJava wiki中关于这一点的信息很少;我能找到的唯一取消参考提到等效于. NET的,但据我所知,订阅仅提供取消订阅序列中后续值的能力。 我不清楚如何通过这个API实现“任何订