当前位置: 首页 > 知识库问答 >
问题:

在Python中,无法在单个数据流作业中动态地处理多个流管道(N到N个管道)(使用运行时值提供程序)

谭嘉歆
2023-03-14

我试图启动一个流数据流作业,其中包含n个管道。

基于配置的主题和每个主题对应的BQ表,我想在一个流作业内启动一个管道。

import logging
import os
import json
from google.cloud import storage
from apache_beam import Pipeline, ParDo, DoFn
from apache_beam.io import ReadFromPubSub, WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, WorkerOptions, GoogleCloudOptions, \
    SetupOptions


def _get_storage_service():
    storage_client = storage.Client \
        .from_service_account_json(
        json_credentials_path='C:\Users\dneema\PycharmProjects\iot_dataflow\df_stm_iot_pubsub_bq\service_account_credentials.json')
    print('storage service fetched')
    return storage_client


class RuntimeOptions(PipelineOptions):

    def __init__(self, flags=None, **kwargs):
        super(RuntimeOptions, self).__init__(flags, **kwargs)

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--bucket_name', type=str)
        parser.add_value_provider_argument('--config_json_path', type=str,)


class PipelineCreator:

    def __init__(self):
        self.options = PipelineOptions()
        storage_client = storage.Client.from_service_account_json(
            'service_account_credentials_updated.json')

        runtime_options = self.options.view_as(RuntimeOptions)
        bucket_name = str(runtime_options.bucket_name)
        config_json_path = str(runtime_options.config_json_path)

        # get the bucket with name
        bucket = storage_client.get_bucket(bucket_name)

        # get bucket file as blob
        blob = bucket.get_blob(config_json_path)

        # convert to string and load config
        json_data = blob.download_as_string()
        self.configData = json.loads(json_data)

        dataflow_config = self.configData['dataflow_config']
        self.options.view_as(StandardOptions).streaming = bool(dataflow_config['streaming'])
        self.options.view_as(SetupOptions).save_main_session = True

        worker_options = self.options.view_as(WorkerOptions)
        worker_options.max_num_workers = int(dataflow_config['max_num_worker'])
        worker_options.autoscaling_algorithm = str(dataflow_config['autoscaling_algorithm'])
        #worker_options.machine_type = str(dataflow_config['machine_type'])
        #worker_options.zone = str(dataflow_config['zone'])
        #worker_options.network = str(dataflow_config['network'])
        #worker_options.subnetwork = str(dataflow_config['subnetwork'])

    def run(self):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'dataflow-service-account.json'

        project_id = self.configData['project_id']
        dataset_id = self.configData['dataset_id']
        topics = self.configData['topics']
        table_ids = self.configData['bq_table_ids']
        error_table_id = self.configData['error_table_id']

        logger = logging.getLogger(project_id)
        logger.info(self.options.display_data())

        pipeline = Pipeline(options=self.options)

        size = len(topics)
        for index in range(size):
            print(topics[index])
            pipeline_name = "pipeline_"+str(index)
            logger.info("Launch pipeline :: "+pipeline_name)
            messages = pipeline | 'Read PubSub Message in ' + pipeline_name >> ReadFromPubSub(topic=topics[index])
            logger.info("Read PubSub Message")
            valid_messages, invalid_messages = messages  | 'Convert Messages to TableRows in ' + pipeline_name >> ParDo(TransformMessageToTableRow()).with_outputs('invalid', main='valid')
            valid_messages | 'Write Messages to BigQuery in ' + pipeline_name >> WriteToBigQuery(table=table_ids[index],
                                                                                               dataset=dataset_id,
                                                                                               project=project_id,
                                                                                          write_disposition=BigQueryDisposition.WRITE_APPEND)

        pipeline.run().wait_until_finish()

