当前位置: 首页 > 知识库问答 >
问题:

NotSerializableException:Google云数据流管道中的org.apache.avro.io.DecoderFactory

商高谊
2023-03-14

但是当我运行代码时,我会遇到以下异常:

package com.google.cloud.dataflow.examples;

import java.io.Serializable;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.PCollection;


public class CalcMeanExample 
public static void main(String[] args) 
{
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> numbers = p.apply(TextIO.Read.named("ReadLines").withCoder(StringUtf8Coder.of()).from(options.getInput()));

    numbers.apply( ParDo.of( new DoFn<String,String>(){
        @Override
        public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {

            System.out.println( c.element() );

        }
    }));

    PCollection<String> average = numbers.apply( Combine.globally( new AverageFn()));


    average.apply(TextIO.Write.named("WriteAverage")
            .to(options.getOutput())
            .withNumShards(options.getNumShards()));

    p.run();

    System.out.println( "done" );
}


public static class AverageFn extends CombineFn<String, AverageFn.Accum, String> {
        @DefaultCoder(AvroCoder.class)   
        public static class Accum implements Serializable {
         int sum = 0;
         int count = 0;
       }
       public Accum createAccumulator() { return new Accum(); }
       public void addInput(Accum accum, String input) {
           accum.sum += Integer.parseInt(input );
           accum.count++;
       }
       public Accum mergeAccumulators(Iterable<Accum> accums) {
         Accum merged = createAccumulator();
         for (Accum accum : accums) {
           merged.sum += accum.sum;
           merged.count += accum.count;
         }
         return merged;
       }
       public String extractOutput(Accum accum) {
         return Double.toString( ((double) accum.sum) / accum.count );
       }
     }



  /**
   * Options supported by {@link WordCount}.
   * <p>
   * Inherits standard configuration options.
   */
  public static interface Options extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInput();
    void setInput(String value);

    @Description("Path of the file to write to")
    @Default.InstanceFactory(OutputFactory.class)
    String getOutput();
    void setOutput(String value);

    /**
     * Returns gs://${STAGING_LOCATION}/"sorts.txt" as the default destination.
     */
    public static class OutputFactory implements DefaultValueFactory<String> {
      @Override
      public String create(PipelineOptions options) {
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        if (dataflowOptions.getStagingLocation() != null) {
          return GcsPath.fromUri(dataflowOptions.getStagingLocation())
              .resolve("sorts.txt").toString();
        } else {
          throw new IllegalArgumentException("Must specify --output or --stagingLocation");
        }
      }
    }

     /**
     * By default (numShards == 0), the system will choose the shard count.
     * Most programs will not need this option.
     */
    @Description("Number of output shards (0 if the system should choose automatically)")
    @Default.Integer(1)
    int getNumShards();
    void setNumShards(int value);
  }     

你有什么想法会导致这种情况吗?

共有1个答案

叶文博
2023-03-14

我们意识到了这个问题,并正在努力修复,应该很快就会出现。

目前,您应该能够使用SerializableCoder而不是AvroCoder来实现累加器。

 类似资料:
  • 在大约14个工作小时后,我有一个云数据流管道失败,下面是一条神秘的日志消息: 谢了!

  • 想知道是否有某种“钩子”来放置apache beam管道关闭时将执行的一段代码(无论出于何种原因-崩溃、取消) 每次数据流停止时,我都需要删除pubsub主题的订阅。

  • 我使用beam SDK用python编写了一个Google数据流管道。有一些文档介绍了我如何在本地运行它,并设置runner标志以在数据流上运行它。 我现在正尝试将其自动部署到CI管道(bitbucket管道,但并不真正相关)。有关于如何“运行”管道的文档,但没有真正的“部署”管道。我测试过的命令如下: 这将运行作业,但因为它正在流式传输,所以永远不会返回。它还在内部管理打包并推送到存储桶。我知道

  • 我们在datalab中运行了一个Python管道,它从google云存储(导入google.datalab.storage)中的存储桶中读取图像文件。最初我们使用DirectRunner,效果很好,但现在我们尝试使用DataflowRunner,并且出现导入错误。即使在管道运行的函数中包含“import google.datalab.storage”或其任何变体,也会出现错误,例如“没有名为'da

  • tl;dr Apache Beam管道步骤涉及构建docker图像;如何使用谷歌数据流运行这个管道?存在哪些替代方案? 我目前正在尝试使用谷歌的数据流服务和apache梁(python)迈出第一步。 简单的例子很简单,但当外部软件依赖性开始发挥作用时,事情就会让我感到困惑。似乎可以使用自定义docker容器来设置自己的环境[1][2]。虽然这对大多数依赖项来说都很好,但如果依赖项是docker本身

  • 我正在尝试设置我的开发环境。我一直在使用pubsub模拟器进行开发和测试,而不是在生产中使用谷歌云pubsub。为此,我设置了以下环境变量: 这适用于python google pubsub库,但当我切换到使用java apache beam进行google数据流时,管道仍然指向生产google pubsub。管道上是否有需要设置的设置、环境变量或方法,以便管道读取本地pubsub仿真器?