当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring Kafka实现有状态消息监听器?

姬和歌
2023-03-14

我想使用Spring Kafka API实现一个有状态侦听器。

鉴于以下情况:

  • ConcurrentKafkaListenerContainerFactory,并发设置为“n”
  • Spring@Service类上的@KafKalistener注释方法

共有1个答案

黄丰
2023-03-14

你是对的;每个容器有一个侦听器实例(不管是配置为@kafkalistener还是配置为messageListener)。

一种解决办法是使用原型范围的MessageListener和n个KafkaMessageListenerContainerbean(每个bean有1个线程)。

然后,每个容器将获得它自己的侦听器实例。

不过,通常使用无状态bean更好。

编辑

我发现了另一个使用SimpleThreadScope的解决方法...

@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
 类似资料:
  • 本文向大家介绍iOS CoreTelephony 实现监听通话状态,包括了iOS CoreTelephony 实现监听通话状态的使用技巧和注意事项,需要的朋友参考一下 在程序中如果需要监听电话状态,可以引入CoreTelephony框架,这个框架包含了电话相关的API,可以实现监测来电,查看运营商信息等功能。下面就是具体的实现监测来电的代码。一定要把center写成一个单独的属性,并且是强引用(s

  • 问题内容: 暂时禁用消息侦听器的好方法是什么?我要解决的问题是: 消息侦听器接收到JMS消息 尝试处理该消息时出现错误。 我等待系统再次准备就绪,以便能够处理该消息。 在系统准备就绪之前,我不再需要任何消息,因此… …我想禁用消息监听器。 我的系统已准备好再次处理。 处理失败的消息,并确认JMS消息。 再次启用消息监听器。 现在,我正在使用Sun App Server。我通过在MessageCon

  • 本文向大家介绍Android BroadcastReceiver实现网络状态实时监听,包括了Android BroadcastReceiver实现网络状态实时监听的使用技巧和注意事项,需要的朋友参考一下 前言:最近公司项目重构,为了提高用户的体验,项目中要求添加当前网络状态的实时监听,以便在无网络状态时给用户友好的提醒并修改UI界面。本文将介绍使用四大组件之一的BroadcastReceiver实

  • 我正在阅读这里的文档https://docs.spring.io/spring-kafka/docs/2.2.6.release/reference/html/#retily-deliveries,我不知道用批侦听器实现有状态重试的正确方法是什么 文档中说,“没有为批处理消息侦听器提供重试适配器,因为框架不知道在批处理的哪个地方发生了故障”。 这对我的用例不是问题,因为我只想重试整个批处理。 文档

  • 本文向大家介绍Android scrollview如何监听滑动状态,包括了Android scrollview如何监听滑动状态的使用技巧和注意事项,需要的朋友参考一下 ScrollView 视图的滚动过程,其实是在不断修改原点坐标。当手指触摸后,ScrollView会暂时拦截触摸事件,使用一个计时器。假如在计时器到点后没有发生手指移动事件,那么ScrollView发送tracking events

  • 只有监听设备消息后,在就收到消息数据才会返回消息内容,否则,不返回接收的消息内容。 请求方式: "|4|2|2|\r" 返回值: "|4|2|2|1|\r" 监听成功 Arduino样例: softSerial.print("|4|2|2|\r");