但是当我运行代码时,我会遇到以下异常:
package com.google.cloud.dataflow.examples;
import java.io.Serializable;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.PCollection;
public class CalcMeanExample
public static void main(String[] args)
{
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<String> numbers = p.apply(TextIO.Read.named("ReadLines").withCoder(StringUtf8Coder.of()).from(options.getInput()));
numbers.apply( ParDo.of( new DoFn<String,String>(){
@Override
public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
System.out.println( c.element() );
}
}));
PCollection<String> average = numbers.apply( Combine.globally( new AverageFn()));
average.apply(TextIO.Write.named("WriteAverage")
.to(options.getOutput())
.withNumShards(options.getNumShards()));
p.run();
System.out.println( "done" );
}
public static class AverageFn extends CombineFn<String, AverageFn.Accum, String> {
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
int sum = 0;
int count = 0;
}
public Accum createAccumulator() { return new Accum(); }
public void addInput(Accum accum, String input) {
accum.sum += Integer.parseInt(input );
accum.count++;
}
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}
public String extractOutput(Accum accum) {
return Double.toString( ((double) accum.sum) / accum.count );
}
}
/**
* Options supported by {@link WordCount}.
* <p>
* Inherits standard configuration options.
*/
public static interface Options extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInput();
void setInput(String value);
@Description("Path of the file to write to")
@Default.InstanceFactory(OutputFactory.class)
String getOutput();
void setOutput(String value);
/**
* Returns gs://${STAGING_LOCATION}/"sorts.txt" as the default destination.
*/
public static class OutputFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (dataflowOptions.getStagingLocation() != null) {
return GcsPath.fromUri(dataflowOptions.getStagingLocation())
.resolve("sorts.txt").toString();
} else {
throw new IllegalArgumentException("Must specify --output or --stagingLocation");
}
}
}
/**
* By default (numShards == 0), the system will choose the shard count.
* Most programs will not need this option.
*/
@Description("Number of output shards (0 if the system should choose automatically)")
@Default.Integer(1)
int getNumShards();
void setNumShards(int value);
}
你有什么想法会导致这种情况吗?
我们意识到了这个问题,并正在努力修复,应该很快就会出现。
目前,您应该能够使用SerializableCoder而不是AvroCoder来实现累加器。
在大约14个工作小时后,我有一个云数据流管道失败,下面是一条神秘的日志消息: 谢了!
想知道是否有某种“钩子”来放置apache beam管道关闭时将执行的一段代码(无论出于何种原因-崩溃、取消) 每次数据流停止时,我都需要删除pubsub主题的订阅。
我使用beam SDK用python编写了一个Google数据流管道。有一些文档介绍了我如何在本地运行它,并设置runner标志以在数据流上运行它。 我现在正尝试将其自动部署到CI管道(bitbucket管道,但并不真正相关)。有关于如何“运行”管道的文档,但没有真正的“部署”管道。我测试过的命令如下: 这将运行作业,但因为它正在流式传输,所以永远不会返回。它还在内部管理打包并推送到存储桶。我知道
我们在datalab中运行了一个Python管道,它从google云存储(导入google.datalab.storage)中的存储桶中读取图像文件。最初我们使用DirectRunner,效果很好,但现在我们尝试使用DataflowRunner,并且出现导入错误。即使在管道运行的函数中包含“import google.datalab.storage”或其任何变体,也会出现错误,例如“没有名为'da
tl;dr Apache Beam管道步骤涉及构建docker图像;如何使用谷歌数据流运行这个管道?存在哪些替代方案? 我目前正在尝试使用谷歌的数据流服务和apache梁(python)迈出第一步。 简单的例子很简单,但当外部软件依赖性开始发挥作用时,事情就会让我感到困惑。似乎可以使用自定义docker容器来设置自己的环境[1][2]。虽然这对大多数依赖项来说都很好,但如果依赖项是docker本身
我正在尝试设置我的开发环境。我一直在使用pubsub模拟器进行开发和测试,而不是在生产中使用谷歌云pubsub。为此,我设置了以下环境变量: 这适用于python google pubsub库,但当我切换到使用java apache beam进行google数据流时,管道仍然指向生产google pubsub。管道上是否有需要设置的设置、环境变量或方法,以便管道读取本地pubsub仿真器?