本文是如何用Python和Faust创建流处理管道系列文章的第二部分。如果您还不熟悉Faust的一般概念,建议先阅读系列文章的第一部分。
今天,我们将建立一个简单的流处理管道,包含多个任务。这是Kafka streams的常见用例,也是探索Faust的一种有趣方式。
好的,现在让我们动起来!
在开始新项目和学习新技术时,最令人沮丧的事情之一就是设置项目结构。在单个Python模块中启动Faust项目是完全可行的,但是如果您打算创建多个流处理任务,则最好从设置项目结构开始。
Faust为大中型项目提供了建议的布局,这种方法将以可安装库的形式分发项目。我们不会这样做,但仍将重用大多数建议的目录结构。
我们的项目结构如下:
+ pipeline/
+ pipeline/
- __init__.py
- __main__.py
- app.py
+ fetcher/
- __init__.py
- agents.py
- models.py
+ normaliser/
- __init__.py
- agents.py
- models.py
+ requirements/
- base.txt
- prod.txt
- test.txt
+ tests/
- __init__.py
- test_normaliser.py
- Dockerfile
- docker-compose.yaml
顶级文件夹pipeline
是项目根目录,包含所有其他文件和目录,其中最重要的是嵌套的pipeline
目录,包含app.py
模块。
先创建项目文件夹。
mkdir pipeline && cd pipeline/
在项目根目录下创建Dockerfile和docker-compose.yml.
touch Dockerfile && \
touch docker-compose.yaml
创建嵌套的pipeline
文件夹,存储源代码。
mkdir pipeline && \
mkdir requirements && \
mkdir tests
创建requirements.txt
文件。
printf "%s\n" "faust==1.10.4" "pytz==2021.1" "requests==2.25.1" > requirements/base.txt && \
printf "%s\n" "-r base.txt" > requirements/prod.txt && \
printf "%s\n" "-r base.txt" "pytest==6.2.3" > requirements/test.txt
现在,进入我们的主应用程序目录。
cd pipeline/
现在我们位于主应用程序目录( pipeline/pipeline/
)中,为Faust程序创建顶级文件。
touch __init__.py && \
touch __main__.py && \
touch app.py
我们还将为管道任务创建顶级目录。
mkdir fetcher && \
mkdir normaliser
将数据管道逻辑写入app.py
。
# app.py
import faust
VERSION = 1
PROJECT = "pipeline" # our root directory
ORIGIN = "pipeline" # our app directory
AUTODISCOVER = [
f"{ORIGIN}.fetcher",
f"{ORIGIN}.normaliser",
]
BROKER = "kafka://kafka:9092"
app = faust.App(
PROJECT,
version=VERSION,
autodiscover=AUTODISCOVER,
origin=ORIGIN,
broker=BROKER,
)
def main() -> None:
# HACK:
# Waiting for kafka to be ready.
# In production, you would add a wait script to your docker-compose
# to make sure that kafka is ready when Faust starts.
import time; time.sleep(10)
app.main()
我们已经完成了Faust应用的基本设置,然后可以在__main__.py
文件中使用它。
我们所做的就是利用标准参数实例化应用程序,要注意两件事情:
现在可以从__main__.py
文件中导入app
实例,这将成为我们的程序主入口。
# __main__.py
from .app import main
main()
将以下代码写入项目根目录下的Dockerfile
。
FROM python:3.8.6
WORKDIR /app
COPY requirements/*.txt /app/requirements/
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir -r requirements/prod.txt
COPY pipeline/*.py /app/pipeline/
COPY pipeline/fetcher/ /app/pipeline/fetcher/
COPY pipeline/normaliser/ /app/pipeline/normaliser/
ENTRYPOINT ["python", "-m", "pipeline", "worker", "-l", "info"]
使用docker-compose启动Zookeeper,Kafka和Faust服务,以下是docker-compose.yml。
version: "3"
services:
zookeeper:
image: "wurstmeister/zookeeper:latest"
ports:
- 2181:2181
kafka:
image: "wurstmeister/kafka:latest"
depends_on:
- zookeeper
ports:
- 9092:9092
volumes:
- /kafka
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_HOST_NAME=kafka
- ALLOW_PLAINTEXT_LISTENER=yes
faust:
build: .
depends_on:
- kafka
volumes:
- ./:/app/
现在,我们将创建实际的流处理逻辑,基本思路是:
fetcher
服务,定期从外部API提取数据,本例使用https://randomuser.me/api/normaliser
服务,负责转换数据并输出到目标主题,过滤掉不想要的数据,重命名字段等。Faust的流处理器称为代理(Agent),这是流传输管道的关键部分。在实现流处理逻辑之前,需要先定义输入和输出主题以及用于这些主题的模型。
如果您不在应用程序pipeline
目录中,请移至该目录。我们将创建fetcher文件夹及其关联文件。
cd fetcher && \
touch __init__.py && \
touch agents.py && \
touch models.py
在fetcher目录下定义主题和模型。
在models.py
文件中,添加以下内容。
# fetcher/models.py
import faust
from pipeline.app import app
class RawUser(faust.Record):
id: dict
gender: str
name: dict
location: dict
email: str
login: dict
dob: dict
registered: dict
phone: str
cell: str
picture: dict
nat: str
output_schema = faust.Schema(
key_type=str,
value_type=RawUser,
key_serializer="json",
value_serializer="json",
)
output_topic = app.topic("fetcher-0.0.1", schema=output_schema)
以上代码的第一件事是定义输出模型UserRecord
,然后定义Faust Schema
,该模型定义了键和值类型以及序列化程序。最后使用该模式来定义我们服务的输出主题以及主题名称。
一旦此模型设置就绪,我们就可以定义一个crontab,它将定期从目标API获取原始数据并推送到输出主题。
# fetcher/agents.py
import logging
import pytz
import requests
from pipeline.app import app
from .models import output_topic
logger = logging.getLogger(__name__)
@app.crontab("* * * * *", timezone=pytz.timezone("US/Eastern"), on_leader=True)
async def fetch():
response = requests.get("https://randomuser.me/api/?results=50")
response.raise_for_status()
data = response.json()
for result in data["results"]:
key = result["id"]["value"]
if not key:
# randomuser.me has some some users with None or empty ID value,
# we don't want to process these.
continue
logger.info("Fetched user with ID %(user_id)s", {"user_id": key})
await output_topic.send(key=key, value=result)
我们已经设置了初始服务,从外部API提取数据。该服务是一项cron作业( app.crontab
装饰器)该,每分钟从目标API获取原始数据,然后推送到output_topic( fetcher-0.0.1
),这里使用user.id.value
作为消息密钥。
为了建立适当的数据处理管道,至少需要几个服务,接下来进行数据转换。
# assumes you were in the fetcher/ directory
cd .. && cd normaliser && \
touch __init__.py && \
touch agents.py && \
touch models.py
normaliser/models.py
:
# normaliser/models.py
import faust
from pipeline.app import app
class RawUser(faust.Record):
id: dict
gender: str
name: dict
location: dict
email: str
login: dict
dob: dict
registered: dict
phone: str
cell: str
picture: dict
nat: str
input_schema = faust.Schema(
key_type=str,
value_type=RawUser,
key_serializer="json",
value_serializer="json",
)
input_topic = app.topic("fetcher-0.0.1", schema=input_schema)
class NormalisedUser(faust.Record):
id: str
name: str
cell: str
email: str
output_schema = faust.Schema(
key_type=str,
value_type=NormalisedUser,
key_serializer="json",
value_serializer="json",
)
output_topic = app.topic("normaliser-0.0.1", schema=output_schema)
normaliser/agents.py
:
# normaliser/agents.py
import logging
from pipeline.app import app
from .models import input_topic, output_topic
logger = logging.getLogger(__name__)
def normalise_user(raw_user):
return {
"id": raw_user["id"]["value"],
"name": f"{raw_user['name']['first']} {raw_user['name']['last']}",
"cell": raw_user["cell"],
"email": raw_user["email"],
}
@app.agent(input_topic)
async def consume(stream):
async for record in stream:
raw_user = record.asdict()
normalised_user = normalise_user(raw_user)
key = normalised_user["id"]
logger.info("Normalised user with ID %(user_id)s", {"user_id": key})
await output_topic.send(key=key, value=normalised_user)
这里的转换逻辑非常简单,过滤一些字段,并展平其他两个字段,即id
和name
。
app.agent
装饰器定义Faust代理,它对提取服务产生的原始数据流进行迭代,对每个单独的用户进行数据转换,然后将规范化结果推送到输出主题,等待其它客户端程序处理。
至此已经完成了数据管道的全部逻辑,是时候进行测试了,转到顶级目录并运行以下命令:
docker-compose up --abort-on-container-exit
本文的内容相对较多,在阅读完这个简单的案例后,希望您对Faust构建数据管道能有一个深入的理解。
数据黑客:专注数据工程和机器学习,提供开源数据接口。