当前位置: 首页 > 工具软件 > Apache Beam > 使用案例 >

使用Apache Beam进行ETL —将数据从API加载到BigQuery

葛安和
2023-12-01

We were recently faced with a challenge at work where we needed to consume customer details and activities from Exponea. The biggest limitation we encountered was with the event data, the API only allowed you consume events for one customer at a time. We had approximately 2 million users and this meant we would have to make 2 million API calls — a number that would only continue to increase.

最近,我们在工作中面临挑战,需要消耗Exponea的客户详细信息和活动。 我们遇到的最大限制是事件数据,该API仅允许您一次消耗一位客户的事件。 我们大约有200万用户,这意味着我们将不得不进行200万次API调用-这个数字只会继续增加。

Our initial approach was to write a python script that would make these API calls and fetch said data into a Postgres instance of CloudSQL. It worked! However, this script (which was deployed on a Compute Engine instance) took approximately 7 days to run. This was unacceptable as it would consume a lot of resources while limiting our refresh frequency. We needed a more scalable solution.

我们最初的方法是编写一个Python脚本,该脚本将进行这些API调用并将所述数据提取到CloudSQL的Postgres实例中。 有效! 但是,此脚本(已部署在Compute Engine实例上)运行大约需要7天。 这是不可接受的,因为它将消耗大量资源,同时限制了我们的刷新频率。 我们需要一个更具扩展性的解决方案。

Optimization options that we could have considered included splitting the list of customer ids into smaller applications, and perhaps some multithreading. However, this exact situation is what distributed analytical engines were built for. My top options were Apache Spark and Apache Beam, and as a heavy Spark user I was inclined to go with the latter. However, as any Spark afficionado would know, this would require setting up and managing infrastructure (whether open source or in the cloud). Since my company’s cloud service provider of choice is GCP, and there is a managed service called Dataflow which would run Beam scripts, I opted for Apache Beam.

我们本可以考虑的优化选项包括将客户ID列表拆分为较小的应用程序,也许还有一些多线程处理。 但是,正是这种确切情况是为分布式分析引擎构建的。 我的首选是Apache Spark和Apache Beam,作为Spark的重度用户,我倾向于使用后者。 但是,正如任何Spark爱好者都知道的那样,这将需要设置和管理基础结构(无论是开源的还是云中的)。 由于我公司的首选云服务提供商是GCP,并且有一个名为Dataflow的托管服务可以运行Beam脚本,因此我选择了Apache Beam。

管道 (The Pipeline)

This article is divided into two parts:

本文分为两个部分:

  1. Developing the Apache Beam script

    开发Apache Beam脚本
  2. Deploying and Scheduling on GCP

    在GCP上进行部署和计划

开发Apache Beam脚本 (Developing the Apache Beam script)

There are 3 options for developing in Apache Beam; Java, Python and Go. I used the Python SDK for this development since that is the language I work in primarily.I was initially concerned about the learning curve for beam, but if you’re familiar with Python then it shouldn’t be a steep one.

在Apache Beam中有3个开发选项。 Java,Python和Go。 我使用Python SDK进行开发是因为我主要使用该语言。起初我一直在关注Beam的学习曲线,但是如果您熟悉Python,那么它就不应该太陡峭了。

Beam has configured data sources and sinks, but you can also configure your own sources and/or sinks. For example, there is currently no JDBC sink for the Python SDK. If you choose to go the route of implementing a custom data source, it’s important that you add a Reshuffle() step in your pipeline in order to take advantage of Dataflow’s parallelization capabilities.

Beam已配置了数据源和接收器,但您也可以配置自己的源和/或接收器。 例如,Python SDK目前没有JDBC接收器。 如果选择实施自定义数据源的路线,则必须在管道中添加Reshuffle()步骤,以利用Dataflow的并行化功能,这一点很重要。

环境设定 (Environment Setup)

