An Apache Beam ML Model Deployment
This blog post builds on the ideas started in three previous blog posts.
In this blog post I’ll show how to deploy the same ML model that we deployed as a batch job in this blog post, as a task queue in this blog post, inside an AWS Lambda in this blog post, as a Kafka streaming application in this blog post, as a gRPC service in this blog post, as a MapReduce job in this blog post, as a Websocket service in this blog post, and as a ZeroRPC service in this blog post.
The code in this blog post can be found in this github repo.
Introduction
Data processing pipelines are useful for solving a wide range of problems. For example, an Extract, Transform, and Load (ETL) pipeline is a type of data processing pipeline that is used to extract data from one system and save it to another system. Inside of an ETL, the data may be transformed and aggregated into more useful formats. ETL jobs are useful for making the predictions made by a machine learning model available to users or to other systems. The ETL for such an ML model deployment looks like this: extract features used for prediction from a source system, send the features to the model for prediction, and save the predictions to a destination system. In this blog post we will show how to deploy a machine learning model inside of a data processing pipeline that runs on the Apache Beam framework.
Apache Beam is an open source framework for data processing. It is most useful for parallel data processing workloads that can easily be split among many computers. The Beam framework is different from other data processing frameworks because it supports batch and stream processing through the same API, which allows developers to write the code once and deploy it in two different contexts without change. An interesting feature of the Beam programming model is that once we have written the code, we can deploy it to an array of different runners like Apache Spark, Apache Flink, Apache Hadoop MapReduce, and others.
The Google Cloud Platform has a service that can run Beam pipelines. The Dataflow service allows users to run their workloads in the cloud without having to worry about managing servers, and it handles the automated provisioning and management of processing resources for the user. In this blog post, we’ll also be deploying the machine learning pipeline to the Dataflow service to demonstrate how it works in the cloud.
Building Beam Jobs
A Beam job is defined by a driver process that uses the Beam SDK to declare the data processing steps that the job performs. The Beam SDK can be used from Python, Java, or Go processes. The driver process defines a data processing pipeline of components which are executed in the right order to load data, process it, and store the results. The driver program also accepts execution options that can be set to modify the behavior of the pipeline. In our example, we will be loading data from an LDJSON file, sending it to a model to make predictions, and storing the results in an LDJSON file.
The Beam programming model works by defining a PCollection, which is a collection of data records that need to be processed. A PCollection is a data structure that is created at the beginning of the execution of the pipeline, and is received and processed by each step in a Beam pipeline. Each step in the pipeline that modifies the contents of the PCollection is called a PTransform. For this blog post we will create a PTransform component that takes a PCollection, makes predictions with it, and returns a PCollection with the prediction results. We will combine this PTransform with other components to build a data processing pipeline.
Package Structure
The code used in this blog post is hosted in this Github repository. The codebase is structured like this:
- data (data for testing the job)
- model_beam_job (python package for the Apache Beam job)
    - __init__.py
    - main.py (pipeline definition and launcher)
    - ml_model_operator.py (prediction step)
- tests (unit tests)
- Makefile
- README.md
- requirements.txt
- setup.py
- test_requirements.txt
Installing the Model
As in previous blog posts, we’ll be deploying a model that is packaged separately from the deployment codebase. This approach allows us to deploy the same model in many different systems and contexts. We’ll install the model package into the project’s virtual environment; it can be installed from its git repository with this command:
pip install git+https://github.com/schmidtbri/ml-model-abc-improvements

Now that we have the model installed in the environment, we can try it out by opening a python interpreter and entering this code:
>>> from iris_model.iris_predict import IrisModel
>>> model = IrisModel()
>>> model.predict({"sepal_length": 1.1, "sepal_width": 1.2, "petal_width": 1.3, "petal_length": 1.4})
{'species': 'setosa'}
The IrisModel class implements the prediction logic of the iris_model package. This class is a subtype of the MLModel class, which ensures that a standard interface is followed. The MLModel interface allows us to deploy any model we want into the Beam job, as long as it implements the required interface. More details about this approach to deploying machine learning models can be found in the first three blog posts in this series.
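The MLModel base class itself is provided by the installed model package and was covered in the earlier posts, so it is not reproduced in this post. As a simplified sketch of the contract that the Beam job relies on (the real base class also carries metadata and schema properties that are omitted here), it looks something like this:

from abc import ABC, abstractmethod


class MLModel(ABC):
    """Abstract base class that deployable models implement."""

    @abstractmethod
    def __init__(self):
        """Load the model's parameters."""
        raise NotImplementedError()

    @abstractmethod
    def predict(self, data):
        """Take a dictionary of features and return a dictionary holding the prediction."""
        raise NotImplementedError()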
The MLModelPredictOperation Class
The first thing we’ll do is create the class that receives records from the Beam framework and makes predictions with the MLModel class; it is a DoFn that will later be applied to the pipeline with a ParDo transform. This is the class:
class MLModelPredictOperation(beam.DoFn):

The code above can be found here.
The class we’ll be working with is called MLModelPredictOperation and it is a subtype of the DoFn class that is part of the Beam framework. The DoFn class defines a method which will be applied to each record in the PCollection. To initialize the object with the right model, we’ll add an __init__ method:
def __init__(self, module_name, class_name):
    beam.DoFn.__init__(self)
    model_module = importlib.import_module(module_name)
    model_class = getattr(model_module, class_name)
    model_object = model_class()
    if not issubclass(type(model_object), MLModel):
        raise ValueError("The model object is not a subclass of MLModel.")
    self._model = model_object
The code above can be found here.
We start by calling the __init__ method of the DoFn superclass to initialize it. We then find and load the python module that contains the MLModel class with the prediction code, get a reference to the class, and instantiate it into an object. Now that we have an instantiated model object, we check its type to make sure that it is a subtype of MLModel; if it is, we store a reference to it, otherwise we raise an exception.
Now that we have an initialized DoFn object with a model object inside of it, we need to actually do the prediction:
def process(self, data, **kwargs):
    yield self._model.predict(data=data)
The code above can be found here.
The prediction is very simple: we take the record, pass it directly to the model, and yield the result of the prediction. To make sure that this code will work inside of a Beam pipeline, we need to make sure that the pipeline feeds a PCollection of dictionaries to the DoFn object. When we create the pipeline, we’ll make sure that this is the case.
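The tests directory of the repository holds unit tests for this step; they are not walked through in this post, but a test roughly like the sketch below (the actual tests in the repo may look different) exercises the DoFn end to end using Beam's testing utilities, feeding in the same record we used in the interpreter session above:

import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from model_beam_job.ml_model_operator import MLModelPredictOperation


class MLModelPredictOperationTests(unittest.TestCase):

    def test_prediction_step(self):
        # build a small pipeline: one input record -> model prediction
        with TestPipeline() as p:
            predictions = (p
                           | beam.Create([{"sepal_length": 1.1, "sepal_width": 1.2,
                                           "petal_width": 1.3, "petal_length": 1.4}])
                           | beam.ParDo(MLModelPredictOperation(module_name="iris_model.iris_predict",
                                                                class_name="IrisModel")))

            # the record used in the interpreter session should come back as 'setosa'
            assert_that(predictions, equal_to([{"species": "setosa"}]))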
Creating the Pipeline
Now that we have a class that can make a prediction with the model, we need to build a simple pipeline around it that can load data, send it to the model, and save the resulting predictions.
The creation of the Beam pipeline is done in the run function in the main.py module:
def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', help='Input file to process.')
    parser.add_argument('--output', dest='output', required=True, help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
The code above can be found here.
The pipeline options object is given to the Beam job to modify the way that it runs. Any parameters that the command line parser does not recognize are fed directly to the PipelineOptions object. Two parameters are loaded by the command line parser itself: the location of the input files, and the location where the output of the job will be stored.
When we are done loading the pipeline options, we can arrange the steps that make up the pipeline:
with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'read_input' >> ReadFromText(known_args.input, coder=JsonCoder())
     | 'apply_model' >> beam.ParDo(MLModelPredictOperation(module_name="iris_model.iris_predict", class_name="IrisModel"))
     | 'write_output' >> WriteToText(known_args.output, coder=JsonCoder())
    )
The code above can be found here.
The pipeline object is created by providing it with the PipelineOptions object that we created above. The pipeline is made up of three steps: a step that loads data from an LDJSON file and creates a PCollection from it, a step that makes predictions with that PCollection, and a step that saves the resulting predictions as an LDJSON file. The input and output steps use a class called JsonCoder, which takes care of serializing and deserializing the data in the LDJSON files.
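The JsonCoder class is defined in the repository and is not reproduced in this post. A coder roughly like the following sketch is all that is needed (the repo's actual implementation may differ slightly): it serializes each dictionary to one line of JSON on the way out and parses one line of JSON into a dictionary on the way in.

import json

from apache_beam.coders.coders import Coder


class JsonCoder(Coder):
    """Encode and decode each record as a single line of JSON."""

    def encode(self, value):
        # dictionary -> bytes for one line of the output file
        return json.dumps(value).encode("utf-8")

    def decode(self, value):
        # bytes for one line of the input file -> dictionary
        return json.loads(value)

    def is_deterministic(self):
        return True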
Now that we have a configured pipeline, we can run it:
result = p.run()
result.wait_until_finish()
The code above can be found here.
The main.py module is responsible for arranging the steps of the pipeline, receiving parameters, and running the Beam job. This script will be used to run the job locally and in the cloud.
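Since the job is launched as a module with python -m, main.py presumably ends with the usual entry-point guard, along these lines:

if __name__ == '__main__':
    run()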
Testing the Job Locally
We can test the job locally by running it with the python interpreter:
export PYTHONPATH=./
python -m model_beam_job.main --input data/input.json --output data/output.json
The job takes as input the “input.json” file in the data folder, and produces a file called “output.json” in the same folder.
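Each line of the input file is a standalone JSON document holding the features for one prediction, and each line of the output file holds the corresponding prediction. The values below are illustrative rather than copied from the repo's data file; a couple of input lines might look like this:

{"sepal_length": 6.4, "sepal_width": 2.8, "petal_length": 5.6, "petal_width": 2.2}
{"sepal_length": 5.0, "sepal_width": 2.3, "petal_length": 3.3, "petal_width": 1.0}

and the matching output lines would look like:

{"species": "virginica"}
{"species": "versicolor"}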
Deploying to Google Cloud
The next thing we’ll do is run the same job that we ran locally in the Google Cloud Dataflow service. The Dataflow service is an offering in the Google Cloud suite of services that can do scalable data processing for batch and streaming jobs. The Dataflow service runs Beam jobs exclusively and manages the job, handling resource management and performance optimization.
To run the model Beam job in the cloud, we’ll need to create a project. In the Cloud Console, in the project selector page click on “Create Cloud Project”, then create a project for your solution. The newly created project should be the currently selected project, then any resources that we create next will be held in the project. In order to use the GCP Dataflow service, we’ll need to have billing enabled for the project. To make sure that billing is working, follow these steps.
To be able to create the Dataflow job, we’ll need to have access to the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs from your new project. To enable access to these APIs, follow this link, then select your new project and click the “Continue” button.
Next, we’ll create a service account for our project. In the Cloud Console, go to the Create service account key page. From the Service account list, select “New service account”. In the Service account name field, enter a name. From the Role list, select Project -> Owner and click on the “Create” button. A JSON file will be created and downloaded to your computer, copy this file to the root of the project directory. To use the file in the project, open a command shell and set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the full path to the JSON file that you placed in the project root. The command will look like this:
export GOOGLE_APPLICATION_CREDENTIALS=/Users/…/apache-beam-ml-model-deployment/model-beam-job-a7c5c1d9c22c.json

To store the file we will be processing, we need to create a storage bucket in the Google Cloud Storage service. To do this, go to the bucket browser page, click on the “Create Bucket” button, and fill in the details to create a bucket. Now we can upload our test data to a bucket so that it can be processed by the job. To upload the test data click on the “Upload Files” button in the bucket details page and select the input.json file in the data directory of the project.
Next, we need to create a tar.gz file that contains the model package that will be run by the Beam job. This package is special because it cannot be installed from the public PyPI repository, so it must be uploaded to the Dataflow service along with the job code. To create the tar.gz file, we created a target in the project Makefile called “build-dependencies”. When executed, the target downloads the code for the iris_model package, builds a tar.gz distribution file, and leaves it in the “dependencies” directory.
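The Makefile itself is not shown in this post; conceptually, the build-dependencies target runs commands roughly equivalent to these (the exact recipe in the repo's Makefile may differ):

mkdir -p dependencies
git clone https://github.com/schmidtbri/ml-model-abc-improvements
cd ml-model-abc-improvements && python setup.py sdist --dist-dir ../dependencies

The result is the iris_model-0.1.0.tar.gz file that the deployment command below points at.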
We’re finally ready to send the job to be executed in the Dataflow service. To do this, execute this command:
python -m model_beam_job.main --region us-east1 \
    --input gs://model-beam-job/input.json \
    --output gs://model-beam-job/results/outputs \
    --runner DataflowRunner \
    --machine_type n1-standard-4 \
    --project model-beam-job-294711 \
    --temp_location gs://model-beam-job/tmp/ \
    --extra_package dependencies/iris_model-0.1.0.tar.gz \
    --setup_file ./setup.py
The job is sent by executing the same python script that we used to test the job locally, but with more command line options added. The input and output options work the same as in the local execution of the job, but now they point to locations in the Google Cloud Storage bucket. The runner option tells the Beam framework that we want to use the Dataflow runner. The machine_type option tells the Dataflow service which machine type we want to use when running the job. The project option points to the Google Cloud project we created above. The temp_location option tells the Dataflow service that we want to store temporary files in the same Google Cloud Storage bucket that we are using for the input and output. The extra_package option points to the iris_model distribution tar.gz file that we created above; this file will be sent to the Dataflow service along with the job code. Lastly, the setup_file option points at the setup.py file of the model_beam_job package itself, which allows the command to package up any code files that the job depends on.
Once we execute the command, the job will be started in the cloud. As the job runs it will output a link to a webpage that can be used to monitor the progress of the job. Once the job completes, the results will be in the Google Cloud Storage bucket that we created above.
Closing
By using the Beam framework, we are able to easily deploy a machine learning prediction job to the cloud. Because of the simple design of the Beam framework, a lot of the complexities of running a job on many computers are abstracted out. Furthermore, we are able to leverage all of the features of the Beam framework for advanced data processing.
One of the important features of this codebase is the fact that it can accept any machine learning model that implements the MLModel interface. By installing another model package and importing the class that inherits from the MLModel base class, we can easily deploy any number of models in the same Beam job without changing the code. However, we do need to change the pipeline definition to change or add models. Once again, the MLModel interface allowed us to abstract the building of a machine learning model away from the complexity of deploying it.
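For example, swapping in a different model only requires pointing the apply_model step at another MLModel implementation; the other_model.prediction module and OtherModel class below are hypothetical stand-ins for whatever model package is installed in the environment:

(p
 | 'read_input' >> ReadFromText(known_args.input, coder=JsonCoder())
 | 'apply_model' >> beam.ParDo(MLModelPredictOperation(module_name="other_model.prediction", class_name="OtherModel"))
 | 'write_output' >> WriteToText(known_args.output, coder=JsonCoder())
)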
One thing that we can improve about the code is the fact that the job only accepts files encoded as LDJSON. We did this to make the code easy to understand, but we could easily add other options for the format of the input data, making the pipeline more flexible and easier to use.
Originally published at: https://medium.com/@brianschmidt_78145/an-apache-beam-ml-model-deployment-ac31c6f2d9b2