Apache Beam
This blog post builds on the ideas started in three previous blog posts.
In this blog post I'll show how to deploy the same ML model that we deployed as a batch job in this blog post, as a task queue in this blog post, inside an AWS Lambda in this blog post, as a Kafka streaming application in this blog post, as a gRPC service in this blog post, as a MapReduce job in this blog post, as a WebSocket service in this blog post, and as a ZeroRPC service in this blog post.
The code in this blog post can be found in this GitHub repo.
Introduction
Data processing pipelines are useful for solving a wide range of problems. For example, an Extract, Transform, and Load (ETL) pipeline is a type of data processing pipeline that is used to extract data from one system and save it to another system. Inside of an ETL, the data may be transformed and aggregated into more useful formats. ETL jobs are useful for making the predictions made by a machine learning model available to users or to other systems. The ETL for such an ML model deployment looks like this: extract features used for prediction from a source system, send the features to the model for prediction, and save the predictions to a destination system. In this blog post we will show how to deploy a machine learning model inside of a data processing pipeline that runs on the Apache Beam framework.
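As a rough sketch, that ETL can be pictured as a short loop like the one below; the function, source, and destination objects here are purely illustrative and are not part of the codebase we'll build in this post.

def run_prediction_etl(model, source, destination):
    """Illustrative ETL loop: extract features, apply the model, load the predictions."""
    for features in source.read():                  # extract feature records from the source system
        prediction = model.predict(data=features)   # transform: send the features to the model
        destination.write(prediction)               # load the prediction into the destination system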
Apache Beam is an open source framework for doing data processing. It is most useful for parallel data processing that can easily be split among many computers. The Beam framework is different from other data processing frameworks because it supports batch and stream processing using the same API, which allows developers to write the code once and deploy it in two different contexts without change. An interesting feature of the Beam programming model is that once we have written the code, we can deploy it to an array of different runners like Apache Spark, Apache Flink, Hadoop MapReduce, and others.
The Google Cloud Platform has a service that can run Beam pipelines. The Dataflow service lets users run their workloads in the cloud without having to worry about managing servers, and it handles automated provisioning and management of processing resources for the user. In this blog post, we'll also be deploying the machine learning pipeline to the Dataflow service to demonstrate how it works in the cloud.
Building Beam Jobs
A Beam job is defined by a driver process that uses the Beam SDK to declare the data processing steps that the job performs. The Beam SDK can be used from Python, Java, or Go processes. The driver process defines a data processing pipeline of components which are executed in the right order to load data, process it, and store the results. The driver program also accepts execution options that can be set to modify the behavior of the pipeline. In our example, we will be loading data from an LDJSON file, sending it to a model to make predictions, and storing the results in an LDJSON file.
The Beam programming model works by defining a PCollection, which is a collection of data records that need to be processed. A PCollection is a data structure that is created at the beginning of the execution of the pipeline, and is received and processed by each step in a Beam pipeline. Each step in the pipeline that modifies the contents of the PCollection is called a PTransform. For this blog post we will create a PTransform component that takes a PCollection, makes predictions with it, and returns a PCollection with the prediction results. We will combine this PTransform with other components to build a data processing pipeline.
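To make these ideas concrete before we get to the model code, here is a trivial, self-contained Beam pipeline: beam.Create produces a PCollection of three records, and each beam.Map step is a PTransform applied to every record in that PCollection.

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | 'create_records' >> beam.Create([1, 2, 3])      # a PCollection with three records
     | 'double_records' >> beam.Map(lambda x: x * 2)   # a PTransform applied to each record
     | 'print_records' >> beam.Map(print))             # print each resulting record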
Package Structure
The code used in this blog post is hosted in this GitHub repository. The codebase is structured like this:
- data (data for testing the job)
- model_beam_job (Python package for the Apache Beam job)
    - __init__.py
    - main.py (pipeline definition and launcher)
    - ml_model_operator.py (prediction step)
- tests (unit tests)
- Makefile
- README.md
- requirements.txt
- setup.py
- test_requirements.txt
Installing the Model
As in previous blog posts, we'll be deploying a model that is packaged separately from the deployment codebase. This approach allows us to deploy the same model in many different systems and contexts. To use the model, we'll install the model package into the virtual environment. The package can be installed from a git repository with this command:
pip install git+https://github.com/schmidtbri/ml-model-abc-improvements
Now that we have the model installed in the environment, we can try it out by opening a Python interpreter and entering this code:
>>> from iris_model.iris_predict import IrisModel
>>> model = IrisModel()
>>> model.predict({"sepal_length": 1.1, "sepal_width": 1.2, "petal_width": 1.3, "petal_length": 1.4})
{'species': 'setosa'}
The IrisModel class implements the prediction logic of the iris_model package. This class is a subtype of the MLModel class, which ensures that a standard interface is followed. The MLModel interface allows us to deploy any model we want into the Beam job, as long as it implements the required interface. More details about this approach to deploying machine learning models can be found in the first three blog posts in this series.
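The MLModel base class itself is defined in the earlier posts in this series. In simplified form it looks something like the sketch below; the real base class also defines metadata properties such as the model's name and version, so this is only an outline of the part that matters here.

from abc import ABC, abstractmethod

class MLModel(ABC):
    """Simplified sketch of the MLModel interface used throughout this series."""

    @abstractmethod
    def predict(self, data):
        """Make a prediction from a dictionary of input features."""
        raise NotImplementedError()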
MLModelPredictOperation Class
The first thing we’ll do is create a PTransform class for the code that receives records from the Beam framework and makes predictions with the MLModel class. This is the class:
class MLModelPredictOperation(beam.DoFn):
The code above can be found here.
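The snippets that follow leave out the imports at the top of the ml_model_operator.py module. Based on the names used in the code, they look roughly like this; the import path of the MLModel base class is an assumption and may differ in the repository.

import importlib

import apache_beam as beam
from ml_model_abc import MLModel  # assumed import path for the MLModel base class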
The class we’ll be working with is called MLModelPredictOperation and it is a subtype of the DoFn class that is part of the Beam framework. The DoFn class defines a method which will be applied to each record in the PCollection. To initialize the object with the right model, we’ll add an __init__ method:
def __init__(self, module_name, class_name):
    beam.DoFn.__init__(self)
    model_module = importlib.import_module(module_name)
    model_class = getattr(model_module, class_name)
    model_object = model_class()
    # issubclass() returns a boolean, so test it directly rather than comparing to None
    if not issubclass(type(model_object), MLModel):
        raise ValueError("The model object is not a subclass of MLModel.")
    self._model = model_object
The code above can be found here.
We'll start by calling the __init__ method of the DoFn superclass, which initializes the superclass. We then find and load the Python module that contains the prediction code, get a reference to the MLModel class inside it, and instantiate that class into an object. Now that we have an instantiated model object, we check its type to make sure that it is a subtype of MLModel. If it is a subtype, we store a reference to it.
Now that we have an initialized DoFn object with a model object inside of it, we need to actually do the prediction:
def process(self, data, **kwargs):
    yield self._model.predict(data=data)
The code above can be found here.
The prediction is very simple: we take the record, pass it directly to the model, and yield the result of the prediction. To make sure that this code will work inside of a Beam pipeline, we need to make sure that the pipeline feeds a PCollection of dictionaries to the DoFn object. When we create the pipeline, we'll make sure that this is the case.
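For example, each record fed to the DoFn is a dictionary with the feature names that the iris_model package expects, like this one:

{"sepal_length": 1.1, "sepal_width": 1.2, "petal_width": 1.3, "petal_length": 1.4}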
Creating the Pipeline
Now that we have a class that can make a prediction with the model, we need to build a simple pipeline around it that can load data, send it to the model, and save the resulting predictions.
The creation of the Beam pipeline is done in the run function in the main.py module:
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', help='Input file to process.')
    parser.add_argument('--output', dest='output', required=True, help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
The code above can be found here.
The pipeline options object is given to the Beam job to modify the way that it runs. The parameters loaded by the command line parser are fed directly to the PipelineOptions object. Two parameters are loaded by the command line parser: the location of the input files, and the location where the output of the job will be stored.
When we are done loading the pipeline options, we can arrange the steps that make up the pipeline:
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'read_input' >> ReadFromText(known_args.input, coder=JsonCoder())
     | 'apply_model' >> beam.ParDo(MLModelPredictOperation(module_name="iris_model.iris_predict", class_name="IrisModel"))
     | 'write_output' >> WriteToText(known_args.output, coder=JsonCoder())
     )
The code above can be found here.
The pipeline object is created by providing it with the PipelineOptions object that we created above. The pipeline is made up of three steps: a step that loads data from an LDJSON file and creates a PCollection from it, a step that makes predictions with that PCollection, and a step that saves the resulting predictions as an LDJSON file. The input and output steps use a class called JsonCoder, which takes care of serializing and deserializing the data in the LDJSON files.
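The JsonCoder class is defined in the repository; a minimal version of it looks something like the sketch below, which simply serializes and deserializes each line of the file with the json module (the repository's implementation may differ in the details).

import json

from apache_beam.coders.coders import Coder

class JsonCoder(Coder):
    """Encodes and decodes each line of an LDJSON file as a Python dictionary."""

    def encode(self, value):
        return json.dumps(value).encode('utf-8')

    def decode(self, value):
        return json.loads(value)

    def is_deterministic(self):
        return True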
Now that we have a configured pipeline, we can run it:
result = p.run()
result.wait_until_finish()
The code above can be found here.
The main.py module is responsible for arranging the steps of the pipeline, receiving parameters, and running the Beam job. This script will be used to run the job locally and in the cloud.
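The bottom of main.py wires the run function up as a command line entry point. A sketch of the usual Beam convention is shown below; the repository's version may differ slightly.

import logging

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()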
Testing the Job Locally
We can test the job locally by running it with the Python interpreter:
export PYTHONPATH=./
python -m model_beam_job.main --input data/input.json --output data/output.json
The job takes as input the "input.json" file in the data folder, and produces a file called "output.json" in the same folder.
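Each line of the output file holds one prediction serialized by the same JsonCoder, for example:

{"species": "setosa"}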
Deploying to Google Cloud
The next thing we'll do is run the same job that we ran locally in the Google Cloud Dataflow service. The Dataflow service is an offering in the Google Cloud suite of services that can do scalable data processing for batch and streaming jobs. The Dataflow service runs Beam jobs exclusively, and it manages the job for the user, handling resource management and performance optimization.
To run the model Beam job in the cloud, we'll need to create a project. In the Cloud Console, on the project selector page, click on "Create Cloud Project" and create a project for your solution. The newly created project should be the currently selected project, so that any resources we create next will be held in the project. In order to use the GCP Dataflow service, we'll need to have billing enabled for the project. To make sure that billing is working, follow these steps.
To be able to create the Dataflow job, we'll need access to the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs from the new project. To enable access to these APIs, follow this link, then select your new project and click the "Continue" button.
Next, we'll create a service account for our project. In the Cloud Console, go to the Create service account key page. From the Service account list, select "New service account". In the Service account name field, enter a name. From the Role list, select Project -> Owner and click on the "Create" button. A JSON file will be created and downloaded to your computer; copy this file to the root of the project directory. To use the file in the project, open a command shell and set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the full path of the JSON file that you placed in the project root. The command will look like this:
export GOOGLE_APPLICATION_CREDENTIALS=/Users/…/apache-beam-ml-model-deployment/model-beam-job-a7c5c1d9c22c.json
To store the file we will be processing, we need to create a storage bucket in the Google Cloud Storage service. To do this, go to the bucket browser page, click on the "Create Bucket" button, and fill in the details to create a bucket. Now we can upload our test data to the bucket so that it can be processed by the job. To upload the test data, click on the "Upload Files" button in the bucket details page and select the input.json file in the data directory of the project.
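If you prefer the command line, the bucket creation and upload can also be done with the gsutil tool; the bucket name below is the one used by the Dataflow command later in this post.

gsutil mb -l us-east1 gs://model-beam-job
gsutil cp data/input.json gs://model-beam-job/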
Next, we need to create a tar.gz file that contains the model package that will be run by the Beam job. This package is special because it cannot be installed from the public PyPI repository, so it must be uploaded to the Dataflow service along with the Beam job code. To create the tar.gz file, we created a target in the project Makefile called "build-dependencies". When executed, the target downloads the code for the iris_model package, builds a tar.gz distribution file, and leaves it in the "dependencies" directory.
We’re finally ready to send the job to be executed in the Dataflow service. To do this, execute this command:
python -m model_beam_job.main --region us-east1 \
    --input gs://model-beam-job/input.json \
    --output gs://model-beam-job/results/outputs \
    --runner DataflowRunner \
    --machine_type n1-standard-4 \
    --project model-beam-job-294711 \
    --temp_location gs://model-beam-job/tmp/ \
    --extra_package dependencies/iris_model-0.1.0.tar.gz \
    --setup_file ./setup.py
The job is sent by executing the same Python script that we used to test the job locally, but we've added more command line options. The input and output options work the same as in the local execution of the job, but now they point to locations in the Google Cloud Storage bucket. The runner option tells the Beam framework that we want to use the Dataflow runner. The machine_type option tells the Dataflow service which machine type we want to use when running the job. The project option points to the Google Cloud project we created above. The temp_location option tells the Dataflow service that we want to store temporary files in the same Google Cloud Storage bucket that we are using for the input and output. The extra_package option points to the iris_model distribution tar.gz file that we created above; this file will be sent to the Dataflow service along with the job code. Lastly, the setup_file option points at the setup.py file of the model_beam_job package itself, which allows the command to package up any code files that the job depends on.
Once we execute the command, the job will be started in the cloud. As the job runs, it will output a link to a webpage that can be used to monitor its progress. Once the job completes, the results will be in the Google Cloud Storage bucket that we created above.
Closing
By using the Beam framework, we are able to easily deploy a machine learning prediction job to the cloud. Because of the simple design of the Beam framework, a lot of the complexities of running a job on many computers are abstracted out. Furthermore, we are able to leverage all of the features of the Beam framework for advanced data processing.
One of the important features of this codebase is the fact that it can accept any machine learning model that implements the MLModel interface. By installing another model package and importing the class that inherits from the MLModel base class, we can easily deploy any number of models in the same Beam job without changing the code. However, we do need to change the pipeline definition to change or add models. Once again, the MLModel interface allowed us to separate the building of a machine learning model from the complexity of deploying it.
One thing that we can improve about the code is the fact that the job only accepts files encoded as LDJSON. We did this to make the code easy to understand, but we can easily add other options for the format of the input data, making the pipeline more flexible and easier to use.
Translated from: https://medium.com/@brianschmidt_78145/an-apache-beam-ml-model-deployment-ac31c6f2d9b2