当前位置: 首页 > 知识库问答 >
问题:

@kafkaListener的范围

林浩漫
2023-03-14

我只想了解@kafkaListener的范围是什么,原型还是单例。在单个主题的多个消费者的情况下,返回的是单个实例还是多个实例。在我的情况下,我有多个客户订阅单个主题并获得报告。我只是想知道如果

>

  • 多个客户希望同时查询报告。在我的例子中,我在成功使用消息后关闭容器,但同时如果其他人想要获取报告,则容器应该打开。

    如何将作用域更改为与容器的id相关联的原型(如果不是),以便每次都可以生成单独的实例。

    @KafkaListener(id = "id1", topics = "testTopic" )
     public void listen() {
        // code goes here
    }
    
  • 共有2个答案

    长孙深
    2023-03-14

    有关如何使用原型范围的@KafkaListenerbean的示例,请参见此答案。

    沈乐邦
    2023-03-14

    为所有使用线程调用一个侦听器实例。

    注释@KafkaListener不在原型范围内,此注释也不可能。

    4.1.10. Thread Safety
    
    When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:
    
        Use n containers with concurrency=1 with a prototype scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).
    
        Keep the state in ThreadLocal<?> instances.
    
        Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).
    
    To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits. You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope. Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
        By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective. 
    

    https://docs.spring.io/spring-kafka/reference/html/

    ==已编辑===

    让我们选择第三个选项(删除SimpleThreadScope并授权给它)

    注册SimpleThreadScope。它不会自动拾取。您需要注册如下:

    @Bean
    public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
        return new BeanFactoryPostProcessor() {
            @Override
            public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    
                beanFactory.registerScope("thread", new SimpleThreadScope());
            }
        };
    }
    

    使用scopeName=“thread”创建组件

        @Component
        @Scope(scopeName = "thread", proxyMode = ScopedProxyMode.TARGET_CLASS)
        public class KafkaDelegate{
    
    
         public void handleMessageFromKafkaListener(String message){
      
                 //Do some stuff here with Message
        }
    }
    

    创建@服务

    public class KafkaListenerService{
    
    
        @Autowired
        private KafkaDelegate kafkaDelegate;
    
        
        @KafkaListener(id = "id1", topics = "testTopic" )
        public void listen(String message) {
            kafkaDelete.handleMessageFromKafkaListener(message);
        }
    
    }
    

    另一个示例:如何使用SpringKafka实现有状态消息侦听器?

     类似资料:
    • 我使用的是spring-boot 2.3.2。使用spring-kafka->2.5.4发布。release kafka-clients->2.5.0 我有以下简单的监听器 null 如果我使用 然后它就会失败,不再有异常循环

    • 我正在与spring boot spring@KafkaListener合作。我期望的行为是:我的Kafka侦听器以10个线程读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。 我定义了bean of 和spring启动配置: 我看到所有配置都能正常工作,我在jmx中看到了我的10个线程: 但是我做了这样的测试: 如果版本是 也许我的期望不是真的,这是Kafka听众的正确行为。请

    • 我正在编写一个应用程序(Spring Kotlin),它可以获取Kafka的信息。如果我在声明@KafkaListener时设置autoStartup=“true”,则应用程序可以正常运行,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能。 为了避免应用程序在启动时崩溃,另一个主题建议在声明@KafkaListener时设置autoStar

    • 我试图在不使用@Kafkalistener的情况下编写kafka consumer,下面是我用于配置侦听器的代码行: 在这里,我如何配置topic和listener方法,我的consumer类可以有多个方法。 另外,我想知道在将@kafkalistener与Kafka流一起使用时是否会遇到任何潜在问题。 附言:我不想使用@KafkaListener。

    • 我正在运行spring boot,KafkaListener是我的客户。问题是我们如何从失败的kafka配置中恢复,并避免应用程序在退出代码为0的过程结束时停止。例如,不正确的配置可能是不正确的endpointurl。如果无法访问Kafka服务器,也会出现同样的情况。因此,在任何情况下,KafkaListner进程都不应该杀死服务器。 ontext.java:895应用程序上下文异常:未能启动be

    • 我想创建一个并发的,它可以处理多个主题,每个主题都有不同数量的分区。 我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。 示例:我已经将并发设置为8。我得到了一个听以下主题的。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。 主题A有5个分区 没有初始化更多消