作为以下问答的后续问题:
https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey
我想与谷歌数据流工程团队(@jkff)确认尤金提出的第三个选项是否有可能使用谷歌数据流:
PCollection<KV<String, Iterable<String>>> output = ...;
public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {
private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
this.keysAsSideinputs = keysAsSideinputs;
}
@Override
public void processElement(ProcessContext c) {
List<String> keys = c.sideInput(keysAsSideinputs);
String key = c.element().getKey();
//the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
c.element().getValue().apply(Pardo.of(new FormatLineFn()))
.apply(TextIO.Write.to(key));
c.output(1);
}
}
BigQueryIo.Write转换不支持这一点。您可以做的最接近的事情是使用每个窗口的表,并使用自定义WindowFN对选择窗口对象中的表所需的任何信息进行编码。
如果不想这样做,可以直接从DOFN调用BigQuery API。这样,您就可以将表名设置为任何您想要的名称,正如您的代码所计算的那样。这可以从侧输入中查找,也可以直接从DoFn当前正在处理的元素中计算。为了避免对BigQuery进行太多的小调用,可以使用finishBundle()对请求进行批量处理;
您可以在这里看到数据流运行器是如何导入流的:https://github.com/googlecloudplatform/dataflowjavasdk/blob/master/sdk/src/main/java/com/google/cloud/Dataflow/sdk/util/bigquerytableinserter.java
我希望这里有人能帮忙。我一直在谷歌上疯狂地搜索这个错误,但没有发现任何东西。 我有一个管道,在本地执行时工作得很好,但在GCP上执行时会失败。以下是我得到的错误信息。 工作流失败。原因:S03:写入转换fn/WriteMetadata/ResolveBeamFutures/CreateSingleton/Read-WriteMetadata/ResolveBeamFutures/ResolveFu
我目前正试图根据数据中包含的特定键,将运行在Google Dataflow上的Beam管道分叉到多个目的地。当使用TaggedOutput标记对“fork”的每个endpoint进行硬编码时,我能够实现这一点。但是,在将来,我并不总是知道底层数据中存在哪些键,因此我希望使用类似于以下的for循环动态创建流程中的后续步骤: 我的理解是,<代码>的结果。with\u outputs()应该是可编辑的,
据Beam网站报道, 通常,对管道代码执行本地单元测试比调试管道的远程执行更快更简单。 出于这个原因,我想对写到Bigtable的Beam/DataFlow应用程序使用测试驱动开发。 但是,在Beam测试文档之后,我遇到了一个僵局--Passert并不有用,因为输出PCollection包含org.apache.hadoop.hbase.client.Put对象,这些对象不重写equals方法。
但是当我运行代码时,我会遇到以下异常: 你有什么想法会导致这种情况吗?
我有一个流数据集,阅读Kafka并试图写到CSV 有没有一种方法可以通过编程模式和结构化流数据集来实现这一点?
因此,服务器和客户端都发生这种情况。我有来自通道活动方法的通道处理程序上下文,我正在尝试使用写入AndFlush(对象消息)方法向其写入对象,但似乎消息永远不会进入创建的管道。 下面是我的客户端处理程序的样子(我重写了包解码器和编码器中的一些方法来调试) 这是我如何写入ChannelHandlerContext通道变量 当我运行我的代码时,“从客户端写入服务器的数据”是打印机,但“在客户端上编码的