当前位置: 首页 > 工具软件 > Apache Beam > 使用案例 >

Apache Beam 问题汇总

端木朝
2023-12-01

Apache Beam 问题汇总

  1. 泛型擦除问题
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for ParDo(Anonymous)/ParMultiDo(Anonymous).output [PCollection@577127077]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for java.lang.Object.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:507)
	at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:286)
	at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:117)
	at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifying(TransformHierarchy.java:226)
	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:212)
	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
	at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:598)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
	at com.yss.beam.BeamMutiInput.main(BeamMutiInput.java:49)

这个问题一般发生在使用TupleTag的时候,需要在TupleTag初始化时加个{}即可

final TupleTag<Long> ATag = new TupleTag<Long>(){};
final TupleTag<Long> BTag = new TupleTag<Long>(){};

如果还不行,那就只能使用setCoder手动声明类型

PCollectionTuple resultTuple = pipeline
                .apply(Create.of("hello world", "hello flink", "hello flink spark"))
                .apply(ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(@Element String word, MultiOutputReceiver out) {
                        if (word.endsWith("world")) {
                            out.get(endWithWorldTag).output(word);
                        } else if (word.endsWith("spark")) {
                            out.get(endWithSparkTag).output(word);
                        } else {
                            out.get(endWithFlinkTag).output(word);
                        }
                    }
                }).withOutputTags(endWithFlinkTag, TupleTagList.of(endWithSparkTag).and(endWithWorldTag)));
		// 在此处使用setCoder来声明类型
        resultTuple.get(endWithFlinkTag).setCoder(StringUtf8Coder.of()).apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(@Element String element){
                System.out.println("flink Tag " + element);
            }
        }));
 类似资料: