我试图使用Spark 1.0在HBase(0.96.0-hadoop2)中编写一些简单的数据,但我一直遇到序列化问题。以下是相关代码:
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put
object PutRawDataIntoHbase{
def main(args: Array[String]): Unit = {
var propFileName = "hbaseConfig.properties"
if(args.size > 0){
propFileName = args(0)
}
/** Load properties here **/
val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
.map(l => l.split("\t"))
.map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0) , a(1)))
val tableName = prop.getProperty("hbase.table.name")
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
val myTable = new HTable(hbaseConf, tableName)
theData.foreach(a=>{
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
})
}
}
运行代码会导致:
Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable
用map替换foreach不会崩溃,但我也不会写。任何帮助都将不胜感激。
类HBaseConfiguration
表示到HBase服务器的连接池。显然,它无法序列化并发送到工作节点。由于HTable
使用此池与HBase服务器通信,因此无法对其进行序列化。
基本上,有三种方法来处理这个问题:
注意foreach分区
方法的使用:
val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
val hbaseConf = HBaseConfiguration.create()
<... configure HBase ...>
val myTable = new HTable(hbaseConf, tableName)
iter.foreach { a =>
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
}
}
请注意,每个工作节点都必须有权访问HBase服务器,并且必须预先安装或通过ADD_JARS
提供所需的jar。
还要注意的是,如果为每个分区打开连接池,那么最好将分区的数量大致减少到工作节点的数量(使用coalesce
函数)。也可以在每个工作节点上共享一个HTable
实例,但这并不是那么简单。
可以使用单台计算机从RDD写入所有数据,即使数据不适合内存。详细信息在本答案中解释:Spark:将大数据从RDD检索到本地机器的最佳实践
当然,它比分布式编写要慢,但它很简单,不会带来痛苦的序列化问题,如果数据大小合理,它可能是最好的方法。
可以为HBase创建自定义HadoopOutputFormat,也可以使用现有的HadoopOutputFormat。我不确定是否有适合你需要的东西,但谷歌应该在这里提供帮助。
顺便说一句,map
调用不会崩溃,因为它不会被评估:RDD不会被评估,直到你调用一个有副作用的函数。例如,如果调用了数据。地图(……)。如果继续,它也会崩溃。
我对Spark,Scala和Cassandra都是新手。使用Spark,我试图从MySQL获取一些ID。 我可以看到在控制台打印的ID。 当我试图在每个提取id上运行相同的函数时 它给出与例外相同的例外 在阅读spark-shell中的Apache spark:“sparkException:Task not serializable”后,我尝试将@transient添加到RDDs中
null 每当我尝试访问sc时,我会得到以下错误。我在这里做错了什么?
问题内容: 我们在Spark上使用Redis来缓存键值对,这是代码: 但是编译器给了我这样的反馈: 有人可以告诉我如何序列化从Redis获得的数据。非常感谢。 问题答案: 在Spark中,s(如此处)上的函数被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该可序列化。 Redis连接不可序列化,因为它打开了到目标DB的TCP连接,该TCP连接已绑定到创建它的机器。 解决方案是
这给出的错误如下,任何帮助将是感激的:
我有一个行的RDD,我想基于闭包进行过滤。最终,我想将闭包作为参数传递给正在进行过滤器的方法,但我已经简化了它,我可以用这样简单的东西重现错误。 我尝试将fn放入一个case对象中,这个对象扩展了一个可序列化的特性,在调用过滤器的方法的内部和外部定义了fn。我正在努力弄清楚我需要做什么,而不会出现这些错误。我知道在堆栈溢出上已经有很多关于这个的问题,我一直在寻找一个合适的答案,但我找不到。 更新:
在我的程序中,我有一个返回一些RDD的方法,我们称它为,它接受一个不可序列化的参数,并让RDD的类型为(我真正的RDD是元组类型,但只包含基元类型)。 当我尝试这样的事情时: 我得到的。 当我用替换(即某个常数)时,它会运行。 从序列化跟踪中,它试图序列化,并在那里阻塞,但我仔细检查了我的方法,这个对象从未出现在RDD中。 当我试图直接收集的输出时,即 我也没有问题。 该方法使用获取(本地)值序列