Our application (Spring Boot, Spring Cloud Stream) listens to multiple Kafka topics (TOPIC_A with 3 partitions, TOPIC_B with 1 partition, TOPIC_C with 10 partitions), i.e. three @StreamListener methods:
@StreamListener(TopicASink.INPUT)
public void processTopicA(Message<String> msg) {
    logger.info("** received message: {} ", msg.getPayload());
    // do some processing
}

@StreamListener(TopicBSink.INPUT)
public void processTopicB(Message<String> msg) {
    logger.info("** received message: {} ", msg.getPayload());
    // do some processing
}

@StreamListener(TopicCSink.INPUT)
public void processTopicC(Message<String> msg) {
    logger.info("** received message: {} ", msg.getPayload());
    // do some processing
}
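For context, each listener above is bound through a Sink-style binding interface such as TopicASink. A minimal sketch of what such an interface typically looks like follows; the channel name "topicA-input" is a placeholder, since the real value lives in the application's own code.

// Hypothetical sketch only; the actual channel name comes from the application's TopicASink.
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface TopicASink {

    String INPUT = "topicA-input";

    @Input(INPUT)
    SubscribableChannel input();
}

The binding interfaces are then registered on the application class with @EnableBinding({TopicASink.class, TopicBSink.class, TopicCSink.class}).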
We need to customize the error handling and retry behavior, so we tried to do that by configuring a ConcurrentKafkaListenerContainerFactory bean:
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> concurrentKafkaListenerContainerFactory(ConsumerFactory<Object, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(2); // we need to customize this per topic based on number of partitions
    factory.setConsumerFactory(consumerFactory);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10));
    factory.setRetryTemplate(retryTemplate);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 10)));
    return factory;
}
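As an aside, the per-binding concurrency (and a basic retry policy) that the factory above tries to set can also be expressed as spring-cloud-stream consumer properties, without any container factory at all. The binding names below (topicA-input, topicC-input) are placeholders for whatever the application's binding interfaces actually declare:

# concurrency per binding, matching the partition counts from the question
spring.cloud.stream.bindings.topicA-input.consumer.concurrency=3
spring.cloud.stream.bindings.topicC-input.consumer.concurrency=10
# retry handled by the binder's built-in RetryTemplate
spring.cloud.stream.bindings.topicA-input.consumer.maxAttempts=10
spring.cloud.stream.bindings.topicA-input.consumer.backOffInitialInterval=1000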
Stack trace after trying the reflection-based solution shared below:
o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor#processInput[1 args]; nested exception is com.jta.poc.kafkapoc.MyNewRetryableException, failedMessage=GenericMessage [payload=byte[35], headers={kafka_timestampType=CREATE_TIME, kafka_receivedTopic=new_input_topic, spanTraceId=e3382bf49eaa5343, spanId=e3382bf49eaa5343, nativeHeaders={spanTraceId=[e3382bf49eaa5343], spanId=[efc90644fc4c7dee], spanSampled=[0], X-B3-TraceId=[e3382bf49eaa5343], X-B3-SpanId=[efc90644fc4c7dee], X-B3-ParentSpanId=[e3382bf49eaa5343], spanParentSpanId=[e3382bf49eaa5343], X-B3-Sampled=[0]}, kafka_offset=26, X-B3-SpanId=e3382bf49eaa5343, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2a011bf8, X-B3-Sampled=0, X-B3-TraceId=e3382bf49eaa5343, id=3c86f652-f16e-2f59-1a59-f3d8601849f0, kafka_receivedPartitionId=1, spanSampled=0, kafka_receivedTimestamp=1586250896206, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = new_input_topic, partition = 1, offset = 26, CreateTime = 1586250896206, serialized key size = -1, serialized value size = 35, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@68df9f80), contentType=application/json, timestamp=1586274368357}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.jta.poc.kafkapoc.MyNewRetryableException
at com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor.consumeMessage(KafkaStreamPocApplication.java:164)
at com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor.lambda$processInput$0(KafkaStreamPocApplication.java:107)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164)
at com.jta.poc.kafkapoc.KafkaStreamPocApplication$MessageProcessor.processInput(KafkaStreamPocApplication.java:105)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 29 more
The container factory is not used in this context. Add a ListenerContainerCustomizer @Bean instead.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
    return (container, destination, group) -> { ... };
}
As you can see, you get a reference to the container, the destination name, and the group, so you can work out which binding the customizer is being invoked for.
/**
* If a single bean of this type is in the application context, listener containers
* created by the binder can be further customized after all the properties are set. For
* example, to configure less-common properties.
*
* @param <T> container type
* @author Gary Russell
* @author Oleg Zhurakousky
* @since 2.1
*/
@FunctionalInterface
public interface ListenerContainerCustomizer<T> {

    /**
     * Configure the container that is being created for the supplied queue name and
     * consumer group.
     * @param container the container.
     * @param destinationName the destination name.
     * @param group the consumer group.
     */
    void configure(T container, String destinationName, String group);

}
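For this particular use case, a customizer could look roughly like the sketch below. It is only an illustration under a few assumptions: the bean/method name is made up, the topic names are taken from the question, the concurrency values simply mirror the partition counts, and the error handler is the same SeekToCurrentErrorHandler used in the workaround further down (relevant classes live in org.springframework.cloud.stream.config and org.springframework.kafka.listener).

// Illustrative sketch only; names and values here are assumptions, not part of the original answer.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer() {
    return (container, destination, group) -> {
        // The Kafka binder creates ConcurrentMessageListenerContainers, so concurrency
        // can be adjusted per destination once the container is narrowed to that type.
        if (container instanceof ConcurrentMessageListenerContainer) {
            ConcurrentMessageListenerContainer<?, ?> concurrent =
                    (ConcurrentMessageListenerContainer<?, ?>) container;
            if ("TOPIC_A".equals(destination)) {
                concurrent.setConcurrency(3);
            }
            else if ("TOPIC_C".equals(destination)) {
                concurrent.setConcurrency(10);
            }
        }
        // Custom error handling, set the same way the bindingFixer workaround below does it.
        container.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    };
}

Since the Javadoc above states the customizer runs after all the properties are set, both settings take effect without resorting to reflection.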
Also, Boot 2.0 has reached end of life and has been unsupported since last April, so you really should upgrade. If you cannot upgrade, the reflection-based workaround below reaches into the binder's internal fields and replaces the error handler on the "input" binding's listener container after the bindings have been created:
@Bean
public SmartLifecycle bindingFixer(BindingService bindingService) {
    return new SmartLifecycle() {

        @Override
        public int getPhase() {
            // run last, after the bindings have been created and started
            return Integer.MAX_VALUE;
        }

        @Override
        public void stop() {
            // no op
        }

        @Override
        public void start() {
            // reach into the BindingService's internal map of consumer bindings and
            // replace the error handler on the "input" binding's listener container
            @SuppressWarnings("unchecked")
            Map<String, Binding> consumers = (Map<String, Binding>) new DirectFieldAccessor(bindingService)
                    .getPropertyValue("consumerBindings");
            SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler();
            ((ConcurrentMessageListenerContainer<?, ?>) new DirectFieldAccessor(consumers.get("input"))
                    .getPropertyValue("lifecycle.messageListenerContainer")).getContainerProperties()
                    .setErrorHandler(errorHandler);
        }

        @Override
        public boolean isRunning() {
            return false;
        }

        @Override
        public void stop(Runnable callback) {
            callback.run();
        }

        @Override
        public boolean isAutoStartup() {
            return true;
        }

    };
}