当前位置: 首页 > 工具软件 > QuestDB > 使用案例 >

使用 QuestDB 和 Apache Kafka 处理时间序列数据

白翔
2023-12-01

Apache Kafka 是一个经过实战考验的分布式流处理平台,在金融行业中很受欢迎,用于处理任务关键型事务工作负载。Kafka 处理大量实时市场数据的能力使其成为交易、风险管理和欺诈检测的核心基础设施组件。金融机构使用 Kafka 从市场数据源、交易数据和其他外部来源流式传输数据,以推动决策。

用于摄取和存储财务数据的常见数据管道涉及将实时数据发布到 Kafka,并利用 Kafka Connect 将其流式传输到数据库。例如,市场数据团队可能会不断更新 Kafka 证券的实时报价,交易团队可能会使用该数据来下达买入/卖出订单。然后,处理后的市场数据和订单可以保存到时间序列数据库中以供进一步分析。

在本文中,我们将创建一个示例数据管道,以说明这在实践中是如何工作的。我们将轮询外部数据源(FinnHub)以获取股票和ETF的实时报价,并将该信息发布到Kafka。然后,Kafka Connect将获取这些记录并将其发布到时间序列数据库(QuestDB)进行分析。

先决条件

  • Git
  • Docker Engine: 20.10+
  • Golang: 1.19+
  • FinnHub API Token

设置

若要在本地运行示例,请先克隆存储库。

代码库分为三个部分:

  • Golang 代码位于存储库的根目录。
  • Kafka Connect QuestDB 映像和 Docker Compose YAML 文件的 Dockerfile 位于 docker 下。
  • Kafka Connect 接收器的 JSON 文件位于 kafka-connect-sinks 下。

构建 Kafka Connect QuestDB 映像

我们首先需要使用 QuestDB Sink 连接器构建 Kafka Connect docker 镜像。导航到该目录并运行

docker-compose build

Dockerfile 只是在 Confluent Kafka Connect 基础映像之上通过 Confluent Hub 安装 Kafka QuestDB 连接器:

FROM confluentinc/cp-kafka-connect-base:7.3.2
RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.6

启动 Kafka、Kafka Connect 和 QuestDB

接下来,我们将通过 Docker Compose 设置基础结构。在同一目录中,在后台运行 Docker Compose:

docker-compose up -d

这将启动Kafka + Zookeeper,我们安装了QuestDB连接器的自定义Kafka Connect映像,以及QuestDB。Docker Compose 文件的完整内容请参见原文。

启动 QuestDB Kafka Connect Sink

等待 Docker 容器正常运行(kafka-connect 映像将记录消息),然后我们可以创建 Kafka Connect 接收器。我们将创建两个汇:一个用于特斯拉,一个用于SPY(SPDR S&P 500 ETF),以比较波动股票和整体市场的价格趋势。

发出以下命令以在目录中创建 Tesla 接收器:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @kafka-connect-sinks/questdb-sink-TSLA.json <http://localhost:8083/connectors>

发出以下命令以在目录中创建 SPY 接收器:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @kafka-connect-sinks/questdb-sink-SPY.json <http://localhost:8083/connectors>

流式传输实时股票报价

现在我们已经设置了数据管道,我们准备将实时股票报价流式传输到 Kafka 并将其存储在 QuestDB 中。

首先,我们需要从 Finnhub Stock API 获取一个免费的 API 代币。在线创建一个免费帐户并复制 API 密钥。

将该密钥导出到我们的 shell 下:

export FINNHUB_TOKEN=<my-token-here>

实时报价端点返回各种属性,例如当前价格、最高价/最低价/开盘价以及之前的收盘价。由于我们只对当前价格感兴趣,因此我们只获取价格并将票证符号和时间戳添加到 Kafka JSON 消息中。

详细的 Golang 代码请参见原文。

若要开始流式传输数据,请运行代码:

$ go run main.go

要同时获取 SPY 的数据,请打开另一个终端窗口,将符号的代码修改为 SPY,并使用设置的令牌值运行代码。

结果

运行生产者代码后,它将打印出发送给 Kafka 的消息,例如:“Message published to Kafka: {“symbol”:”TSLA”,”price”:174.48,”timestamp”:1678743220215}”。这些数据被发送到Kafka主题topic_TSLA,并通过Kafka Connect接收器发送到QuestDB。

然后我们可以导航到 localhost:9000 以访问 QuestDB 控制台。在topic_TSLA表中搜索所有记录,我们可以看到我们的实时市场报价:

SELECT * FROM ‘topic_TSLA’

我们还可以从以下方面查看 SPY 数据:

SELECT * FROM ‘topic_SPY’

现在数据在 QuestDB 中,我们可以通过获取 2m 窗口的平均价格来查询聚合信息:

SELECT avg(price), timestamp FROM topic_SPY SAMPLE BY 2m;

通过获取 2m 窗口的平均,我们可以比较 SPY 和特斯拉的价格趋势。

 类似资料: