当前位置: 首页 > 知识库问答 >
问题:

基于元素值的Google云存储数据流写入

万俟玉书
2023-03-14

我试图构建一个数据流过程,通过将数据存储到谷歌云存储中来帮助归档数据。我有一个事件数据的PubSub流,其中包含client_id和一些元数据。这个进程应该归档所有传入的事件,因此这需要是一个流管道。

我希望能够通过将接收到的每个事件放入类似gs://archive/client_id/eventdata.json的bucket中来处理事件归档。在DataFlow/Apache beam中是否可以这样做,特别是能够为pCollection中的每个事件分配不同的文件名?

编辑:所以我的代码当前看起来像:

public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

private String customerId;

public PerWindowFiles(String customerId) {
  this.customerId = customerId;
}

@Override
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
  String filename = bucket+"/"+customerId;
  return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
}

@Override
public ResourceId unwindowedFilename(
    ResourceId outputDirectory, Context context, String extension) {
  throw new UnsupportedOperationException("Unsupported.");
}
}


public static void main(String[] args) throws IOException {
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);

PCollection<Event> set = p.apply(PubsubIO.readStrings()
                                     .fromTopic("topic"))
    .apply(new ConvertToEvent()));

PCollection<KV<String, Event>> events = labelEvents(set);
PCollection<KV<String, EventGroup>> sessions = groupEvents(events);

String customers = System.getProperty("CUSTOMERS");
JSONArray custList = new JSONArray(customers);
for (Object cust : custList) {
  if (cust instanceof String) {
    String customerId = (String) cust;
    PCollection<KV<String, EventGroup>> custCol = sessions.apply(new FilterByCustomer(customerId));
            stringifyEvents(custCol)
                .apply(TextIO.write()
                                               .to("gs://archive/")
                                               .withFilenamePolicy(new PerWindowFiles(customerId))
                                               .withWindowedWrites()
                                               .withNumShards(3));
  } else {
    LOG.info("Failed to create TextIO: customerId was not String");
  }
}

p.run()
    .waitUntilFinish();
}

这段代码很难看,因为每次发生新的客户机时,我都需要重新部署,以便能够保存它们的数据。我更希望能够动态地将客户数据分配到适当的桶中。

共有1个答案

梁新觉
2023-03-14

“Dynamic Destinations”--根据正在编写的元素选择文件名--将是Beam 2.1.0中的一个新功能,该功能尚未发布。

 类似资料:
  • 新手问题:我想使用一个云存储桶作为云CDN的原点。不确定这是否可能。目前,我已经为负载均衡器打开了CDN,但我的理解是,它将只缓存来自我的domain.com的内容,并有适当的标题集。当然,我们的假设是,CDN pops比bucket离我的用户位置更近,而bucket离我的用户位置更远,并且从CDN中获取比使用bucket URL(主要是静态图像)更快。谢了。

  • 我试图将一个用编写的函数迁移到 实时解压缩并逐行读取 对每一行执行一些光转换 将未压缩的输出(一次一行或块)写入GCS 输出是>2GB,但略小于3GB,所以它适合。 null AFAIC,我将坚持,因为输出可以放入内存--就目前而言--但是多部分上传是以最少的内存支持任何输出大小的方法。 想法还是替代方案?

  • 我正在尝试使用DataFlow(Java)将数据从云存储插入到Big Query中。我可以批量上传数据;但是,我想要设置一个流式上传代替。因此,当新对象添加到我的bucket时,它们将被推送到BigQuery。 我已经将PipelineOptions设置为流,并且在GCP控制台UI中显示dataflow管道是流类型的。bucket中的初始文件/对象集被推送到BigQuery。 但是当我向桶中添加新

  • 我们将Google Cloud Datastore用于Google App Engine(GAE)应用程序。在我们的项目改造期间,我们希望将数据库从Datastore迁移到Google CloudSQL。 我们在数据存储中约有1 TB数据,不包括索引。 如何从数据存储迁移到云SQL,是否有任何现有的开源解决方案可用于此。 我已经检查了下面的一个https://cloud.google.com/da

  • 这是可行的,但这里的问题是,在流回此方法的客户端之前,它必须首先缓冲所有字节。这会导致很多延迟,尤其是当存储在GCS中的文件很大时。 是否有一种方法可以从GCS获取文件并将其直接流到OutputStream,这里的OutputStream是针对servlet的。

  • 我对xslt有这个问题。样式表中有一个名为misctable的表,它被放在一个变量miscTables中。例如在节点 id="_384PLATE"具有值A1、A2、B1。当我标记并将其与xml进行比较时,我需要保存R[@i=2]/C/@i属性的值。在这种情况下,我需要得到值2,3,26 xml xslt 非常感谢。

  • 有人能帮我做这个吗?