当前位置: 首页 > 知识库问答 >
问题:

Java Spark-Java.lang.outofMemoryError:超过GC开销限制-大型数据集

呼延俊风
2023-03-14
final Dataset<Row> jdbcDF = sparkSession.read().format("jdbc")
            .option("url", "xxxx")
            .option("driver", "com.ibm.db2.jcc.DB2Driver")
            .option("query", sql)
            .option("user", "xxxx")
            .option("password", "xxxx")
            .load();
    final Encoder<GdxClaim> gdxClaimEncoder = Encoders.bean(GdxClaim.class);
    final Dataset<GdxClaim> gdxClaimDataset = jdbcDF.as(gdxClaimEncoder);
    System.out.println("BEFORE PARALLELIZE");
    final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());
    System.out.println("AFTER");
    final JavaRDD<ClaimResponse> gdxClaimResponse = gdxClaimJavaRDD.mapPartitions(mapFunc);
    mapFunc = (FlatMapFunction<Iterator<GdxClaim>, ClaimResponse>) claim -> {
        System.out.println(":D " + claim.next().getRBAT_ID());
        if (claim != null && !currentRebateId.equals((claim.next().getRBAT_ID()))) {
            if (redisCommands == null || (claim.next().getRBAT_ID() == null)) {
                    serObjList = Collections.emptyList();
                } else {

                    generateYearQuarterKeys(claim.next());

                    redisBillingKeys = redisBillingKeys.stream().collect(Collectors.toList());
                    final String[] stringArray = redisBillingKeys.toArray(new String[redisBillingKeys.size()]);
                    serObjList = redisCommands.mget(stringArray);

                    serObjList = serObjList.stream().filter(clientObj -> clientObj.hasValue()).collect(Collectors.toList());
                    deserializeClientData(serObjList);
                    currentRebateId = (claim.next().getRBAT_ID());
            }
        }
        return (Iterator) racAssignmentService.assignRac(claim.next(), listClientRegArr);

    };
final JavaRDD<GdxClaim> gdxClaimJavaRDD = javaSparkContext.parallelize(gdxClaimDataset.collectAsList());

我们不确定从这里到哪里去,完全被困住了。有人能帮忙吗?我们到处找了一些例子来帮忙。

共有1个答案

鲁建茗
2023-03-14

在高层,collectaslist()将把整个数据集带入内存,这是您需要避免的。

您可能希望一般地查看Dataset文档(与上面的链接相同)。它们解释了它的行为,包括javaRDD()方法,这可能是避免collectaslist()的方法。

请记住:将数据集收集到内存中的其他“终端”操作也会导致同样的问题。关键是在收集之前或过程中过滤到您的小子集,无论它是什么。

 类似资料:
  • 问题内容: 我在一个程序中创建了这个错误,该程序创建了几个(数十万)HashMap对象,每个对象都有几个(15-20)文本条目。这些字符串必须全部收集(不分解成较小的数量),然后再提交给数据库。 根据Sun的说法,该错误发生“如果在垃圾回收上花费了太多时间:如果在垃圾回收上花费了总时间的98%以上,而回收不到2%的堆,则将引发OutOfMemoryError。 ”。 显然,可以使用命令行将参数传递

  • 我已经阅读了与此错误相关的所有其他问题,并尝试了他们的解决方案,但没有任何帮助。 这是我的身材。格拉德尔 我在Android Studio 2.1.3和Android Studio 2.2.3上试过,我试过卸载jdk,然后重新安装。我已经格式化了窗口并重试,但没有用。 我一直在学习java。lang.OutOfMemoryError:超出GC开销限制错误 我该如何解决这个问题?

  • 但这并不能解决我的问题。我在我的分级中启用了multidex,因为没有它我会得到错误: com.android.dex.DexIndexOverflowException:方法ID不在[0,0xFFFF]:65536 所以这是一个解决方案,它在以前的Android Studio版本(也适用于公司的其他人,他们正在使用Android Studio1.4-2.0),但在我升级了我的Android St

  • 我得到一个java.lang.OutOfMemoryError:在Android1.4上运行gradle时超过了GC开销限制...以下是我的错误: 这怎么修复呢?

  • 什么是java。lang.OutOfMemoryError:Java堆空间意味着消息意味着应用程序只需要比正常运行可用的更多Java堆空间。 什么是java。lang.OutOfMemoryError:超出GC开销限制意味着由于某种原因,垃圾收集器占用了过多的时间(默认情况下占进程所有CPU时间的98%),每次运行时恢复的内存很少(默认情况下占堆的2%)。这在内部也意味着,当应用程序只需要比正常运

  • 我正在IntelliJ Idea Ultimate Edition 2020.2.2上运行Grails 2.5.0。它可以很好地编译和构建代码,但它会不断抛出“java.lang.OutOfMemoryError:超出GC开销限制”错误(整个错误都是复制并粘贴在最后)。以下是我在研究这个错误的基础上尝试的东西: 1)增加构建进程堆大小(在2G、4G和6G下尝试)https://intellij-s