当前位置: 首页 > 知识库问答 >
问题:

使用Spring Boot侦听消息队列SQS不能与标准配置一起工作

邓宜年
2023-03-14

我无法使用Spring Boot和SQS创建works队列侦听器(消息已发送并显示在SQS ui中)

@messagemapping@sqslistener不工作

Java:11
Spring Boot:2.1.7
依赖关系:spring-cloud-aws-messaging

@Configuration
@EnableSqs
public class SqsConfig {

    @Value("#{'${env.name:DEV}'}")
    private String envName;

    @Value("${cloud.aws.region.static}")
    private String region;

    @Value("${cloud.aws.credentials.access-key}")
    private String awsAccessKey;

    @Value("${cloud.aws.credentials.secret-key}")
    private String awsSecretKey;

    @Bean
    public Headers headers() {
        return new Headers();
    }

    @Bean
    public MessageQueue queueMessagingSqs(Headers headers,
                                          QueueMessagingTemplate queueMessagingTemplate) {
        Sqs queue = new Sqs();
        queue.setQueueMessagingTemplate(queueMessagingTemplate);
        queue.setHeaders(headers);
        return queue;
    }

    private ResourceIdResolver getResourceIdResolver() {
        return queueName -> envName + "-" + queueName;
    }

    @Bean
    public DestinationResolver destinationResolver(AmazonSQSAsync amazonSQSAsync) {
        DynamicQueueUrlDestinationResolver destinationResolver = new DynamicQueueUrlDestinationResolver(
                amazonSQSAsync,
                getResourceIdResolver());
        destinationResolver.setAutoCreate(true);
        return destinationResolver;
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync,
                                                         DestinationResolver destinationResolver) {
        return new QueueMessagingTemplate(amazonSQSAsync, destinationResolver, null);
    }

    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory() {
        QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setStrictContentTypeMatch(false);
        factory.setArgumentResolvers(Collections.singletonList(new PayloadArgumentResolver(messageConverter)));
        return factory;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setMaxNumberOfMessages(10);
        factory.setWaitTimeOut(2);
        return factory;
    }

}
@RunWith(SpringJUnit4ClassRunner.class)
public class ListenTest {

    @Autowired
    private MessageQueue queue;

    private final String queueName = "test-queue-receive";

    private String result = null;

    @Test
    public void test_listen() {
        // given
        String data = "abc";

        // when
        queue.send(queueName, data).join();

        // then
        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> Objects.nonNull(result));

        Assertions.assertThat(result).equals(data);
    }

    @MessageMapping(value = queueName)
    public void receive(String data) {
        this.result = data;
    }
}

你觉得有什么不对劲吗?

我为示例创建了一个repo:(https://github.com/mmaryo/java-sqs-test)
在测试文件夹中,在'application.yml'中更改aws凭据
然后运行测试

共有1个答案

梁丘波鸿
2023-03-14

我在使用spring-cloud-aws-messaging包时也遇到了同样的问题,但随后我在@sqslistener注释中使用了队列URL而不是队列名称,结果它起了作用。

@SqsListener(value = { "https://full-queue-URL" }, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receive(String message) {
     // do something
}

在使用spring-cloud-starter-aws-messaging包时似乎可以使用队列名。如果您不想使用starter包,我相信有一些配置允许使用队列名而不是URL。

编辑:我注意到该区域被默认为us-west-2,尽管我在属性文件中列出了us-east-1。然后,我创建了一个RegionProvider bean,并在其中将区域设置为us-east-1,现在,当我在@SQSMessaging中使用队列名时,就会在框架代码中找到该队列名并将其正确解析为URL。

 类似资料:
  • 解决方案#3:使用AWS SQS标准队列和AWS ElasticCache(Redis或Memcached)。对于每条消息,“messageID”字段将保存在缓存服务器中,并在以后检查是否重复。存在意味着此消息已被处理。(顺便说一下,“MessageID”在缓存服务器中应该存在多长时间。AWS SQS文档没有提到消息可以重复到多长时间。)

  • 问题内容: 我正在使用EJB 3.1,并且想配置一个MDB来侦听多个队列。 我更喜欢通过XML定义队列名称,而其他通过注释定义。 能做到吗? 问题答案: 实例化后,MDB只能侦听在其目标ActivationConfigProperty中指定的资源,但是您 可以 为同一MDB创建具有不同目标的多个实例(在您的情况下为队列)。 在ejb-jar.xml中创建两个条目,它们具有不同的目的地和ejb-na

  • 我试图弄清楚如何配置jms侦听器来侦听AWS队列并在多个线程中处理消息(同时大约100个线程)。 下面是我的配置。 随着这种混乱,我不断收到以下错误: SQSMessage消费者-30秒后无法终止执行器服务消费者预取,一些正在运行的线程将立即关闭 此外,将消息发布到Amazon SQS实例需要20秒。 我尝试了NumberOfMessagesToPrefetch和CacheLevel的不同组合,但

  • 我有一个模块,它用以指定的时间间隔轮询AWS SQS队列,每次一条消息。方法如下: 接收并处理消息后,使用将其从队列中删除。 我已经创建了一个可执行的jar文件,它部署在大约40个实例中,并且正在积极地轮询队列。我能看到他们每个人都收到信息。但在AWS SQS控制台中,我只能在“飞行中的消息”栏中看到数字0、1、2或3。为什么即使有40多个不同的消费者正在从队列接收消息呢?此外,队列中可用的消息数

  • 兔子配置: 应用概述:每当gitRepository连接到我们的应用程序时,存储库名称就会成为交换名称,在这种情况下,然后存储库的每个分支都会创建自己的队列,这里有两个队列和。现在每次在开发分支中创建拉取请求时,我都需要将信息传递给开发队列,并且应该由特定的侦听器侦听,该侦听器应该仅注册用于开发。我看到了动态队列的示例,但我似乎找不到任何关于如何创建将使用不同线程执行的动态侦听器的示例,我如何实现

  • 我试图使用他们文档中提到的masstransit配置将SNS主题订阅到SQS队列。消息已发布,但不会出现在SQS队列中。SQS队列名称:“测试”,SNS主题名称:“kbbico手动替换”。