当前位置: 首页 > 工具软件 > Faust > 使用案例 >

如何用Python Faust构建流数据管道(上篇)

潘彦
2023-12-01

Faust是一个将Kafka Streams的概念移植到Python的三方库。如果您不熟悉Kafka,那么在阅读该系列之前,最好先阅读Kafka文档Kafka Streams

我们先介绍Faust的基础知识,包括核心概念和通用API。

Kafka Streams在Apache Kafka之上为客户端库提供了一些抽象。因为Faust在很大程度上重用了完全相同的概念,所以学习Faust的过程就是学习Kafka Streams的过程。

好的,让我们开始吧!

应用(Application)

应用程序(Application)是Faust流处理过程的起点,它是该库的一个实例,并通过Python装饰器提供对Faust大部分核心API的访问。

要创建应用程序,您需要一个应用程序ID,一个代理(broker)和一个驱动程序(driver)以用于持久存储数据(可选)。

import faust

app = faust.App('my-app-id', broker='kafka://', store='rocksdb://')

代理(Agent),流(Stream)和处理器(Processor)

用Kafka Streams术语来说,Faust代理是一个流处理器,它订阅一个主题并处理每条消息。

在Faust中,代理(Agent)用于装饰异步函数,可以并行处理无限数据流。如果您不熟悉asyncio,则需要先查看asyncio的官方文档

该代理用作您的处理函数的装饰器,异步函数必须使用异步for循环遍历数据流。

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')

流(Stream)本身就是您要处理的数据,您需要在异步函数中进行迭代。

处理器(Processor)是进行数据转换的函数,您可以创建任意数量的处理器,并且可以按顺序链接它们。

def add_default_language(value: MyModel) -> MyModel:
    if not value.language:
        value.language = 'US'
    return value

async def add_client_info(value: MyModel) -> MyModel:
    value.client = await get_http_client_info(value.account_id)
    return value

s = app.stream(my_topic,
               processors=[add_default_language, add_client_info])

Table

根据您的用例,您可能希望保留数据(例如,您想收集每天在链接上的点击次数)。

Table以分片的方式实现这一点,它们使用RocksDB在后台进行分布式键/值存储。在您的代码中,使用普通的Python字典来存储数据即可。

click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

模型(Model)和记录(Record)

模型(Model)描述数据结构的字段,主要用于订阅数据的反序列化。

在编写本文时,唯一支持的类型是Record类型,它等效于Python字典(键值映射)。

结论

本文对Python Faust和Kafka Streams进行了简单的介绍,希望您已经理解了Faust的一般概念,在下一篇文章中,我们将尝试做一些有趣的应用。

参考


数据黑客:专注数据工程和机器学习,提供开源数据接口。

  • 作者:8mincode
  • 来源:https://www.8mincode.com/
  • 原文:Stream processing with Python Faust: Part I – General Concepts
  • 翻译:数据黑客
 类似资料: