当前位置: 首页 > 知识库问答 >
问题:

在将Spring Cloud Stream与Kafka一起使用时,如何执行优雅的应用程序关闭?

鲁旭
2023-03-14

我有一个使用Spring Cloud Stream(v1.3.0)和Kafka(v1.1.6)的Spring boot(v.1.57)应用程序。我希望能够优雅地关闭它,即关闭时,所有流监听器(即用@StreamListener注释)应该:

  1. 停止轮询新消息
  2. 完成他们的工作
  3. 将偏移提交给Kafka

我注意到ContainerProperties(默认设置为10000ms)中有一个名为“Shutdown Timeout”的属性,所以我尝试通过像这样的反射扩展ConvoltKafkaListenerContainerFactoryConfigrer类来将其修改为30000(因为它有一个@先决条件OnMissingBean注释):

@Slf4j
@Component
public class BehalfConcurrentKafkaListenerContainerFactoryConfigurer extends ConcurrentKafkaListenerContainerFactoryConfigurer {

    @Autowired
    private KafkaProperties kproperties;

    @Override
    public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
                          ConsumerFactory<Object, Object> consumerFactory) {
        PropertyAccessor myAccessor = PropertyAccessorFactory.forDirectFieldAccess(this);
        myAccessor.setPropertyValue("properties", kproperties);

        ContainerProperties containerProperties = listenerContainerFactory
                .getContainerProperties();
        super.configure(listenerContainerFactory, consumerFactory);
        containerProperties.setShutdownTimeout(30000);
    }
}

但这并不成功。还尝试将其(关机超时:30000)放入应用程序。yml在spring cloud stream binder设置下,但同样没有帮助。

有什么方法可以控制关机过程并实现我的目标吗?

共有1个答案

鲜于俊侠
2023-03-14

编辑

不再需要进行这种反射黑客;只需将ListenerContainerCustomizer添加到应用程序上下文中。请参见此处。

EDIT_END

SpringKafka1.1。不再支持x;您应该将1.3.9与boot 1.5一起使用。x。

当前引导1.5。x版本为1.5.21。

你应该立即升级。

然而,所有这些项目都有更新的版本。

Spring Cloud Stream不使用该工厂或引导属性来创建其容器;它没有公开在容器上配置该属性的机制。

Spring Cloud Stream 2.1添加了ListenerContainerCustomizer,它允许您通过在其上设置任何属性来自定义绑定容器。

我建议您升级到Boot 2.1.6和Spring Cloud Stream Germantown(2.2.0)。

编辑

这是一个有点黑客,但它应该工作,直到你可以升级到一个新的流版本。。。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So56883620Application {

    public static void main(String[] args) {
        SpringApplication.run(So56883620Application.class, args).close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @StreamListener(Sink.INPUT)
    public void listen(String in) throws InterruptedException {
        this.latch.countDown();
        System.out.println(in);
        Thread.sleep(6_000);
        System.out.println("exiting");
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            IntStream.range(0,2).forEach(i -> template.send("mytopic", ("foo" + i).getBytes()));
            // wait for listener to start
            this.latch.await(10, TimeUnit.SECONDS);
            System.out.println("Shutting down");
        };
    }

    @Bean
    public SmartLifecycle bindingFixer(BindingService bindingService) {
        return new SmartLifecycle() {

            @Override
            public int getPhase() {
                return Integer.MAX_VALUE;
            }

            @Override
            public void stop() {
                // no op
            }

            @Override
            public void start() {
                @SuppressWarnings("unchecked")
                Map<String, Binding<?>> consumers = (Map<String, Binding<?>>) new DirectFieldAccessor(bindingService)
                        .getPropertyValue("consumerBindings");
                @SuppressWarnings("unchecked")
                Binding<?> inputBinding = ((List<Binding<?>>) consumers.get("input")).get(0);
                ((AbstractMessageListenerContainer<?, ?>) new DirectFieldAccessor(inputBinding)
                        .getPropertyValue("lifecycle.messageListenerContainer"))
                                .getContainerProperties().setShutdownTimeout(30_000L);
            }

            @Override
            public boolean isRunning() {
                return false;
            }

            @Override
            public void stop(Runnable callback) {
                callback.run();
            }

            @Override
            public boolean isAutoStartup() {
                return true;
            }
        };
    }

}
 类似资料:
  • 我正在将FlinkKafkaConsumer010与Flink 1.2.0一起使用,我面临的问题是:如果出现某种情况,是否有办法通过编程关闭整个管道? 一种可能的解决方案是,我可以通过调用FlinkKafkaConsumer010中定义的close()方法关闭kafka消费源,然后调用带有shutdown的管道。对于这种方法,我创建了一个列表,其中包含对所有FlinkKafkaConsumer01

  • 问题内容: 如何在RoR中使用CSS?当我从外部链接时,我永远看不到文件。我将.css文件cp到我能想到的每个文件夹…视图,控制器,模板,似乎没有任何作用。 要使用Rails应用程序启用外部CSS文件,我需要做什么?我是Rails的新手,如果这是基本知识,请原谅我。 问题答案: 将CSS文件放在public / stylesheets中,然后使用: 链接到布局中的样式表或视图中的erb文件。 同样

  • 问题内容: 我有一个Spring Boot REST应用程序,它依赖于Firebase中完成的身份验证。 在客户端,Firebase生成令牌,借此在Spring Boot中,我需要验证。 但是代码处于回调模式,因此如何实现该函数以使其能够完成任务? 之后如何返回?? 问题答案: 这是我自己尝试回答我自己的问题 您也可以尝试以下代码 获取更多详细信息URL https://firebase.goog

  • 我已经定制了我的执行器/信息endpoint,并且我想使用来自头的信息授权对另一个服务的调用。 我在这里实现了InfoContributor:https://www.baeldung.com/spring-boot-info-acture-custom 我想接受方法中的请求头。对于任何用户定义的RESTendpoint,我都可以定义参数并访问头。 但不幸的是,的方法只采用一个参数。 如何访问方法中

  • 我正在尝试让一个简单的HTTP控制台应用程序作为Azure服务应用程序运行。它所做的只是在连接时返回OK。它在我的笔记本电脑上运行良好,我可以使用VS2019发布到Azure ok。问题是用于监听的前缀。 在我的笔记本电脑上,我可以使用超文本传输协议://: 80/; https://: 443/,但在Azure中,我得到一个错误:[例外]访问被拒绝。 本文https://github.com/p

  • 我在Azure的应用服务中使用了OpenCV,但是在安装扩展(pip安装opencv-python)时发生了错误。 似乎我可以在appservice上使用SSH登录,所以在安装模块后,使用SSH登录,然后我键入如下命令。 然后,我键入如下命令。 通过键入上述命令,我可以在Python中使用OpenCV。 我以为它会在写入requirements.txt和部署时自动安装pip,但部署或重启后操作系统