当前位置: 首页 > 知识库问答 >
问题:

Spring集成DSL与RabbitMQ中的SimpleMessageListenerContainer AbstractMethodError

巫晋鹏
2023-03-14
@Configuration
@IntegrationComponentScan
public class RabbitConfig {

@Autowired // TODO constructor!
private ConnectionFactory connectionFactory;

public RabbitConfig(
        @Value("${article.inbound.queue}") String queueName,
        @Value("${article.inbound.exchange}") String exchangeName,
        @Value("${article.inbound.routingkey}") String routingKey) {
    this.queueName = queueName;
    this.exchangeName = exchangeName;
    this.routingKey = routingKey;
}

@Bean
Exchange exchange() {
    return ExchangeBuilder
            .topicExchange(this.exchangeName)
            .durable(true)
            .build();
}

@Bean
Queue queue() {
    return QueueBuilder.durable(queueName).build();
}

@Bean
Binding binding() {
    return BindingBuilder.bind(queue())
            .to(exchange())
            .with(routingKey)
            .noargs();
}

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public SimpleMessageListenerContainer articleListenerContainer(
        ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue());
    container.setMessageConverter(jsonMessageConverter());
    return container;
}
@Bean
IntegrationFlow fromMessageBroker(SimpleMessageListenerContainer messageListener) {
    return IntegrationFlows.from(Amqp.inboundAdapter(messageListener))
            .log()
            .handle(message -> {
                final MessageHeaders headers = message.getHeaders();
                final Object assetId = headers.get("assetId");
                log.info(assetId);
            })
            .get();
}
2018-09-20 13:11:08.240  INFO 49400 --- [           main] c.p.ftppush.article.MessageConsumer      : Started MessageConsumer in 1.743 seconds (JVM running for 2.386)
2018-09-20 13:11:12.309  INFO 49400 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=byte[0], headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=article.original.orgn.123, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_receivedExchange=que.article.content.pf.normal.trigger, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1b7db656, amqp_deliveryTag=1, assetId=1qh22m9027k6d1jz29tsi510x5, amqp_consumerQueue=exc.article.content, amqp_redelivered=false, id=0d04f22a-991f-000b-63f5-5cb087b915ab, amqp_consumerTag=amq.ctag-bK2-KaIxWpk57HJeQ_38AQ, timestamp=1537441872308}]
2018-09-20 13:11:12.310  INFO 49400 --- [erContainer#0-1] com.perform.ftppush.article.Flow         : 1qh22m9027k6d1jz29tsi510x5
2018-09-20 13:11:12.317 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer thread error, thread abort.

java.lang.AbstractMethodError: org.springframework.integration.channel.interceptor.WireTap.postSend(Lorg/springframework/messaging/Message;Lorg/springframework/messaging/MessageChannel;Z)V
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.postSend(AbstractMessageChannel.java:607) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:460) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:227) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:45) ~[spring-messaging-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:497) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:465) ~[spring-integration-core-5.0.6.RELEASE.jar:5.0.6.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$1000(AmqpInboundGateway.java:66) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.process(AmqpInboundGateway.java:315) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$Listener.onMessage(AmqpInboundGateway.java:263) ~[spring-integration-amqp-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) ~[spring-rabbit-2.0.4.RELEASE.jar:2.0.4.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:844) [na:na]

2018-09-20 13:11:12.322 ERROR 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2018-09-20 13:11:12.322  INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2018-09-20 13:11:12.322  INFO 49400 --- [erContainer#0-1] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

正如您在日志中看到的那样,流到了最后,然后这个容器崩溃了。不幸的是,异常跟踪中没有有用的信息。我在想为什么会这样。我试图将该流中的消息重定向到MessageChannel中,然后处理它们,但没有帮助。

共有1个答案

凌志学
2023-03-14

我将检查是否有相同版本的库,例如,在stacktrace中,您可能会发现spring-messaging与Spring-Integration的版本不同。也许这就是问题所在?

如果您可以发布您的pom.xml或其他构建配置,我们可能会在需要时进行更多的研究。

您可以在这里找到对此类错误的一些解释https://www.pixelstech.net/article/1469241003-java-abstractmethoderror-explained-and-expressioned

 类似资料:
  • 我无法解决这个问题,现在已经坚持了很长时间。我是一个spring-integration-dsl的初学者,任何帮助都将非常感谢。

  • 我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI

  • 我有一个要求,我需要保存/缓冲在通道上接收到的消息,并根据消息数量或超时意味着1分钟内没有接收到消息而持久化在数据库中。有没有办法在Spring集成中实现这一点

  • 我已经使用最新的可用版本建立了一个新的Spring Boot Spring Integration Spring Integration Java DSL项目。项目构建正常,但当我运行应用程序时,我得到: 当前使用的依赖项如下: 错误可能是由于jar版本的错误组合吗?我不确定如何调试此错误。