class TransformMessageToTableRow(DoFn):

    def process(self, element, *args, **kwargs):
        logging.getLogger('dataflow').log(logging.INFO, element)
        print element
        print("element type ", type(element))
        print("inside bq pardo")
        import json
        try:
            message_rows = json.loads(element)

            # if using emulator, uncomment below line
            message_rows = json.loads(message_rows)
            print 'loaded element'
        except:
            try:
                element = "[" + element + "]"
                message_rows = json.loads(element)
            except Exception as e:
                print(e)
                from apache_beam import pvalue
                yield [pvalue.TaggedOutput('invalid', [element, str(e)])]
        print(message_rows)
        print("message rows", type(message_rows))
        if not isinstance(message_rows, list):
            message_rows = [message_rows]
        #rows = list()
        if isinstance(message_rows, list):

            for row in message_rows:
                try:
                    new_row = dict()
                    for k, v in row.items():
                        new_row[str(k)] = v
                    #rows.append(new_row)
                    print(new_row)
                    yield new_row
                except Exception as e:
                    print(e)
                    from apache_beam import pvalue
                    yield pvalue.TaggedOutput('invalid', [row, str(e)])

if __name__ == '__main__':
        PipelineCreator().run()

这里的运行时参数为bucket_name和config_json_path,用于所有与配置相关的数据集、BQ表、topics/Subscription和所有工作流选项。

这到底有没有可能?因为谷歌也提供了一对一的模板。不是很多对很多模板(例如三个主题-三个BQ表(三个数据流水线),n-n)。

共有1个答案

应涵容
2023-03-14

关于这个之前回答的线程,通过在Apache Beam中创建模板,您无法按照所需的顺序运行多个管道,您在任何时候都只能在一个模板中运行一个管道。您必须将模板创建委托给另一个服务,并通过它传递配置,只需遵循线程内的链接,您将了解如何示例。

 类似资料:
  • 问题内容: 我正在尝试使用Ruffus管道中的多个Sailq文件作为参数的Sailfish。我使用python中的子流程模块执行Sailfish,但即使设置,在子流程调用中也不起作用。 这是我要使用python执行的命令: 或(最好): 概括: 我将如何在python中执行此操作?子过程正确吗? 问题答案: 模拟bash进程替换: 在Python中,您可以使用命名管道: 在哪里: 实现bash进程

  • 我试图在管道完成后进行滞后更新,由于日期版本控制,表在运行时传入。由于此代码是作为模板执行的,因此需要使用NestedValueProviders。 如何在管道运行之外访问此值?在管道完成后,是否有更好的方法来做“只做一次”的工作?

  • 问题内容: 我想在一个定义管道构建作业的框架中利用Jenkins 的现有Mailer插件。给定以下简单的失败脚本,我希望每个构建版本都会收到一封电子邮件。 构建的输出为: 如您所见,它确实记录了它在失败后立即执行管道的过程,但是没有生成电子邮件。 利用自由工作的其他自由式工作中的电子邮件,只是通过管道工作来调用。 这与Jenkins 2.2和mailer 1.17一起运行。 是否有其他机制可以用来

  • 我想在谷歌数据流上运行一个管道,该管道取决于另一个管道的输出。现在,我正在本地使用DirectRunner依次运行两条管道: 我的问题如下: DataflowRunner是否保证第二个仅在第一个管道完成后启动

  • 我想通过管道传输我的hadoop流作业。例如,我运行了一个命令hadoop jarhadoop-streaming.jar-mappermap1.pyreducer.py-inputxx输出 /output1 但是我想使用第一步的输出作为第二步mapduce作业的输入,而不存储在hdfs中,也许输出为标准输出。有没有像linux管道一样的东西?比如hadoop jarhadoop-streamin

  • 我想在同一个Java过程中使用KCL处理多个Kinesis流。 想法很简单:为每个流创建一个新的KCL实例,然后并发运行worker。 我的问题是,在这种情况下,所有KCL实例是否都使用相同的线程池,以及在处理流处理时,这种想法是否是一种好的/常见的做法。 非常感谢。