当前位置: 首页 > 知识库问答 >
问题:

带阿帕奇梁的Spring

万坚壁
2023-03-14
 public static void main(String[] args) {
    initSpringApplicationContext();

    GcmOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(GcmOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // pipeline definition
}

我想将spring应用程序上下文注入到每个ParDo函数中。

共有1个答案

曹焱
2023-03-14

这里的问题是,ApplicationContext在任何辅助机器上都不可用,因为main方法只在构造作业时调用,而不是在任何辅助机器上调用。因此,InitSpringApplicationContext永远不会在任何工作者上调用。

我从未尝试过在Apache Beam中使用Spring,但我想在静态初始化器块中移动initSpringApplicationContext会得到预期的结果。

public class ApplicationContextHolder {

    private static final ApplicationContext CTX;

    static {
        CTX = initApplicationContext();
    }

    public static ApplicationContext getContext() {
        return CTX;
    }
}

请注意,这不应该被认为是在Apache Beam中使用Spring的最佳实践,因为它在Apache Beam的生命周期中没有很好地集成。例如,当在初始化应用程序上下文期间发生错误时,它将出现在使用applicationcontextholder的第一个位置。因此,我建议从静态初始化器块中提取initapplicationcontext并针对Apache Beam的生命周期显式调用它。设置阶段将是一个很好的地方。

 类似资料:
  • Apache Kafka:分布式消息传递系统 Apache Storm:实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据? 在实时数据管道方面,我觉得两者做的工作是一样的。如何在数据管道上同时使用这两种技术?

  • 我正在使用Flink从Apache Pulsar读取数据。我在pulsar中有一个分区主题,有8个分区。在本主题中,我生成了1000条消息,分布在8个分区中。我的笔记本电脑中有8个内核,因此我有8个子任务(默认情况下,并行度=#个内核)。在执行Eclipse中的代码后,我打开了Flink UI,发现一些子任务没有收到任何记录(空闲)。我希望所有8个子任务都能得到利用(我希望每个子任务都映射到我的主

  • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

  • 目前我正在研究Apache spark和Apache ignite框架。 这篇文章介绍了它们之间的一些原则差异,但我意识到我仍然不理解它们的目的。 我的意思是,哪一个问题更容易产生火花而不是点燃,反之亦然?

  • 我正在做一个学术项目,涉及传感器的流数据。我已经包围了苍鹭(Storm的接班人)和尼菲。两者都支持内置背压,这对我的项目至关重要。Apache Nifi和Heron之间的主要区别是什么? 哪款更适合物联网应用?

  • 我正试图找出这两种设置之间的区别。大小和缓冲区。Kafka制作人的记忆。 据我所知。大小:这是可以发送的批次的最大大小。 文档描述了缓冲区。memory as:生产者可以用来缓冲等待发送的记录的内存字节。 我不明白这两者之间的区别。有人能解释一下吗? 谢啦