当前位置: 首页 > 知识库问答 >
问题:

使用python的Apache Beam/Dataflow pub/sub侧输入

经和洽
2023-03-14

我是Apache Beam的新手,所以我在以下场景中有点吃力:

  • 使用流模式发布/子主题
  • 转换以取出CustomerID
  • 使用transform/pardo的并行PCollection根据pub/sub主题中接收的“customerid”从Firestore中获取数据(使用Side Input)
  • ...
class getFirestoreUsers(beam.DoFn):
    def process(self, element, customerId):

        print(f'Getting Users from Firestore, ID: {customerId}')

        # Call function to initialize Database
        db = intializeFirebase()

        """ # get customer information from the database
        doc = db.document(f'Customers/{customerId}').get()
        customer = doc.to_dict() """
        usersList = {}

        # Get Optin Users
        try:
            docs = db.collection(
                f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
            usersList = {user.id: user.to_dict() for user in docs}
        except Exception as err:
            print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
            print(err)

        return([usersList])
def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic',
        type=str,
        help='Pub/Sub topic to read from')
    parser.add_argument(
        '--output',
        help=('Output local filename'))

    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)

    users = (p | 'Create chars' >> beam.Create([
        {
             "clientMac": "7c:d9:5c:b8:6f:38",
             "username": "Louis"
             },
        {
            "clientMac": "48:fd:8e:b0:6f:38",
            "username": "Paul"
        }
    ]))


    # Get Dictionary from Pub/Sub
    data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
            | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
            )

    # Get customerId from Pub/Sub information
    PcustomerId = (data | 'get customerId from Firestore' >>
                   beam.ParDo(lambda x: [x.get('customerId')]))
    PcustomerId | 'print customerId' >> beam.Map(print)

    # Get Users from Firestore
    custUsers = (users | 'Read from Firestore' >> beam.ParDo(
        getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
    custUsers | 'print Users from Firestore' >> beam.Map(print)

共有1个答案

卫建义
2023-03-14

我不清楚示例代码中如何使用usersPCollection(因为elementprocess定义中没有处理)。我使用窗口重新排列了代码,并使用customer_id作为主要输入。

class GetFirestoreUsers(beam.DoFn):
  def setup(self):
    # Call function to initialize Database
    self.db = intializeFirebase()

  def process(self, element):
    print(f'Getting Users from Firestore, ID: {element}')

    """ # get customer information from the database
    doc = self.db.document(f'Customers/{element}').get()
    customer = doc.to_dict() """
    usersList = {}

    # Get Optin Users
    try:
        docs = self.db.collection(
            f'Customers/{element}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
        usersList = {user.id: user.to_dict() for user in docs}
    except Exception as err:
        print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
        print(err)

    return([usersList])



data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
          | beam.WindowInto(window.FixedWindow(60))
          | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))

# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId from Firestore' >>
               beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)

# Get Users from Firestore
custUsers = (cutomer_id | 'Read from Firestore' >> beam.ParDo(
    GetFirestoreUsers())
custUsers | 'print Users from Firestore' >> beam.Map(print)

从你的评论来看:

当使用来自pub/sub的原始JSON数据运行“main”PCollection时,所需的数据(首先是customerID,然后是customers数据)尚未就绪

在阅读pub/sub主题时,你的意思是firestore中的数据还没有准备好?

您总是可以在主函数中将逻辑拆分为2个管道,然后一个接一个地运行它们。

 类似资料:
  • 我有一个数据流工作,将单个文件分割成x个记录(表)。这些流在bigQuery没有问题。 不过,我发现没有办法在结果出来后执行管道中的另一个阶段。 举个例子 根据上述内容,我希望运行以下内容: 是有无论如何运行管道的另一个部分后,up到bigQuery或这是不可能的?提前感谢。

  • 我试图在Apache Beam中使用BigtableIO的运行时参数来写入BigTable。 我创建了一个从 BigQuery 读取并写入 Bigtable 的管道。当我提供静态参数时,管道工作正常(使用 ConfigBigtableIO 和 ConfigBigtableConfiguration,请参阅此处的示例 - https://github.com/GoogleCloudPlatform/

  • 我有一个批次处理作业在数据流运行在gcp下版本apache-梁[gcp]==2.19.0的数据流运行。我为作业创建了一个自定义模板。作业正在按预期运行,但我还想添加最大作业持续时间。我在wait_until_finish()方法中找到了持续时间(毫秒)参数,它应该是可用的。问题是:如何让模板化批处理作业在运行时间超过持续时间时自动停止?我不需要保存任何数据,我只希望工作运行时间过长时停止。我已经实

  • 从这个问题中,我了解到Apache Flink中的SplitStream现在已被弃用,建议改用side输出。有人能举一个侧面输出如何替代splitStream的例子吗? e、 g.如何修改下面的代码段以使用侧输出?

  • 我正在处理一个更大的数据流管道,它在批处理模式下工作得很好,但完成后的重构确实有侧输入的问题。如果我将管道置于流模式并移除侧输入,管道在Google的数据流上可以完美地工作。 如果把所有东西都剥离下来,构建以下简短的脚本来封装这个问题,并能够与它一起玩。 在Google的Dataflow中以批处理作业的形式运行这个脚本可以完成它需要做的事情。请参阅从数据流中可视化的管道:

  • 问题内容: 在Linux上,命令ps aux输出每个统计信息具有多个列的进程列表。例如 我希望能够使用Python进行阅读,然后将每一行然后每一列分开,以便可以将它们用作值。 在大多数情况下,这不是问题: 我现在可以遍历流程以获取每一行并将其按空格分开,例如 但是,问题在于命令的最后一列有时在其中有空格。在上面的示例中,可以在命令中看到 将被拆分为 但我真的希望它是: 所以我的问题是,我该如何拆分