当前位置: 首页 > 编程笔记 >

java 中Spark中将对象序列化存储到hdfs

樊熠彤
2023-03-14
本文向大家介绍java 中Spark中将对象序列化存储到hdfs,包括了java 中Spark中将对象序列化存储到hdfs的使用技巧和注意事项,需要的朋友参考一下

java 中Spark中将对象序列化存储到hdfs

摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.

废话不多说, 直接贴代码了. spark1.4 + hbase0.98

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}


感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

 类似资料:
  • 我试图将包含Contact类型的序列化对象的文件读入ArrayList ContactsCollection。我遇到的问题是联系的对象从未添加到ArrayList中。

  • 问题内容: 问题是,当我将序列化对象存储在.txt文件中时,它的格式不是可读的,并且包含一些随机的符号和字母。首先,我想知道其背后的原因是什么,然后如何解决此问题。 好了,这是我的代码:我要序列化的对象 序列化 } 反序列化 输出: 问题答案: 存储在文件中的序列化对象不可读 除了通过反序列化之外,它们并不可读。 问题是,当我将序列化对象存储在.txt文件中时,它的格式不是可读的,并且包含一些随机

  • 本文向大家介绍Java中对象序列化与反序列化详解,包括了Java中对象序列化与反序列化详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Java中对象序列化与反序列化。分享给大家供大家参考。具体如下: 一、简介 对象序列化(Serializable)是指将对象转换为字节序列的过程,而反序列化则是根据字节序列恢复对象的过程。 序列化一般用于以下场景: 1.永久性保存对象,保存对象的字节序列

  • 我正在尝试准备一个库(用Java编写)来运行在Apache-Spark上。由于该库有数百个类,并且仍处于活跃的开发阶段,所以我不想一一序列化所有的类。相反,我搜索了另一个方法,并找到了这个方法,但它同样不能解决序列化问题。 下面是代码示例: 这会产生年份4d的“对象不可序列化”异常: 顺便说一下,如果我将命令Action collect()替换为foreach(func), 那么,我的问题是为什么

  • 以下代码导致此异常: 所以问题是:如何在GSON序列化和反序列化的泛型HashMap中获得正确的实例?

  • 问题内容: 我试图序列化一对映射并获得如下异常: 有某种方式可以将此序列化吗? 问题答案: 我从这里做了一些改进。在这里测试和工作SerializationUtils类