当前位置: 首页 > 知识库问答 >
问题:

带有混合工作负载的数据流作业-流式插入和加载作业(Python)

谭云瀚
2023-03-14

https://cloud.google.com/blog/products/data-analytics/how-to-how高效处理实时和聚合数据

用例说明步骤:

  1. 从pubsub获取流式原始事件。
  2. 验证接收的原始事件。
  3. 筛选特定类型的事件。
  4. 创建筛选事件的字典。
  5. 同时,将筛选的事件通过窗口操作传递并聚合。
  6. 2种输出类型-原始事件字典、聚合事件字典。
  7. 按照上面链接中解释的设计,原始事件字典属于低紧急度类别,聚合事件属于高紧急度类别。
  8. 为原始事件尝试“file_loads”方法以避免成本部分。
  9. 正在为聚合事件尝试'streaming_insert'方法,因为该方法需要实时可用。
p = beam.Pipeline(argv=argv)
    valid_msgs, errors = (p
                          | 'Read from Pubsub' >>
                          beam.io.ReadFromPubSub(subscription=c['SUBSCRIPTION']).with_output_types(bytes)
                          | 'Validate PubSub Event' >> beam.ParDo(ValidateMessages()).with_outputs('errors', main='valid')
                          )

    filtered_events = (valid_msgs | 'Filter Events' >> beam.Filter(filter_msgs))

    raw_events = (filtered_events | 'Prepare Raw Event Row for BQ ' >> beam.Map(get_raw_values))

    agg_events = (filtered_events
                  | f'Streaming Window for {c["WINDOW_TIME"]} seconds' >> beam.WindowInto(window.FixedWindows(c['WINDOW_TIME']))
                  | 'Event Parser' >> beam.Map(get_agg_values)
                  | 'Event Aggregation' >> beam.CombinePerKey(sum)
                  | 'Prepare Aggregate Event Row for BQ' >> beam.Map(get_count)
                  )

    # Raw events are written to BigQuery using 'Load Jobs' every 10 minutes.
    write_result_raw = (raw_events | 'Write Raw Events to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                                                         project=c["PROJECT"],
                                                                                         dataset=c["DATASET_NAME"],
                                                                                         method='FILE_LOADS',
                                                                                         triggering_frequency=10))

    # Aggregated events are written to BigQuery using 'Streaming Inserts'.
    write_result_agg = (agg_events | 'Write Aggregate Results to BQ' >> beam.io.WriteToBigQuery(c["COUNT_TABLE"],
                                                                                                project=c["PROJECT"],
                                                                                                dataset=c["DATASET_NAME"],
                                                                                                create_disposition=CreateDisposition.CREATE_NEVER,
                                                                                                write_disposition=WriteDisposition.WRITE_APPEND,
                                                                                                insert_retry_strategy=RetryStrategy.RETRY_ALWAYS))

错误:

File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1493, in expand
42    'triggering_frequency can only be used with '
43ValueError: triggering_frequency can only be used with FILE_LOADS method of writing to BigQuery.

根据@Iñigo响应,我添加了标志。但这也不起作用。请看下面的详细信息。

 if c['FILE_LOAD']:
        argv.append('--experiments=use_beam_bq_sink')

 p = beam.Pipeline(argv=argv)
 records | 'Write Result to BQ' >> beam.io.WriteToBigQuery(c["RAW_TABLE"],
                                                                  project=c["PROJECT"],
                                                                  dataset=c["DATASET_NAME"],
                                                                  method='FILE_LOADS',
                                                                  triggering_frequency=c['FILE_LOAD_FREQUENCY'],
                                                                  create_disposition=CreateDisposition.CREATE_NEVER,
                                                                  write_disposition=WriteDisposition.WRITE_APPEND,
                                                                  insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                                                  )```

Error from dataflow job.
```Workflow failed.Causes: Because of the shape of your pipeline, the Cloud Dataflow job optimizer produced a job graph that is not updatable using the - -update pipeline option.This is a known issue that we are working to resolve.See https: // issuetracker.google.com / issues / 118375066 for information about how to modify the shape of your pipeline to avoid this error.You can override this error and force the submission of the job by specifying the --experiments=allow_non_updatable_job parameter., The stateful transform named 'Write Errors to BQ/BigQueryBatchFileLoads/ImpulseSingleElementPC/Map(decode).out/FromValue/ReadStream' is in two or more computations.```


