当前位置: 首页 > 工具软件 > Faust > 使用案例 >

如何用Python Faust构建流数据管道(下篇)

廖臻
2023-12-01

本文是如何用Python和Faust创建流处理管道系列文章的第二部分。如果您还不熟悉Faust的一般概念,建议先阅读系列文章的第一部分。

今天,我们将建立一个简单的流处理管道,包含多个任务。这是Kafka streams的常见用例,也是探索Faust的一种有趣方式。

好的,现在让我们动起来!

项目布局

在开始新项目和学习新技术时,最令人沮丧的事情之一就是设置项目结构。在单个Python模块中启动Faust项目是完全可行的,但是如果您打算创建多个流处理任务,则最好从设置项目结构开始。

Faust为大中型项目提供了建议的布局,这种方法将以可安装库的形式分发项目。我们不会这样做,但仍将重用大多数建议的目录结构。

我们的项目结构如下:

+ pipeline/
    + pipeline/
        - __init__.py
        - __main__.py
        - app.py
        + fetcher/
            - __init__.py
            - agents.py
            - models.py
        + normaliser/
            - __init__.py
            - agents.py
            - models.py
    + requirements/
        - base.txt
        - prod.txt
        - test.txt
    + tests/
        - __init__.py
        - test_normaliser.py
    - Dockerfile
    - docker-compose.yaml

顶级文件夹pipeline是项目根目录,包含所有其他文件和目录,其中最重要的是嵌套的pipeline目录,包含app.py模块。

先创建项目文件夹。

mkdir pipeline && cd pipeline/

在项目根目录下创建Dockerfile和docker-compose.yml.

touch Dockerfile && \
touch docker-compose.yaml

创建嵌套的pipeline文件夹,存储源代码。

mkdir pipeline && \
mkdir requirements && \
mkdir tests

创建requirements.txt文件。

printf "%s\n" "faust==1.10.4" "pytz==2021.1" "requests==2.25.1" > requirements/base.txt && \
printf "%s\n" "-r base.txt" > requirements/prod.txt && \
printf "%s\n" "-r base.txt" "pytest==6.2.3" > requirements/test.txt

现在,进入我们的主应用程序目录。

cd pipeline/

Faust程序主入口

现在我们位于主应用程序目录( pipeline/pipeline/ )中,为Faust程序创建顶级文件。

touch __init__.py && \
touch __main__.py && \
touch app.py

我们还将为管道任务创建顶级目录。

mkdir fetcher && \
mkdir normaliser

将数据管道逻辑写入app.py

# app.py
import faust

VERSION = 1

PROJECT = "pipeline"  # our root directory
ORIGIN = "pipeline"  # our app directory

AUTODISCOVER = [
    f"{ORIGIN}.fetcher",
    f"{ORIGIN}.normaliser",
]

BROKER = "kafka://kafka:9092"

app = faust.App(
    PROJECT,
    version=VERSION,
    autodiscover=AUTODISCOVER,
    origin=ORIGIN,
    broker=BROKER,
)

def main() -> None:
    # HACK:
    # Waiting for kafka to be ready.
    # In production, you would add a wait script to your docker-compose
    # to make sure that kafka is ready when Faust starts.
    import time; time.sleep(10)

    app.main()

我们已经完成了Faust应用的基本设置,然后可以在__main__.py文件中使用它。

我们所做的就是利用标准参数实例化应用程序,要注意两件事情:

  • 代码使用了autodiscovery功能,但这不是必需的。
  • Faust的一个重要特征是集成RocksDB,但这里没有使用存储。

现在可以从__main__.py文件中导入app实例,这将成为我们的程序主入口。

# __main__.py
from .app import main

main()

Docker和docker-compose设置

将以下代码写入项目根目录下的Dockerfile

FROM python:3.8.6

WORKDIR /app

