当前位置: 首页 > 知识库问答 >
问题:

可拆分DoFn导致洗牌键过大的问题

海灵均
2023-03-14

我试图实现一个listflatten函数,我使用simpledofn实现了它,该函数运行良好,但用于并行化。我正在将函数转换为可拆分的Do函数。我使用Directrunner在本地运行了一个包含5000个元素的单元测试,而在DataFlow中运行了相同的单元测试,但失败了,错误如下。

Error Details: 
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue (GroupAlsoByWindowFnRunner.java:102)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:126)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:54)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement (GroupAlsoByWindowFnRunner.java:115)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:114)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:201)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at com.abc.common.batch.functions.AbcListFlattenFn.splitRestriction (AbcListFlattenFn.java:68)

下面给出了本地DirectRunner和云数据流运行器之间的数据差异。

本地中的DirectRunner:

  1. 在示例输入PCollection元素中有5000个ABC

云中的DataflowRunner:

    null
   public class AbcList implements Serializable {
        private List<Abc> abcs;
        private List<Xyz> xyzs;
   }

        public class AbcListFlattenFn extends DoFn<AbcList, KV<Abc, List<Xyz>> {

            @ProcessElement
            public void process(@Element AbcList input,
                ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {

                try {
            /* Below commented lines are without the Splittable DoFn
                       input.getAbcs().stream().forEach(abc -> {
                                context.output(KV.of(abc, input.getXyzs()));
                         }); */

                    for (long index = tracker.currentRestriction().getFrom(); tracker.tryClaim(index);
                        ++index) {
                        context.output(KV.of(input.getAbcs().get(Math.toIntExact(index),input.getXyzs())));
                    }
                } catch (Exception e) {
                    log.error("Flattening AbcList has failed ", e);
                }

            }

            @GetInitialRestriction
            public OffsetRange getInitialRestriction(AbcList input) {
                return new OffsetRange(0, input.getAbcs().size());
            }

            @SplitRestriction
            public void splitRestriction(final AbcList input,
                final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
              List<OffsetRange> ranges =
                  range.split(input.getAbcs().size() > 5000 ? 5000
                        : input.getAbcs().size(), 2000);
                for (final OffsetRange p : ranges) {
                    receiver.output(p);
                }
            }

            @NewTracker
            public OffsetRangeTracker newTracker(OffsetRange range) {
                return new OffsetRangeTracker(range);
            }
        }

共有1个答案

哈栋
2023-03-14

洗牌键大小限制是由于原始大小。为了摆脱这个问题,您可能希望在您的SDF之前添加一个重新洗牌。重新洗牌会帮你做第一轮发行。

 类似资料:
  • 我得到了几次配额的增加,虽然这让作业比以前继续,但它仍然以相同的错误结束(尽管shuffle键的大小更大了)它现在似乎没有因为配额相关问题而碰壁。 除了放弃Dataprep和回到map Reduce之外,还有什么想法吗?

  • 方法process()不更改记录的字段(键)值。假设所有算子的并行度都是2,那么keyBy()at(2)是否也会导致网络洗牌呢?也许keyBy()at(2)由于密钥值不变而具有前向策略避免网络通信代价的效果? 太好了~

  • 问题内容: 我想搜索给定目录中所有图像中的冲浪,并保存它们的关键点和描述符以供将来使用。我决定使用泡菜,如下所示: 当我尝试执行时,出现以下错误: 有人知道吗,这是什么意思,以及如何解决?我正在使用Python 2.6和Opencv 2.3.1 十分感谢 问题答案: 问题是您不能将cv2.KeyPoint转储到pickle文件中。我遇到了同样的问题,并设法通过本质上对关键点进行序列化和反序列化来解

  • 本文向大家介绍MySQL大小写敏感导致的问题分析,包括了MySQL大小写敏感导致的问题分析的使用技巧和注意事项,需要的朋友参考一下 MYSQL对大小写敏感 见字如面,见标题知内容。你有遇到过因为MYSQL对大小写敏感而被坑的体验吗? 之前看过阿里巴巴Java开发手册,在MySql建表规约里有看到: 【强制】表名、字段名必须使用小写字母或数字 , 禁止出现数字开头,禁止两个下划线中间只 出现数字。数

  • 大多数纸牌游戏都需要洗牌,也就是让纸牌随机排列。在第10.5节,我们看到了怎样生成随机数,但怎样利用随机数实现洗牌功能却并非显然意见的。 一种可行的方案是,模拟人洗牌的方法,将牌分为两堆,然后通过在每个牌堆中轮流选择的方式实现原牌堆的重新组织。因为一般而言,人并不能做到完美地洗牌,而程序经过大约7次迭代之后,牌堆中纸牌的顺序已经相当随机了。但是计算机程序每次在做完美洗牌的时候有一个令人讨厌的属性—