当前位置: 首页 > 知识库问答 >
问题:

从数据流管道写入BQ时的动态表名

蓝飞
2023-03-14

作为以下问答的后续问题:

https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

我想与谷歌数据流工程团队(@jkff)确认尤金提出的第三个选项是否有可能使用谷歌数据流:

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

@Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        //the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
        c.element().getValue().apply(Pardo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }    
}    

共有1个答案

邵畅
2023-03-14

BigQueryIo.Write转换不支持这一点。您可以做的最接近的事情是使用每个窗口的表,并使用自定义WindowFN对选择窗口对象中的表所需的任何信息进行编码。

如果不想这样做,可以直接从DOFN调用BigQuery API。这样,您就可以将表名设置为任何您想要的名称,正如您的代码所计算的那样。这可以从侧输入中查找,也可以直接从DoFn当前正在处理的元素中计算。为了避免对BigQuery进行太多的小调用,可以使用finishBundle()对请求进行批量处理;

您可以在这里看到数据流运行器是如何导入流的:https://github.com/googlecloudplatform/dataflowjavasdk/blob/master/sdk/src/main/java/com/google/cloud/Dataflow/sdk/util/bigquerytableinserter.java

 类似资料:
  • 我希望这里有人能帮忙。我一直在谷歌上疯狂地搜索这个错误,但没有发现任何东西。 我有一个管道,在本地执行时工作得很好,但在GCP上执行时会失败。以下是我得到的错误信息。 工作流失败。原因:S03:写入转换fn/WriteMetadata/ResolveBeamFutures/CreateSingleton/Read-WriteMetadata/ResolveBeamFutures/ResolveFu

  • 我目前正试图根据数据中包含的特定键,将运行在Google Dataflow上的Beam管道分叉到多个目的地。当使用TaggedOutput标记对“fork”的每个endpoint进行硬编码时,我能够实现这一点。但是,在将来,我并不总是知道底层数据中存在哪些键,因此我希望使用类似于以下的for循环动态创建流程中的后续步骤: 我的理解是,<代码>的结果。with\u outputs()应该是可编辑的,

  • 据Beam网站报道, 通常,对管道代码执行本地单元测试比调试管道的远程执行更快更简单。 出于这个原因,我想对写到Bigtable的Beam/DataFlow应用程序使用测试驱动开发。 但是,在Beam测试文档之后,我遇到了一个僵局--Passert并不有用,因为输出PCollection包含org.apache.hadoop.hbase.client.Put对象,这些对象不重写equals方法。

  • 但是当我运行代码时,我会遇到以下异常: 你有什么想法会导致这种情况吗?

  • 我有一个流数据集,阅读Kafka并试图写到CSV 有没有一种方法可以通过编程模式和结构化流数据集来实现这一点?

  • 因此,服务器和客户端都发生这种情况。我有来自通道活动方法的通道处理程序上下文,我正在尝试使用写入AndFlush(对象消息)方法向其写入对象,但似乎消息永远不会进入创建的管道。 下面是我的客户端处理程序的样子(我重写了包解码器和编码器中的一些方法来调试) 这是我如何写入ChannelHandlerContext通道变量 当我运行我的代码时,“从客户端写入服务器的数据”是打印机,但“在客户端上编码的