EDITS : 08/11/2020

Added both the flags mentioned as the Pipeline argument.

```INFO:root:Argument to Beam Pipeline:['--project=xxxxxx, '--runner=DataflowRunner', '--job_name=df-pubsub-raw', '--save_main_session', '--staging_location=gs:/staging/', '--temp_location=gs://temp/', '--network=dataflow-localnet', '--subnetwork=regions/us-central1/subnetworks/us-central1', '--region=us-central1', '--service_account_email=xxx@YYY.iam.gserviceaccount.com', '--no_use_public_ips', '--streaming', '--experiments=[allow_non_updatable_job, use_beam_bq_sink]']

INFO:root:File load enabled 
INFO:root:Write using file load with frequency:5  

26  File "./dataflow_ps_stream_bq.py", line 133, in stream_to_bq  27    write_disposition=WriteDisposition.WRITE_APPEND  28  File "/usr/local/lib/python3.6/site-packages/apache_beam/pvalue.py", line 141, in __or__  29    return self.pipeline.apply(ptransform, self)  30  File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 610, in apply  31    transform.transform, pvalueish, label or transform.label)  32  File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 620, in apply  33    return self.apply(transform, pvalueish)  34  File "/usr/local/lib/python3.6/site-packages/apache_beam/pipeline.py", line 663, in apply  35    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)  36  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 153, in apply  37    return super(DataflowRunner, self).apply(transform, input, options)  38  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 198, in apply  39    return m(transform, input, options)  40  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/runner.py", line 228, in apply_PTransform  41    return transform.expand(input)  42  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1493, in expand  43    'triggering_frequency can only be used with '  44ValueError: triggering_frequency can only be used with FILE_LOADS method of writing to BigQuery.  ```

共有1个答案

柴星津
2023-03-14

您需要添加标志--实验use_beam_bq_sink。这个问题已经存在一段时间了,Dataflow覆盖了load类型。

你可以在光束回购中看到这一点。

它也看起来有一个正在进行的公关来改进插入,包括这一点(我只是略过代码)。

 类似资料:
  • 当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?

  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?

  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错:

  • 读完这个问题后,我仍然对DataFlow/Apache Beam如何分配工作负载有一些疑问。我遇到的问题可以用下面的代码演示: 比较使用1个worker和5个worker时的最大吞吐量,而不是后者的效率高5倍,它只是稍微高一点。这让我对以下问题产生了疑问: 假设每个工作线程使用4个vCPU,那么每个线程是否绑定到特定的DoFn,或者如果需要提高性能,可以在给定时刻对所有线程调用相同的DoFns?

  • 问题内容: 即使发布订阅队列不断增加(现在有100k未送达消息),我使用的流数据流job()也不会超过1个Worker-您有什么想法吗? 目前与和一起运行。 问题答案: 数据流工程师在这里。我在后端查看了该工作,发现它没有扩大规模,因为CPU利用率低,这意味着其他一些因素会限制管道的性能,例如外部限制。在这些情况下,升级很少有帮助。 我发现某些捆绑包可能要花费数小时才能处理。我建议调查您的管道逻辑

  • Google docu表示,工作负载标识可以用来授权GKE POD使用Google API提供的服务(而且效果很好)。它还表示,将有一个自动创建的标识池,名为PROJECT\u ID.svc。id.goog。 关于工作负载标识联合的Docu说:“您可以使用工作负载标识池来组织和管理外部标识。” 在我按照这里所述配置了工作负载标识(并且工作正常)之后,我正在尝试检索项目中现有的工作负载标识池,我希望