当前位置: 首页 > 知识库问答 >
问题:

如何在Apache Flink正确初始化任务状态?

潘兴朝
2023-03-14

我正在开发基于Apache Flink的金融反欺诈系统。我需要根据金融交易计算许多不同的总量。我使用Kafka作为流数据源。例如,在平均交易金额计算中,我使用MapState存储总交易计数和每张卡的总金额。存储在Apache Accumulo的聚合数据。我知道Flink中的持久状态,但这不是我需要的。在计算开始之前,有没有办法将初始数据加载到Flink中?是否可以通过使用两个连接的流和来自Accumulo的数据以及最新计算的聚合和事务流来完成?事务流是无限的,按聚合流而不是。我该往哪边挖?非常感谢您的帮助。

我考虑过异步IO,但状态不能与异步函数一起使用。我的想法是:检查内存状态下的聚合。如果这里没有卡的数据-代码调用存储服务,从中提取数据,在内存状态下执行计算和更新,因此,该卡的下一个事务不需要通过调用外部数据服务来处理。但我认为这是一个很大的瓶颈。

共有1个答案

孟彦
2023-03-14

您可以这样尝试:

TASK::setInitialState
    TASK::invoke
        create basic utils (config, etc) and load the chain of operators
        setup-operators
        task-specific-init
        initialize-operator-states
        open-operators
        run
        close-operators
        dispose-operators
        task-specific-cleanup
        common-cleanup
 类似资料:
  • 问题内容: 将log4j添加到我的应用程序后,每次执行我的应用程序时,都会得到以下输出: 看来这意味着缺少配置文件。此配置文件应位于何处,什么是良好的入门内容? 我使用纯Java开发桌面应用程序。因此没有网络服务器等… 问题答案: 默认情况下,在上查找名为或的文件。 您可以按照此处所述通过设置系统属性来控制它用来初始化自身的文件(查找“默认初始化过程”部分)。 例如: 将导致在类路径上查找名为的文

  • 我有一个自定义模板文件,呈现一些产品和它们的添加到购物车按钮,我正在尝试手动实现。当按下Add to Cart时,页面将重新加载,包含一些数据的$_POST变量将添加新产品。“cart_contents_count”还反映添加的项。然而,当我转到购物车页面时,它是空的。请看下面的代码。 然而,当我转到正常的默认商店页面(/shop/)并从那里添加一个产品时,我的购物车页面表明已经添加了该产品。当我

  • 问题内容: 我正在设置用于学习JavaEE7中CDI的基本环境。我有以下代码可以启动。只是启动和关闭。 我正在控制台上关注。 有问题的线是。这仅表示依赖注入将不起作用。但是我不确定是什么问题。我已经添加了。我什至没有达到初始化对象的目的,那为什么会出现这个问题呢? Weld的官方文档还给出了阅读此答案后得到的相同代码。“ Antonio Goncalves”撰写的“ Beginning Java

  • 我们试图构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。从阅读文档来看,在我看来,Flink broadcast state自然适合这种情况。 作为一个实验,我构建了一个简化的版本:假设我有一个整数流,第二个流包含这些整数的乘法因子(我可以随意发送值)。第二个流的频率很低,很容易在事件之间的几天或几周内出现。目前,这两个都实现为简单的套接字服务器,最终产品将使用

  • 我有两个React组件,即和使用了一些重要的UI组件,但我相信它们与我的问题无关。 在,使用effect调用,该函数解析为一个类别数组,例如,。 我的目标是访问父组件()中复选框的状态(选中或未选中)。我采取了这个问题中建议的方法。(见验证答案) 有趣的是,当我记录

  • 我们的项目中有一个不可替代代币状态和不可替代代币合约的自定义实现。我们正在使用下面的代码来发行我们的自定义不可替代代币。 当试图将上面获得的事务构建器转换为有线事务时(< code > builder . towiretransaction(service hub);)我们在下面的堆栈跟踪中得到一个错误。