One good thing about Apache Beam scripts is that you can run them anywhere, including on your local PC with no setup required! This is a big deal for a former Spark user like me because it means I can develop, test and debug locally and simply change my runner when I am ready to deploy.

关于Apache Beam脚本的一件好事是,您可以在任何地方运行它们,包括在本地PC上运行而无需进行设置! 对于像我这样的前Spark用户来说,这很重要,因为这意味着我可以在本地进行开发,测试和调试,并且在准备部署时只需更改我的运行器即可。

All you need to do is create a virtual environment in Python (best practice) and install the library and you’re good to go.

您所需要做的就是用Python创建一个虚拟环境(最佳实践)并安装该库,您就可以开始了。

pip install apache-beam[gcp]

编写脚本 (Writing the script)

For my use case, I would be using the built-in BigQuery source and sink. I would implement a custom class myself to call the API however. This was pretty straightforward, all I needed to do was inherit the DoFn class in my custom class and define a process() function which will be called by Beam when I pass a PCollection to this class.

对于我的用例,我将使用内置的BigQuery源和接收器。 我自己实现一个自定义类来调用API。 这非常简单,我要做的就是继承自定义类中的DoFn类并定义一个process()函数,当我将PCollection传递给此类时,Beam将调用该函数。

All that’s left is to construct the pipeline to read the customer ids from BigQuery, make the API call for each customer and write the events to another BigQuery table

剩下的就是构建管道以从BigQuery读取客户ID,为每个客户进行API调用并将事件写入另一个BigQuery表

在GCP上进行部署和计划 (Deploying and Scheduling on GCP)

I deployed the above script on Dataflow and set the maximum number of workers to 5. For testing, I simply run the python script in my terminal. To deploy this script as a dataflow template on GCP I used the following command and specified the DataflowRunner.

我在Dataflow上部署了上述脚本,并将最大工作程序数设置为5。为了进行测试,我只需要在终端中运行python脚本。 为了在GCP上将此脚本部署为数据流模板,我使用了以下命令并指定了DataflowRunner。

在GCP上安排 (Scheduling on GCP)

My least favorite part of dataflow has to be the scheduling because it requires making use of two other services on GCP. However, it’s a pretty straightforward process using Cloud Functions and scheduling using Cloud Scheduler. These can be done via the GCP console.

我最不喜欢数据流的部分是调度,因为它需要利用GCP上的其他两个服务。 但是,使用Cloud Functions和使用Cloud Scheduler进行调度是一个非常简单的过程。 这些可以通过GCP控制台完成。

I wrote a simple Cloud Function that would take in a template location on Cloud Storage, a job name and a location and would trigger the start of a Dataflow job using the template via the Dataflow API.

我编写了一个简单的Cloud Function,它将获取Cloud Storage上的模板位置,作业名称和位置,并通过Dataflow API使用该模板触发Dataflow作业的开始。

Finally, I created a job on Cloud Scheduler to send a message to the PubSub topic which acts as a trigger for the Cloud Function and set it to run periodically.

最后,我在Cloud Scheduler上创建了一个作业,以将消息发送到PubSub主题,该主题充当Cloud Function的触发器,并将其设置为定期运行。

结论 (Conclusion)

I was able to decrease runtime to 6 hours using this architecture! The best part of this is I can easily reduce that runtime even further by either increasing the number of workers or specifying a compute engine with more vCPUs to run this job.

使用这种架构,我能够将运行时间减少到6小时! 最好的部分是,我可以通过增加工作程序数量或指定具有更多vCPU的计算引擎来轻松运行,从而进一步减少运行时间。

I will be writing a follow up article about managing this data on BigQuery in a cost effective way.

我将写一篇有关以经济有效的方式在BigQuery上管理此数据的后续文章。

Thank you for reading !

谢谢您的阅读!

翻译自: https://medium.com/cars45-data-analytics/etl-with-apache-beam-load-data-from-api-to-bigquery-5fb7726866f5

 类似资料: