当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream@ServiceActivator异常时未向errorChannel发送消息

长孙谦
2023-03-14
spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12
@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {

        logger.error("FULFILLED!!!!!!");

        throw new Exception("test exception");

    }
}
  • 关于errorChannel没有任何订阅者的投诉
  • Kafka生产者线程日志
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler@2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {}
...
...
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}

编辑:以下是我的channels类的内容:

public interface Channels {

    public static final String GATEWAY_OUTPUT = "gatewayOutput";

    public static final String ENRICHING_INPUT = "enrichingInput";
    public static final String ENRICHING_OUTPUT = "enrichingOutput";

    public static final String REDEEMING_INPUT = "redeemingInput";
    public static final String REDEEMING_OUTPUT = "redeemingOutput";

    public static final String FULFILLING_INPUT = "fulfillingInput";
    public static final String FULFILLING_OUTPUT = "fulfillingOutput";

    @Output(GATEWAY_OUTPUT)
    MessageChannel gatewayOutput();

    @Input(ENRICHING_INPUT)
    MessageChannel enrichingInput();

    @Output(ENRICHING_OUTPUT)
    MessageChannel enrichingOutput();

    @Input(REDEEMING_INPUT)
    MessageChannel redeemingInput();

    @Output(REDEEMING_OUTPUT)
    MessageChannel redeemingOutput();

    @Input(FULFILLING_INPUT)
    MessageChannel fulfillingInput();

    @Output(FULFILLING_OUTPUT)
    MessageChannel fulfillingOutput();

共有1个答案

乐正远航
2023-03-14

您不显示channels类,但是绑定器不知道您的“错误”通道是“特殊的”。

绑定器可以配置为重试,并将异常路由到死信主题;请参见1.0.0版本中的这个PR。

或者,您可以在服务激活器之前添加一个“中间流”网关--将其视为Java中的“try/catch”块:

@MessageEndpoint
public static class GatewayInvoker {

    @Autowired
    private ErrorHandlingGateway gw;

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT)
    public void send(Message<?> message) {
        this.gw.send(message);
    }

}

@Bean
public GatewayInvoker gate() {
    return new GatewayInvoker();
}

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS)
public interface ErrorHandlingGateway {

    void send(Message<?> message);

}
 类似资料:
  • 我可以在日志中看到service activator已经注册并订阅了错误通道。一旦发生运行时异常,所有流都将停止并进入关闭模式。

  • 我有一个BE服务a,它正在使用假客户端向microservice B发送Rest JSON消息: 终点: Rest Endpoint正在向AWS Ses邮件或其他邮件提供商发送邮件。 问题是来自飞格的第一个呼叫可能需要5秒或更长时间。我需要使其异步,以便FE客户端不要等待邮件发送。 我如何可以使从飞度异步发出的Rest调用到超文本传输协议响应OK没有等待时间可以预期?是否有一些更好的解决方案来实现

  • 我在尝试更新 mongodb 中的数据时遇到以下异常。请帮我解决这个问题。 当我查看日志时,我会看到很多错误消息,就像下面的一条,其中驱动程序在连接到mongo时出现套接字错误。该站点仍在运行,不会在每个请求中都发生此错误,也不会在一个需要更长时间的操作中发生此错误。 我使用的版本是C#驱动程序:“2.10.2”和Azure Cosmos版本:3.6”。 向服务器发送消息时发生异常。--- 导致问

  • 主要内容:1 invokeOneway单向发送,1.1 invokeOnewayImpl单向调用,2 sendMessageSync同步发送,2.1 invokeSync同步调用,3 sendMessageAsync异步发送消息,3.1 invokeAsync异步调用,3.2 onExceptionImpl异常处理,4 NettyClientHandler处理服务端消息,4.1 processResponseCommand处理响应,基于RocketMQ release-4.9.3,深入的介绍了P

  • 我想使用firebase云消息将通知从我的应用程序发送到另一个应用程序。所以我使用这个方法retrieveFCMToken(forSenderID:senderid)来处理这个过程。我将以下代码添加到我的应用程序委托中: 这是我的应用程序代理: 我遵循这个场景:我有两个应用程序,分别是“A”和“B”。我想将通知从应用程序“A”发送到应用程序“B”。因此,我将应用程序A的发件人id放入应用程序B代理

  • 在行引发异常: 线程“main”javax.net.ssl.SSLHandShaker异常:Sun.Security.Validator.ValidatoreXception:PKIX路径构建失败:Sun.Security.Provider.CertPath.SunCertPathBuilderException:无法在Sun.Security.SSL.Alerts.GetSleXception(