Faust是一个将Kafka Streams的概念移植到Python的三方库。如果您不熟悉Kafka,那么在阅读该系列之前,最好先阅读Kafka文档和Kafka Streams。
我们先介绍Faust的基础知识,包括核心概念和通用API。
Kafka Streams在Apache Kafka之上为客户端库提供了一些抽象。因为Faust在很大程度上重用了完全相同的概念,所以学习Faust的过程就是学习Kafka Streams的过程。
好的,让我们开始吧!
应用程序(Application)是Faust流处理过程的起点,它是该库的一个实例,并通过Python装饰器提供对Faust大部分核心API的访问。
要创建应用程序,您需要一个应用程序ID,一个代理(broker)和一个驱动程序(driver)以用于持久存储数据(可选)。
import faust
app = faust.App('my-app-id', broker='kafka://', store='rocksdb://')
用Kafka Streams术语来说,Faust代理是一个流处理器,它订阅一个主题并处理每条消息。
在Faust中,代理(Agent)用于装饰异步函数,可以并行处理无限数据流。如果您不熟悉asyncio,则需要先查看asyncio的官方文档。
该代理用作您的处理函数的装饰器,异步函数必须使用异步for循环遍历数据流。
@app.agent(value_type=Order)
async def order(orders):
async for order in orders:
# process infinite stream of orders.
print(f'Order for {order.account_id}: {order.amount}')
流(Stream)本身就是您要处理的数据,您需要在异步函数中进行迭代。
处理器(Processor)是进行数据转换的函数,您可以创建任意数量的处理器,并且可以按顺序链接它们。
def add_default_language(value: MyModel) -> MyModel:
if not value.language:
value.language = 'US'
return value
async def add_client_info(value: MyModel) -> MyModel:
value.client = await get_http_client_info(value.account_id)
return value
s = app.stream(my_topic,
processors=[add_default_language, add_client_info])
根据您的用例,您可能希望保留数据(例如,您想收集每天在链接上的点击次数)。
Table以分片的方式实现这一点,它们使用RocksDB在后台进行分布式键/值存储。在您的代码中,使用普通的Python字典来存储数据即可。
click_topic = app.topic('clicks', key_type=str, value_type=int)
# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)
@app.agent(click_topic)
async def count_click(clicks):
async for url, count in clicks.items():
counts[url] += count
模型(Model)描述数据结构的字段,主要用于订阅数据的反序列化。
在编写本文时,唯一支持的类型是Record类型,它等效于Python字典(键值映射)。
本文对Python Faust和Kafka Streams进行了简单的介绍,希望您已经理解了Faust的一般概念,在下一篇文章中,我们将尝试做一些有趣的应用。
数据黑客:专注数据工程和机器学习,提供开源数据接口。