我想使用Spring Kafka API实现一个有状态侦听器。
鉴于以下情况:
你是对的;每个容器有一个侦听器实例(不管是配置为@kafkalistener
还是配置为messageListener
)。
一种解决办法是使用原型范围的MessageListener
和n个KafkaMessageListenerContainer
bean(每个bean有1个线程)。
然后,每个容器将获得它自己的侦听器实例。
不过,通常使用无状态bean更好。
编辑
我发现了另一个使用SimpleThreadScope
的解决方法...
@SpringBootApplication
public class So51658210Application {
public static void main(String[] args) {
SpringApplication.run(So51658210Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
KafkaListenerEndpointRegistry registry) {
return args -> {
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
};
}
@Bean
public ActualListener actualListener() {
return new ActualListener();
}
@Bean
@Scope("threadScope")
public ThreadScopedListener listener() {
return new ThreadScopedListener();
}
@Bean
public static CustomScopeConfigurer scoper() {
CustomScopeConfigurer configurer = new CustomScopeConfigurer();
configurer.addScope("threadScope", new SimpleThreadScope());
return configurer;
}
@Bean
public NewTopic topic() {
return new NewTopic("so51658210", 3, (short) 1);
}
public static class ActualListener {
@Autowired
private ObjectFactory<ThreadScopedListener> listener;
@KafkaListener(id = "foo", topics = "so51658210")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
this.listener.getObject().doListen(in, partition);
}
}
public static class ThreadScopedListener {
private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + ":"
+ Thread.currentThread().getName() + ":"
+ this.hashCode() + ":"
+ partition);
}
}
}
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
本文向大家介绍iOS CoreTelephony 实现监听通话状态,包括了iOS CoreTelephony 实现监听通话状态的使用技巧和注意事项,需要的朋友参考一下 在程序中如果需要监听电话状态,可以引入CoreTelephony框架,这个框架包含了电话相关的API,可以实现监测来电,查看运营商信息等功能。下面就是具体的实现监测来电的代码。一定要把center写成一个单独的属性,并且是强引用(s
问题内容: 暂时禁用消息侦听器的好方法是什么?我要解决的问题是: 消息侦听器接收到JMS消息 尝试处理该消息时出现错误。 我等待系统再次准备就绪,以便能够处理该消息。 在系统准备就绪之前,我不再需要任何消息,因此… …我想禁用消息监听器。 我的系统已准备好再次处理。 处理失败的消息,并确认JMS消息。 再次启用消息监听器。 现在,我正在使用Sun App Server。我通过在MessageCon
本文向大家介绍Android BroadcastReceiver实现网络状态实时监听,包括了Android BroadcastReceiver实现网络状态实时监听的使用技巧和注意事项,需要的朋友参考一下 前言:最近公司项目重构,为了提高用户的体验,项目中要求添加当前网络状态的实时监听,以便在无网络状态时给用户友好的提醒并修改UI界面。本文将介绍使用四大组件之一的BroadcastReceiver实
我正在阅读这里的文档https://docs.spring.io/spring-kafka/docs/2.2.6.release/reference/html/#retily-deliveries,我不知道用批侦听器实现有状态重试的正确方法是什么 文档中说,“没有为批处理消息侦听器提供重试适配器,因为框架不知道在批处理的哪个地方发生了故障”。 这对我的用例不是问题,因为我只想重试整个批处理。 文档
本文向大家介绍Android scrollview如何监听滑动状态,包括了Android scrollview如何监听滑动状态的使用技巧和注意事项,需要的朋友参考一下 ScrollView 视图的滚动过程,其实是在不断修改原点坐标。当手指触摸后,ScrollView会暂时拦截触摸事件,使用一个计时器。假如在计时器到点后没有发生手指移动事件,那么ScrollView发送tracking events
只有监听设备消息后,在就收到消息数据才会返回消息内容,否则,不返回接收的消息内容。 请求方式: "|4|2|2|\r" 返回值: "|4|2|2|1|\r" 监听成功 Arduino样例: softSerial.print("|4|2|2|\r");