COPY requirements/*.txt /app/requirements/
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r requirements/prod.txt

COPY pipeline/*.py /app/pipeline/
COPY pipeline/fetcher/ /app/pipeline/fetcher/
COPY pipeline/normaliser/ /app/pipeline/normaliser/

ENTRYPOINT ["python", "-m", "pipeline", "worker", "-l", "info"]

使用docker-compose启动Zookeeper,Kafka和Faust服务,以下是docker-compose.yml。

version: "3"
services:
  zookeeper:
    image: "wurstmeister/zookeeper:latest"
    ports:
      - 2181:2181
  kafka:
    image: "wurstmeister/kafka:latest"
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    volumes:
      - /kafka
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - ALLOW_PLAINTEXT_LISTENER=yes
  faust:
    build: .
    depends_on:
      - kafka
    volumes:
      - ./:/app/

数据管道:Faust agents

现在,我们将创建实际的流处理逻辑,基本思路是:

  1. fetcher服务,定期从外部API提取数据,本例使用https://randomuser.me/api/
  2. normaliser服务,负责转换数据并输出到目标主题,过滤掉不想要的数据,重命名字段等。

Faust的流处理器称为代理(Agent),这是流传输管道的关键部分。在实现流处理逻辑之前,需要先定义输入和输出主题以及用于这些主题的模型。

提取数据

如果您不在应用程序pipeline目录中,请移至该目录。我们将创建fetcher文件夹及其关联文件。

cd fetcher && \
touch __init__.py && \
touch agents.py && \
touch models.py

在fetcher目录下定义主题和模型。

models.py文件中,添加以下内容。

# fetcher/models.py
import faust

from pipeline.app import app

class RawUser(faust.Record):
    id: dict
    gender: str
    name: dict
    location: dict
    email: str
    login: dict
    dob: dict
    registered: dict
    phone: str
    cell: str
    picture: dict
    nat: str

output_schema = faust.Schema(
    key_type=str,
    value_type=RawUser,
    key_serializer="json",
    value_serializer="json",
)

output_topic = app.topic("fetcher-0.0.1", schema=output_schema)

以上代码的第一件事是定义输出模型UserRecord,然后定义Faust Schema,该模型定义了键和值类型以及序列化程序。最后使用该模式来定义我们服务的输出主题以及主题名称。

一旦此模型设置就绪,我们就可以定义一个crontab,它将定期从目标API获取原始数据并推送到输出主题。

# fetcher/agents.py
import logging

import pytz
import requests

from pipeline.app import app
from .models import output_topic


logger = logging.getLogger(__name__)

@app.crontab("* * * * *", timezone=pytz.timezone("US/Eastern"), on_leader=True)
async def fetch():
	response = requests.get("https://randomuser.me/api/?results=50")
	response.raise_for_status()

	data = response.json()
	for result in data["results"]:
		key = result["id"]["value"]
		if not key:
		# randomuser.me has some some users with None or empty ID value,
		# we don't want to process these.
			continue

		logger.info("Fetched user with ID %(user_id)s", {"user_id": key})

		await output_topic.send(key=key, value=result)

我们已经设置了初始服务,从外部API提取数据。该服务是一项cron作业( app.crontab装饰器)该,每分钟从目标API获取原始数据,然后推送到output_topic( fetcher-0.0.1 ),这里使用user.id.value作为消息密钥。

数据转换

为了建立适当的数据处理管道,至少需要几个服务,接下来进行数据转换。

# assumes you were in the fetcher/ directory
cd .. && cd normaliser && \
touch __init__.py && \
touch agents.py && \
touch models.py

normaliser/models.py

# normaliser/models.py
import faust

from pipeline.app import app


class RawUser(faust.Record):
    id: dict
    gender: str
    name: dict
    location: dict
    email: str
    login: dict
    dob: dict
    registered: dict
    phone: str
    cell: str
    picture: dict
    nat: str


input_schema = faust.Schema(
    key_type=str,
    value_type=RawUser,
    key_serializer="json",
    value_serializer="json",
)

input_topic = app.topic("fetcher-0.0.1", schema=input_schema)


class NormalisedUser(faust.Record):
    id: str
    name: str
    cell: str
    email: str


output_schema = faust.Schema(
    key_type=str,
    value_type=NormalisedUser,
    key_serializer="json",
    value_serializer="json",
)

output_topic = app.topic("normaliser-0.0.1", schema=output_schema)

normaliser/agents.py

# normaliser/agents.py
import logging

from pipeline.app import app
from .models import input_topic, output_topic


logger = logging.getLogger(__name__)


def normalise_user(raw_user):
    return {
        "id": raw_user["id"]["value"],
        "name": f"{raw_user['name']['first']} {raw_user['name']['last']}",
        "cell": raw_user["cell"],
        "email": raw_user["email"],
    }


@app.agent(input_topic)
async def consume(stream):
    async for record in stream:
        raw_user = record.asdict()

        normalised_user = normalise_user(raw_user)
        key = normalised_user["id"]

        logger.info("Normalised user with ID %(user_id)s", {"user_id": key})

        await output_topic.send(key=key, value=normalised_user)

这里的转换逻辑非常简单,过滤一些字段,并展平其他两个字段,即idname

app.agent装饰器定义Faust代理,它对提取服务产生的原始数据流进行迭代,对每个单独的用户进行数据转换,然后将规范化结果推送到输出主题,等待其它客户端程序处理。

运行管道

至此已经完成了数据管道的全部逻辑,是时候进行测试了,转到顶级目录并运行以下命令:

docker-compose up --abort-on-container-exit

结论

本文的内容相对较多,在阅读完这个简单的案例后,希望您对Faust构建数据管道能有一个深入的理解。


数据黑客:专注数据工程和机器学习,提供开源数据接口。

  • 作者:8mincode
  • 来源:https://www.8mincode.com/
  • 原文:Stream processing with Python Faust: Part I – General Concepts
  • 翻译:数据黑客
 类似资料: