I recently needed to do some work on user recommendation. From my research, the algorithms currently used for search and recommendation are dominated by DIN and similar models from teams such as Alimama and Meituan, so a neural network is the weapon to reach for. However, our company's big-data infrastructure still has some way to go, and building models directly on high-dimensional item and user features would pose real engineering difficulties. I therefore considered a recurrent-network approach: treat each page a user clicks or browses as a word, and use the sequence to estimate the user's next point of interest among products and campaigns. An LSTM is also relatively simple to implement; in Python, both TensorFlow and Keras ship ready-made layer-level wrappers. Since my team develops mainly in Java, I set the complete Python stack aside and tried whether Spark + Scala could get an LSTM running.
The best integration with Spark and Scala/Java today is probably Skymind's open-source Deeplearning4j (https://deeplearning4j.org/cn/spark ), whose ScalNet can fairly be called a Keras for Scala engineers. Deeplearning4j is an excellent open-source library, well suited to Java engineers building neural-network models, and its integration with Spark and Scala is likewise first-rate. While writing the code I relied heavily on the series of articles at https://blog.csdn.net/wangongxi/article/details/60775940.
To get an LSTM running in Scala, I also drew on the Java LSTM text-classification code from wangongxi's blog: the text is tokenized and one-hot encoded, enters the network's input layer where an embedding maps each word to a low-dimensional vector, and is then trained through the recurrent layers. For the user-recommendation work to follow, I only need to replace the word tokens with a user's historical browses and clicks, and replace the text-classification label with the user's click/browse score on product and campaign pages over some future window, to obtain an LSTM-based product-recommendation system (a minimal sketch of that mapping follows below). With data security in mind, this post therefore sticks to text sentiment classification to demonstrate the Deeplearning4j stack. Java readers can go straight to wangongxi's blog; I use exactly the same corpus, the texts from http://spaces.ac.cn/archives/3414/. My thanks to both authors for their work.
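To make that mapping concrete, here is a minimal sketch, assuming a hypothetical PageEvent log (the case class, its fields, and the labeling rule are illustrative, not taken from any real system), of how a user's click/browse history becomes the same kind of "sentence" the sentiment pipeline below consumes:
// hypothetical event-log record; in the sentiment task a word plays this role
case class PageEvent(userId: String, pageId: String, timestamp: Long)
// one user's history, time-ordered, becomes a space-separated "sentence" of page tokens,
// exactly analogous to the tokenized reviews below
def toSentence(events: Seq[PageEvent]): String =
events.sortBy(_.timestamp).map(_.pageId).mkString(" ")
// the label would come from future behaviour, e.g. whether the user clicks a target
// campaign page within the next N days, playing the role of "正面"/"负面" in this post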
0. First, the dependencies used to build the model. They fall into three groups. The first is for Spark, mainly to create the Spark session and JavaRDD; note that Deeplearning4j needs Java's SparkContext and JavaRDD for model training. The second is the jieba segmenter used for tokenization, com.huaban.analysis.jieba.JiebaSegmenter; there is plenty of material online about word segmentation in Java, so I will not repeat it here. The third group covers Deeplearning4j itself plus its ND4J dependencies.
import org.apache.spark.sql.SparkSession
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.deeplearning4j.nn.api.OptimizationAlgorithm
import org.deeplearning4j.nn.conf.{BackpropType, NeuralNetConfiguration}
import org.deeplearning4j.nn.conf.layers.{RnnOutputLayer, LSTM, EmbeddingLayer, GravesLSTM}
import org.deeplearning4j.optimize.listeners.ScoreIterationListener
import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer
import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster
import org.nd4j.linalg.activations.Activation
import org.nd4j.linalg.api.ndarray.INDArray
import org.nd4j.linalg.dataset.DataSet
import org.nd4j.linalg.factory.Nd4j
import com.huaban.analysis.jieba.JiebaSegmenter
import org.nd4j.linalg.lossfunctions.LossFunctions
import org.deeplearning4j.nn.conf.inputs.InputType
import org.nd4j.linalg.learning.config.Adam
import org.nd4j.evaluation.classification.Evaluation
import scala.collection.mutable.ArrayBuffer
1. Create the Spark session. Note that two handles are created: a JavaSC (JavaSparkContext) and Spark's own SparkSession; the two can be used side by side. SparkSession mainly makes reading CSVs convenient, while JavaSC is used later for model training. The jieba segmenter object is also instantiated here.
val spark:SparkSession = {SparkSession
.builder()
.master("local")
.appName("Spark LSTM Emotion Analysis")
.getOrCreate()
}
import spark.implicits._
val JavaSC = JavaSparkContext.fromSparkContext(spark.sparkContext)
val segmenter = new JiebaSegmenter
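As a quick sanity check of the segmenter API used below, sentenceProcess takes a sentence and returns a java.util.List of tokens (the sample sentence is my own, purely illustrative):
// e.g. tokenizing one review prints something like [这家, 酒店, 很, 棒]
println(segmenter.sentenceProcess("这家酒店很棒"))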
2. Read the CSVs and tokenize them to build the JavaRDD[DataSet] we need.
def getTrainingData(spark: SparkSession): JavaRDD[DataSet] = {
//Get data. For the sake of this example, the pipeline is:
// CSV file -> RDD of (label, tokenized review) -> JavaRDD<DataSet>
//positive reviews
val goodCSV = spark.read.format("csv").load("/home/lorry/下载/pos.csv").toDF("describe")
//negative reviews
val badCSV = spark.read.format("csv").load("/home/lorry/下载/neg.csv").toDF("describe")
//regex matching the punctuation, digits and other symbols to strip
val regEx = """[`~!@#$%^&*()+=|{}':;',『』[\-]《》\\[\\][\"][ ]\[\][0123456789].<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?]"""
val signPattern = regEx.r
val targetColumns = goodCSV.columns
//tokenize each review inside the RDD and attach its label
val goodRDD = goodCSV.select(targetColumns.head, targetColumns.tail: _*).rdd.map(x => {
val word: String = x(0).asInstanceOf[String]
val wordSplit = segmenter.sentenceProcess(signPattern.replaceAllIn(word.trim, "")).toArray().mkString(" ")
("正面", wordSplit)
}
).filter(row => (row._2.size > 0))
//tokenize each review inside the RDD and attach its label
val badRDD = badCSV.select(targetColumns.head, targetColumns.tail: _*).rdd.map(x => {
val word: String = x(0).asInstanceOf[String]
val wordSplit = segmenter.sentenceProcess(signPattern.replaceAllIn(word.trim, "")).toArray().mkString(" ")
("负面", wordSplit)
}
).filter(row => (row._2.size > 0))
//merge the two RDDs
val totalRDD = goodRDD.union(badRDD)
//materialize the RDD once as a sanity check
totalRDD.count()
//count word frequencies, sort them in descending order, and build a word-to-index map
val WORD_TO_INT: Map[String, Int] = {
val VOCAB = totalRDD.flatMap(x => x._2.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
.map(row => row._1)
.collect()
VOCAB.zipWithIndex.toMap
}
//cap the sequence length at 200. A note here: the true maximum is about 1,500, but I kept hitting out-of-memory errors, and on inspection sequences of 1,500 words really did exceed the memory limit; since most reviews run to about 100 words, I chose 200 as the cap
val maxCorpusLength = 200 // instead of totalRDD.map(row => row._2.split(" ").length).collect().max
//vocabulary size, around 58,000
val VOCAB_SIZE = totalRDD.flatMap(x => x._2.split(" ")).collect.distinct.length
//labels: only two classes
val labelWord = totalRDD.flatMap(x => x._1.split(" ")).collect.distinct
//build the final JavaRDD[DataSet]. This DataSet is not Spark's Dataset but ND4J's: a bundle of tensors that comes in two forms, one with just the input and output tensors, the other adding a labels mask and a features mask that mark which positions of the labels/features should count. See wangongxi's article for details, or jump to the class itself and read the method docs; a toy illustration of the layout follows right after this function
//following wangongxi's setup, both features and labels get mask arrays, but my maximum sequence length is 200
val totalDataSet = totalRDD.map(row => {
val listWords = if (row._2.split(" ").length >= maxCorpusLength) row._2.split(" ").take(maxCorpusLength) else row._2.split(" ")
val label = row._1
val features: INDArray = Nd4j.create(1, 1, maxCorpusLength)
val labels = Nd4j.create(1, labelWord.length, maxCorpusLength)
val featuresMask = Nd4j.zeros(1.toLong, maxCorpusLength.toLong)
val labelsMask = Nd4j.zeros(1.toLong, maxCorpusLength.toLong)
var i: Int = 0
for (word <- listWords) {
features.putScalar(Array(0, 0, i), WORD_TO_INT(word)) // the first two indices must be 0: both axes have size 1
featuresMask.putScalar(Array(0, i), 1)
i += 1
}
val lastIdx: Int = listWords.size
val idx = labelWord.indexOf(label)
labels.putScalar(Array[Int](0, idx, lastIdx - 1), 1.0)
labelsMask.putScalar(Array[Int](0, lastIdx - 1), 1.0)
new DataSet(features, labels, featuresMask, labelsMask)
})
totalDataSet.toJavaRDD()
}
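Here is the toy illustration promised above: a standalone example with made-up sizes and word indices, showing how the four tensors of a masked DataSet fit together (one example, sequence capped at 4 steps, 2 label classes, 3 real words):
val maxLen = 4
val f = Nd4j.create(1, 1, maxLen) // word indices, shape [miniBatch, 1, timeSteps]
val l = Nd4j.create(1, 2, maxLen) // labels, shape [miniBatch, numClasses, timeSteps]
val fm = Nd4j.zeros(1L, maxLen.toLong) // 1 wherever a real word sits
val lm = Nd4j.zeros(1L, maxLen.toLong) // 1 only at the step where the label is scored
Seq(7, 42, 3).zipWithIndex.foreach { case (wordIdx, t) =>
f.putScalar(Array(0, 0, t), wordIdx)
fm.putScalar(Array(0, t), 1)
}
l.putScalar(Array(0, 1, 2), 1.0) // class 1 at the last real time step (index 2)
lm.putScalar(Array(0, 2), 1.0)
val toyExample = new DataSet(f, l, fm, lm)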
3. Configure the LSTM network layers. The vocabulary size is 50,758, and in step 2 every word was replaced by its frequency rank: the most frequent word, "的", becomes 0, and so on. The indices first enter layer 0, the embedding layer, which maps each word from its index to a 256-dimensional vector; then come a GravesLSTM layer and an RNN output layer, which need no further explanation. Because this post uses the 1.0.0-beta3 release of Deeplearning4j, the syntax differs slightly from wangongxi's, mainly at the updater, where the learning rate and related parameters are now set. Next, set the parameter-averaging frequency, the number of examples per batch and so on, and build a ParameterAveragingTrainingMaster, which is required for training neural networks with Deeplearning4j on Spark. Finally, combine JavaSC, the conf, and the training master into the SparkDl4jMultiLayer used for training.
val VOCAB_SIZE = 50758
val conf = {
new NeuralNetConfiguration.Builder()
.seed(1234)
.optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
.updater(Adam.builder().learningRate(1e-4).beta1(0.9).beta2(0.999).build())
.l2(5 * 1e-4)
.list()
.layer(0, new EmbeddingLayer.Builder().nIn(VOCAB_SIZE).nOut(256).activation(Activation.IDENTITY).build())
.layer(1, new GravesLSTM.Builder().nIn(256).nOut(256).activation(Activation.SOFTSIGN).build())
.layer(2, new RnnOutputLayer.Builder(LossFunctions.LossFunction.MCXENT)
.activation(Activation.SOFTMAX).nIn(256).nOut(2).build())
.pretrain(false).backprop(true)
.setInputType(InputType.recurrent(VOCAB_SIZE))
.build()
}
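Optionally, the conf can be sanity-checked locally before Spark enters the picture. A small sketch, assuming one extra import; summary() prints the layer shapes and parameter counts:
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork
val localNet = new MultiLayerNetwork(conf)
localNet.init()
localNet.setListeners(new ScoreIterationListener(10)) // listener already imported above
println(localNet.summary())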
val examplesPerDataSetObject = 1
val averagingFrequency: Int = 5
val batchSizePerWorker: Int = 20
val trainMaster = {
new ParameterAveragingTrainingMaster.Builder(examplesPerDataSetObject)
.workerPrefetchNumBatches(0)
.saveUpdater(true)
.averagingFrequency(averagingFrequency)
.batchSizePerWorker(batchSizePerWorker)
.build()
}
val sparkNetwork: SparkDl4jMultiLayer = new SparkDl4jMultiLayer(JavaSC, conf, trainMaster)
4. Train the model for 10 epochs, splitting the data 50/50 into training and test sets; at the end of each epoch, print the prediction accuracy on both.
val emotionWordData = getTrainingData(spark)
val Array(trainingData,testingData) = emotionWordData.randomSplit(Array(0.5,0.5))
testingData.count()
val resultArray = new ArrayBuffer[Array[String]](0)
for (numEpoch <- 1 to 10){
sparkNetwork.fit(trainingData)
val trainEvaluation:Evaluation = sparkNetwork.evaluate(trainingData)
val trainAccuracy = trainEvaluation.accuracy()
val testEvaluation:Evaluation = sparkNetwork.evaluate(testingData)
val testAccuracy = testEvaluation.accuracy()
System.out.println("====================================================================")
System.out.println("Epoch " + numEpoch + " Has Finished")
System.out.println("Train Accuracy: " + trainAccuracy)
System.out.println("Test Accuracy: " + testAccuracy)
System.out.println("====================================================================")
resultArray.append(Array(trainAccuracy.toString, testAccuracy.toString))
}
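Once training finishes, the fitted network can be pulled out of the Spark wrapper and persisted. A minimal sketch using DL4J's ModelSerializer (the output path is just an example), plus a dump of the per-epoch accuracies collected in resultArray:
import org.deeplearning4j.util.ModelSerializer
// getNetwork returns the underlying MultiLayerNetwork; true = also save the updater state
ModelSerializer.writeModel(sparkNetwork.getNetwork, "/tmp/lstm-emotion-model.zip", true)
resultArray.zipWithIndex.foreach { case (acc, i) => println(s"epoch ${i + 1}: train=${acc(0)} test=${acc(1)}") }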
5. That is all the code. For reference, here are the dependency packages I used. Deeplearning4j evolves quickly and the code differs noticeably between versions, so be careful to match the dependency versions when adding them. Some of the packages below are left over from other models I have been building, so trim them as appropriate. I debug Spark in local mode; switch the master to yarn when running on a cluster. For the rest, refer to the code above and wrap it in an Object in IDEA (a minimal skeleton of that wrapper is sketched just below, ahead of the pom). Thanks again to wangongxi for the Java version and to the author of http://spaces.ac.cn/archives/3414/ for providing the corpus.
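A minimal skeleton, with an illustrative name, for the Object encapsulation mentioned above:
object SparkLstmEmotionAnalysis {
def main(args: Array[String]): Unit = {
// step 1: build the SparkSession, JavaSC and segmenter
// step 2: val data = getTrainingData(spark)
// steps 3-4: build conf and the TrainingMaster, then run the epoch loop
}
}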
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<nd4j.backend>nd4j-native-platform</nd4j.backend>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<shadedClassifier>bin</shadedClassifier>
<java.version>1.8</java.version>
<nd4j.version>1.0.0-beta3</nd4j.version>
<dl4j.version>1.0.0-beta3</dl4j.version>
<datavec.version>1.0.0-beta3</datavec.version>
<arbiter.version>1.0.0-beta3</arbiter.version>
<guava.version>25.1-jre</guava.version>
<jfreechart.version>1.5.0</jfreechart.version>
<dl4j.spark.version>1.0.0-beta3</dl4j.spark.version>
<aws.sdk.version>1.11.109</aws.sdk.version>
<jcommander.version>1.72</jcommander.version>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>2.7.4</hadoop.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-api</artifactId>
<version>${nd4j.version}</version>
</dependency>
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-common</artifactId>
<version>${nd4j.version}</version>
</dependency>
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-cuda-9.2-platform</artifactId>
<version>${nd4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j</artifactId>
<version>0.80</version>
</dependency>
<dependency>
<groupId>ml.dmlc</groupId>
<artifactId>xgboost4j-spark</artifactId>
<version>0.80</version>
</dependency>
<dependency>
<groupId>org.apache.predictionio</groupId>
<artifactId>apache-predictionio-core_2.11</artifactId>
<version>0.13.0</version>
<scope>provided</scope>
</dependency>
<!-- ND4J backend. Every DL4J project needs one. The artifactId is typically "nd4j-native-platform" or "nd4j-cuda-7.5-platform" -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>${nd4j.backend}</artifactId>
<version>${dl4j.version}</version>
</dependency>
<!-- Core DL4J functionality -->
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-core</artifactId>
<version>${dl4j.version}</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-nlp</artifactId>
<version>${dl4j.version}</version>
</dependency>
<!-- Force the guava version used by the UI/HistogramIterationListener -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- datavec-data-codec: only used to load video data in the video-processing example -->
<dependency>
<artifactId>datavec-data-codec</artifactId>
<groupId>org.datavec</groupId>
<version>${datavec.version}</version>
</dependency>
<!-- Used by the feedforward/classification/MLP* and feedforward/regression/RegressionMathFunctions examples -->
<dependency>
<groupId>org.jfree</groupId>
<artifactId>jfreechart</artifactId>
<version>${jfreechart.version}</version>
</dependency>
<!-- Arbiter: used by the hyperparameter-optimization example -->
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>arbiter-deeplearning4j</artifactId>
<version>${arbiter.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.datavec/datavec-hadoop -->
<dependency>
<groupId>org.datavec</groupId>
<artifactId>datavec-hadoop</artifactId>
<version>${dl4j.version}</version>
</dependency>
<!-- Logging Dependencies -->
<dependency>
<groupId>com.typesafe.scala-logging</groupId>
<artifactId>scala-logging_2.11</artifactId>
<version>3.5.0</version>
</dependency>
<!-- ND4J -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-native-platform</artifactId>
<version>${nd4j.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
<!-- ND4J -->
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-api</artifactId>
<version>${nd4j.version}</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>scalnet_2.11</artifactId>
<version>1.0.0-beta2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-emr</artifactId>
<version>${aws.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>${jcommander.version}</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>dl4j-spark_${scala.binary.version}</artifactId>
<version>${dl4j.spark.version}_spark_2</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>dl4j-spark-parameterserver_${scala.binary.version}</artifactId>
<version>${dl4j.spark.version}_spark_2</version>
</dependency>
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>com.huaban</groupId>
<artifactId>jieba-analysis</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.14</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.14</version>
</dependency>
<!-- Excel handling; same functionality as the POI dependencies above -->
<dependency>
<groupId>net.sourceforge.jexcelapi</groupId>
<artifactId>jxl</artifactId>
<version>2.6.12</version>
</dependency>
</dependencies>