我试图在火花笔记本的阿帕奇火花中做NLP。对于这个特定的例子,我正在使用库https://opennlp.apache.org创建一个块来提取名词短语。由于数据量的增加,我需要转向分布式计算。
问题是我无法广播我的chunker对象。通过阅读文档(只在board上投射数组等简单对象),我尝试了以下方法:
import opennlp.tools.tokenize.WhitespaceTokenizer
import opennlp.tools.cmdline.postag.POSModelLoader
import opennlp.tools.postag.POSTaggerME
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
import java.io.File
//Instantiate the ChunkerME class
val inputStream = new FileInputStream("fr-chunk.bin");
val chunkerModel = new ChunkerModel(inputStream);
val chunkerME = new ChunkerME(chunkerModel);
val broadCastedChunkerME = sc.broadcast(chunkerME)
但这会引发以下错误:
java.io.NotSerializableException: opennlp.tools.chunker.ChunkerME
Serialization stack:
- object not serializable (class: opennlp.tools.chunker.ChunkerME, value: opennlp.tools.chunker.ChunkerME@35a5c281)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:268)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:269)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1411)
... 63 elided
如果我将chunker的初始化封装在函数中,然后在map方法中调用函数,则可以这样做:
def getNPChunks(sentence: String): Array[Chunk] = {
import opennlp.tools.chunker.ChunkerModel
import opennlp.tools.chunker.ChunkerME
import java.io.FileInputStream
val inputStream = new FileInputStream("fr-chunk.bin");
val chunkerModel = new ChunkerModel(inputStream);
//Instantiate the ChunkerME class
val chunkerME = new ChunkerME(chunkerModel);
chunkerME.chunkAsSpans(sentence);
}
// call the chunker
line.map(getNPChunks)
但这里的问题是,这段代码效率非常低,因为它正在为rdd中的每个条目初始化chunker对象。因为map函数为rdd的每个条目调用getNPChunks函数,并且为每个条目创建一个新的chunker对象。
由于这种低效的设计,我的spark脚本的运行速度比顺序脚本慢20倍。
我做错了什么?
初始化对象内部scala"chunkerME",然后广播它。默认情况下,Scala对象是序列化的。编译器对scala对象进行序列化。
或者,如果在scala类中初始化,则需要通过扩展Serializable特性显式序列化scala类。
解决这个问题的方法是使用mapPartitions
这样,您可以为每个分区创建一个chunker,而不是为每行创建一个chunker:
def getChunker():
val inputStream = new FileInputStream("fr-chunk.bin");
val chunkerModel = new ChunkerModel(inputStream);
//Instantiate the ChunkerME class
val chunkerME = new ChunkerME(chunkerModel);
line.mapPartitions(it =>
val chunker = getChunker()
it.map(line => chunker.chunkAsSpans(line))
)
有关map分区
的更多详细信息,请参阅此答案:https://stackoverflow.com/a/39203798/245024
我试图从外部文本文件中读取dataframe模式,并使用它创建一个dataframe。然而,我无法理解如何将string转换为StructType。 我正在使用Spark 2.1和Java。这是代码片段。 如何将上述代码中的struct2转换为StructType? 结构文件包含这个
输出如下: 如果两个数组的维数不相同,则元素到元素的操作是不可能的。 然而,在 NumPy 中仍然可以对形状不相似的数组进行操作,因为它拥有广播功能。 较小的数组会广播到较大数组的大小,以便使它们的形状可兼容。 如果满足以下规则,可以进行广播: 如果输入在每个维度中的大小与输出大小匹配,或其值正好为 1,则在计算中可它。 如果上述规则产生有效结果,并且满足以下条件之一,那么数组被称为可广播的。 数
原文:Broadcasting 另见:numpy.broadcast 术语广播描述了NumPy在算术运算时如何处理不同形状的数组。 在某些条件下,较小的数组“广播”成较大的数组以便有相同的形状。 广播提供了一种矢量化操作数组的方法,这样可以在C而不是Python中进行循环。 它可以在不制作不必要的数据副本的情况下实现这一点,并且通常可以实现高效 然而,有些情况下广播是一个坏主意,因为它会导致内存使
4.2.1.3 内部广播接收器 内部广播接收器是广播接收器,它将永远不会收到从内部应用以外发送的任何广播。 它由几个内部应用组成,用于保护内部应用处理的信息或功能。 要点(接收广播): 定义内部签名权限来接收广播。 声明使用内部签名权限来接收结果。 将导出属性显式设置为true。 需要静态广播接收器定义的内部签名权限。 需要内部签名来注册动态广播接收器。 确认内部签名权限是由内部应用定义的。 尽管
本文向大家介绍在SAP UI5中加载外部库,包括了在SAP UI5中加载外部库的使用技巧和注意事项,需要的朋友参考一下 可以使用普通脚本标签中的文件插入外部库。SAP UI5还支持JQuery,因此可以通过扩展控制器的标题来完成。 您还可以使用以下命令添加任何外部文件- 您可以导航到以下路径以了解更多详细信息- https://blogs.sap.com/2016/04/22/include-ex