当前位置: 首页 > 知识库问答 >
问题:

如何在flink环境中初始化flink作业的spring资源

蒲德曜
2023-03-14

我最近遇到了一些关于开发flink作业的问题,它引入了Spring和hibernate,并且作业将在flink集群上运行。所以我需要在运行任务管理器而不是作业管理器上的flink操作符之前初始化Spring资源。但是我找不到任何合适的StreamExecttion环境方法来做到这一点。

我尝试了以下一些方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
// etl business logic as flink operators
FlinkOperators.run();  
env.execute();

但是,当并行性不止一个的flink作业执行时,spring初始化不会出现在每个任务管理器进程中。所以我不能在Flink的工作中使用Spring。

有什么方法可以初始化flink作业上的spring资源吗?

谢谢。

顺致敬意,

阿尔文

共有1个答案

常自强
2023-03-14

每次需要为每个任务管理器进行某种上下文初始化时,都要构建一个静态函数(如果使用scala,则为对象内部的函数),将这些初始化值存储在静态变量中。

这应该足够了,因为静态值存储在每个任务管理器的内存中。

我使用此方法在每个任务管理器中加载属性文件,这些属性文件包含每个作业的配置。如果要加载文件,请检查每个taskmanager是否都有要加载的文件的副本。

 类似资料:
  • 我们正在部署一个新的Flink流处理作业,它的状态(存储)需要使用历史数据进行初始化,并且在开始处理任何新的应用程序事件之前,该数据应该在状态存储中可用。我们不想显着修改Flink作业以同时加载历史数据。我们考虑编写另一个单独的Flink作业来处理历史数据,更新其状态存储并创建一个Savepoint并使用此Savepoint在主Flink作业中初始化状态。看起来状态处理器API仅适用于DataSe

  • 我们试图构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。从阅读文档来看,在我看来,Flink broadcast state自然适合这种情况。 作为一个实验,我构建了一个简化的版本:假设我有一个整数流,第二个流包含这些整数的乘法因子(我可以随意发送值)。第二个流的频率很低,很容易在事件之间的几天或几周内出现。目前,这两个都实现为简单的套接字服务器,最终产品将使用

  • 我正在开发基于Apache Flink的金融反欺诈系统。我需要根据金融交易计算许多不同的总量。我使用Kafka作为流数据源。例如,在平均交易金额计算中,我使用MapState存储总交易计数和每张卡的总金额。存储在Apache Accumulo的聚合数据。我知道Flink中的持久状态,但这不是我需要的。在计算开始之前,有没有办法将初始数据加载到Flink中?是否可以通过使用两个连接的流和来自Accu

  • 我正在使用Flink 1.3.2和scala构建一个流媒体应用程序,我的Flink应用程序将监视一个文件夹,并将新文件流到管道中。文件中的每条记录都有一个相关的时间戳。我想使用此时间戳作为事件时间,并使用AssignerWithPeriodicWatermarks构建水印,我的水印生成器如下所示: 但是,由于我的文件夹中有一些旧数据,我不想处理它们。旧文件中记录的时间戳是

  • 我正在使用EMR 5.30.0,并尝试使用以下命令提交Flink(1.10.0)作业 想知道是否每个提交的作业都试图创建一个Flink Yarn会话,而不是使用现有的会话。 谢谢Sateesh

  • 我正在运行一个流式flink作业,它消耗来自kafka的流式数据,在flink映射函数中对数据进行一些处理,并将数据写入Azure数据湖和弹性搜索。对于map函数,我使用了1的并行性,因为我需要在作为全局变量维护的数据列表上逐个处理传入的数据。现在,当我运行该作业时,当flink开始从kafka获取流数据时,它的背压在map函数中变得很高。有什么设置或配置我可以做以避免背压在闪烁?