当前位置: 首页 > 知识库问答 >
问题:

如何利用历史数据集丰富Flink数据流

雍飞雨
2023-03-14

我正在用Flink做一个实时项目,我需要用以前的交易丰富每一张卡的状态,以计算如下的交易特性:

对于每一张卡,我都有一个功能,可以统计过去24小时内的交易次数。另一方面,我有两个数据源:

transactionsHistory.connect(transactionsStream).flatMap(new         
RichCoFlatMapFunction<History, Tuple2<String, Transaction>,         
ExtractedFeatures>() {
    private ValueState<History> history;
    @Override
    public void open(Configuration config) throws Exception {
        this.history = getRuntimeContext().getState(new 
    ValueStateDescriptor<>("card history", History.class));
    }
    //historical data 
    @Override
    public void flatMap1(History history, 
    Collector<ExtractedFeatures> collector) throws Exception {
        this.history.update(history);
    }
    //new transactions from stream 
    @Override
    public void flatMap2(Tuple2<String, Transaction> 
    transactionTuple, Collector<ExtractedFeatures> collector) throws 
    Exception {
        History history = this.history.value();
        Transaction transaction = transactionTuple.f1;
        ArrayList<History> prevDayHistoryList = 
        history.prevDayTransactions;

        // This function returns transactions which are in 24 hours 
        //window of the current transaction and their count.
        Tuple2<ArrayList<History>, Integer> prevDayHistoryTuple = 
        findHistoricalDate(prevDayHistoryList,
                transaction.transactionLocalDate);
        prevDayHistoryList = prevDayHistoryTuple.f0;
        history.prevDayTransactions = prevDayHistoryList;
        this.history.update(history);
        ExtractedFeatures ef = new ExtractedFeatures();
        ef.updateFeatures(transaction, prevDayHistoryTuple.f1);
        collector.collect(ef);
    }
});

在Flink流中使用静态数据集丰富数据流

任何帮助都是非常感激的。

共有1个答案

梁丘书
2023-03-14

但是,由于数据库数据本身不是流,因此输出是不正确的。

当然,可以使用来自关系数据库的信息来丰富流数据。然而,最棘手的是如何保证浓缩数据在需要之前就被吸收了。通常,您可能需要缓冲要充实的流,直到充实数据被引导/摄取。例如,有时采取的一种方法是

  1. 在禁用“要充实的流”的情况下运行应用程序
  2. 一旦浓缩数据被完全摄取并以闪烁状态存储,就获取保存点
  3. 在启用了要充实的流的情况下,从保存点重新启动应用程序
    null

对这个用例的更好的支持计划在未来的版本中,BTW。

 类似资料:
  • 我正在编写一个Flink流程序,其中我需要使用一些静态数据集(信息库,IB)来丰富用户事件的数据流。 对于例如。假设我们有一个买家的静态数据集,并且我们有一个事件的clickstream,对于每个事件,我们要添加一个布尔标志,指示事件的实施者是否是买家。 另一个选择可以是使用托管操作员状态来存储购买者设置,但是我如何保持按用户id分配的该状态,以避免在单个事件查找中使用网络I/O呢?在内存状态后端

  • 数据丰富是指用于增强,改进和改进原始数据的一系列过程。 它指的是有用的数据转换(原始数据到有用信息)。 数据丰富过程的重点是使数据成为现代企业或企业的宝贵数据资产。 最常见的数据丰富过程包括通过使用特定的决策算法来纠正数据库中的拼写错误或印刷错误。 数据丰富工具为简单数据表添加有用信息。 考虑以下代码进行单词拼写纠正 - import re from collections import Coun

  • 在我的应用程序中,我想丰富一个无限的事件流。流本身是通过对ID进行散列来并行的。对于每个事件,都可能有一个对外部源(例如REST和DB)的调用。这个呼叫本质上是阻塞的。必须保持一个流分区内事件的顺序。 我的想法是创建一个RichMapFunction,它设置连接,然后轮询每个事件的外部源。阻塞调用通常耗时不长,但在最坏的情况下,服务可能会关闭。

  • 任何push到open-falcon中的数据,事后都可以通过api组件提供的restAPI,来查询得到。 具体请参考API文档

  • 问题是如何在例如主详细表上实现数据变化的跟踪,即Spring Boot/Spring数据中一对多关系中的两个实体。 在存储数据后,能够获得主实体及其特定版本的详细信息,并具有将其还原到特定版本的功能。

  • 我正在开发一个应用程序,希望在实时事件和过去事件上运行Flink SQL。我尝试了一个POC,其中Flink在Kafka等流源上运行SQL,SQL查询只返回新的事件/更改。但是,我想对所有数据运行SQL,有些数据可能会随着时间的推移而改变。基本上我的要求就是连续查询整个数据。如何通过Flink或其他流媒体解决方案实现这一点?