当前位置: 首页 > 知识库问答 >
问题:

从Spark到HBase:org写作。阿帕奇。火花SparkException:任务不可序列化

胡飞舟
2023-03-14

我在我大学的热图项目中,我们必须从txt文件(坐标、高度)中获取一些数据(212Go),然后将其放入HBase以在带有Express的Web客户端上检索它。

我练习使用144Mo文件,这是工作:

SparkConf conf = new SparkConf().setAppName("PLE");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> data = context.textFile(args[0]);
Connection co = ConnectionFactory.createConnection(getConf());
createTable(co);
Table table = co.getTable(TableName.valueOf(TABLE_NAME));
Put put = new Put(Bytes.toBytes("KEY"));

for (String s : data.collect()) {
    String[] tmp = s.split(",");
    put.addImmutable(FAMILY,
                    Bytes.toBytes(tmp[2]),
                    Bytes.toBytes(tmp[0]+","+tmp[1]));
}

table.put(put);

但是我现在使用212Go文件,我有一些内存错误,我猜收集方法会收集内存中的所有数据,所以212Go太多了。

所以现在我在尝试这个:

SparkConf conf = new SparkConf().setAppName("PLE");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> data = context.textFile(args[0]);
Connection co = ConnectionFactory.createConnection(getConf());
createTable(co);
Table table = co.getTable(TableName.valueOf(TABLE_NAME));
Put put = new Put(Bytes.toBytes("KEY"));

data.foreach(line ->{
    String[] tmp = line.split(",");
    put.addImmutable(FAMILY,
                    Bytes.toBytes(tmp[2]),
                    Bytes.toBytes(tmp[0]+","+tmp[1]));
});

table.put(put);

我得到了“org.apache.spark.SparkException:Task not serializable”,我搜索了它并尝试了一些修复,但没有成功,我在这里读到了:Task not serializable:java。伊奥。仅在类而非对象上调用闭包外函数时NotSerializableException

其实我并不完全理解这个话题,我只是一个学生,也许我的问题的答案是显而易见的,也许不是,无论如何,提前谢谢!

共有1个答案

慎旭尧
2023-03-14

根据经验,序列化数据库连接(任何类型)都没有意义。无论是否有火花,都没有设计成序列化和反序列化。

为每个分区创建连接:

data.foreachPartition(partition -> {
  Connection co = ConnectionFactory.createConnection(getConf());
  ... // All required setup
  Table table = co.getTable(TableName.valueOf(TABLE_NAME));
  Put put = new Put(Bytes.toBytes("KEY"));
   while (partition.hasNext()) {
     String line = partition.next();
     String[] tmp = line.split(",");
     put.addImmutable(FAMILY,
                Bytes.toBytes(tmp[2]),
                Bytes.toBytes(tmp[0]+","+tmp[1]));
   }
   ... // Clean connections
});

我还建议阅读官方Spark Streaming编程指南中有关使用foreachRDD的设计模式。

 类似资料:
  • 将现有应用程序从Spark 1.6移动到Spark 2.2*(最终)会导致错误“org.apache.spark.SparkExctive:任务不可序列化”。我过于简化了我的代码,以演示同样的错误。代码查询拼花文件以返回以下数据类型:“org.apache.spark.sql.数据集[org.apache.spark.sql.行]”我应用一个函数来提取字符串和整数,返回字符串。一个固有的问题与Sp

  • 我的spark任务是在运行时抛出不可序列化的任务。谁能告诉我我做错了什么吗? 以下是stacktrace:

  • 我已经上了三节课 任务未序列化

  • 我想将转换流写入Elasticsearch索引,如下所示: 行抛出错误(见下文)。我尝试了不同的方法来解决这个问题(例如,在旁边添加),但似乎没有任何效果。 它是否与Hadoop的配置有关?(我参考了以下消息:) 更新:

  • 我在尝试将spark数据帧的一列从十六进制字符串转换为双精度字符串时遇到了一个问题。我有以下代码: 我无法共享txs数据帧的内容,但以下是元数据: 但当我运行这个程序时,我得到了一个错误: 错误:类型不匹配;找到:MsgRow需要:org.apache.spark.sql.行MsgRow(row.getLong(0),row.getString(1),row.getString(2),hex2in