我试图实现一个listflatten
函数,我使用simpledofn
实现了它,该函数运行良好,但用于并行化。我正在将函数转换为可拆分的Do函数。我使用Directrunner
在本地运行了一个包含5000个元素的单元测试,而在DataFlow中运行了相同的单元测试,但失败了,错误如下。
Error Details:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output (GroupAlsoByWindowsParDoFn.java:184)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue (GroupAlsoByWindowFnRunner.java:102)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:126)
at org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn.processElement (BatchGroupAlsoByWindowViaIteratorsFn.java:54)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement (GroupAlsoByWindowFnRunner.java:115)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement (GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement (GroupAlsoByWindowsParDoFn.java:114)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process (ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process (OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop (ReadOperation.java:201)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: Shuffle key too large:3749653 > 1572864
at com.abc.common.batch.functions.AbcListFlattenFn.splitRestriction (AbcListFlattenFn.java:68)
下面给出了本地DirectRunner和云数据流运行器之间的数据差异。
本地中的DirectRunner:
云中的DataflowRunner:
public class AbcList implements Serializable {
private List<Abc> abcs;
private List<Xyz> xyzs;
}
public class AbcListFlattenFn extends DoFn<AbcList, KV<Abc, List<Xyz>> {
@ProcessElement
public void process(@Element AbcList input,
ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) {
try {
/* Below commented lines are without the Splittable DoFn
input.getAbcs().stream().forEach(abc -> {
context.output(KV.of(abc, input.getXyzs()));
}); */
for (long index = tracker.currentRestriction().getFrom(); tracker.tryClaim(index);
++index) {
context.output(KV.of(input.getAbcs().get(Math.toIntExact(index),input.getXyzs())));
}
} catch (Exception e) {
log.error("Flattening AbcList has failed ", e);
}
}
@GetInitialRestriction
public OffsetRange getInitialRestriction(AbcList input) {
return new OffsetRange(0, input.getAbcs().size());
}
@SplitRestriction
public void splitRestriction(final AbcList input,
final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
List<OffsetRange> ranges =
range.split(input.getAbcs().size() > 5000 ? 5000
: input.getAbcs().size(), 2000);
for (final OffsetRange p : ranges) {
receiver.output(p);
}
}
@NewTracker
public OffsetRangeTracker newTracker(OffsetRange range) {
return new OffsetRangeTracker(range);
}
}
洗牌键大小限制是由于原始大小。为了摆脱这个问题,您可能希望在您的SDF之前添加一个重新洗牌。重新洗牌会帮你做第一轮发行。
我得到了几次配额的增加,虽然这让作业比以前继续,但它仍然以相同的错误结束(尽管shuffle键的大小更大了)它现在似乎没有因为配额相关问题而碰壁。 除了放弃Dataprep和回到map Reduce之外,还有什么想法吗?
方法process()不更改记录的字段(键)值。假设所有算子的并行度都是2,那么keyBy()at(2)是否也会导致网络洗牌呢?也许keyBy()at(2)由于密钥值不变而具有前向策略避免网络通信代价的效果? 太好了~
如果和具有相同的分区符,
问题内容: 我想搜索给定目录中所有图像中的冲浪,并保存它们的关键点和描述符以供将来使用。我决定使用泡菜,如下所示: 当我尝试执行时,出现以下错误: 有人知道吗,这是什么意思,以及如何解决?我正在使用Python 2.6和Opencv 2.3.1 十分感谢 问题答案: 问题是您不能将cv2.KeyPoint转储到pickle文件中。我遇到了同样的问题,并设法通过本质上对关键点进行序列化和反序列化来解
本文向大家介绍MySQL大小写敏感导致的问题分析,包括了MySQL大小写敏感导致的问题分析的使用技巧和注意事项,需要的朋友参考一下 MYSQL对大小写敏感 见字如面,见标题知内容。你有遇到过因为MYSQL对大小写敏感而被坑的体验吗? 之前看过阿里巴巴Java开发手册,在MySql建表规约里有看到: 【强制】表名、字段名必须使用小写字母或数字 , 禁止出现数字开头,禁止两个下划线中间只 出现数字。数
大多数纸牌游戏都需要洗牌,也就是让纸牌随机排列。在第10.5节,我们看到了怎样生成随机数,但怎样利用随机数实现洗牌功能却并非显然意见的。 一种可行的方案是,模拟人洗牌的方法,将牌分为两堆,然后通过在每个牌堆中轮流选择的方式实现原牌堆的重新组织。因为一般而言,人并不能做到完美地洗牌,而程序经过大约7次迭代之后,牌堆中纸牌的顺序已经相当随机了。但是计算机程序每次在做完美洗牌的时候有一个令人讨厌的属性—