当前位置: 首页 > 知识库问答 >
问题:

Spring Boot中具有不同凭据的多个AWS SQS队列

邬安邦
2023-03-14

我有一个Spring Boot应用程序,希望从多个AWS SQS队列接收消息。这些队列都有自己的凭据(遗憾的是,我对此无能为力)。这些凭据都不能访问其他队列之一,它们都仅限于一个队列。

因为只有一个队列和凭证,所以很简单。我只需要提供作为AWSCredentialsProvider的凭据,并用启用SQS注释我的方法
但我不知道如何使用多个凭据来执行此操作。

SqsListener注释无法提供凭据、预配置的AmazonSqs对象或任何其他有用的内容。

我通过扩展CredentialsProvider或AmazonSqs客户端,寻找一种将队列映射到凭据的方法,但没有成功<我甚至尝试在AmazonHttpClient的头中注入credendials,但这也是不可能的。

我尝试创建手动侦听SQS队列所需的所有内容。但我一直在为SimpleMessageListenerContainer创建MessageHandler
所需的QueueMessageHandler只有在创建为bean并带有应用程序上下文时才能工作。否则,它将不会查找用SqsListener注释的方法<遗憾的是,我能找到的唯一教程或示例要么使用JMS,我想避免使用JMS,要么只使用带有一个队列的注释。

是否有其他方法可以为多个队列提供不同的凭据?

我的测试代码:

@Component
@Slf4j
public class TestOneQueueA {

  public static final String QUEUE_A = "TestOneQueueA";

  public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
    SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
    AWSStaticCredentialsProvider credentialsProvider =
        new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
            serviceInfo.getSecretAccessKey()));

    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(serviceInfo.getRegion()).build();

    QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
    queueMessageHandlerFactory.setAmazonSqs(client);
    queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));

    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setResourceIdResolver(resourceIdResolver);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
    simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
  }

  @SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
  }
}

编辑:

在尝试了更多之后,我能够将我的Amazon SQS客户端注入两个单独的SimpleMessageListenerContainer中。然后问题变成了QueueMessageHandler

如果我手动创建它,没有bean上下文,它根本不会查找任何带有@SqsListener注释的方法。并且没有办法手动设置处理程序。
如果我将其创建为bean,它会查看每个bean的注释。因此它还会找到它不应该寻找的队列的方法。然后它会崩溃,因为凭据不起作用。
我无法找到一种方法来仅为单个SqsListener方法创建QueueMessageHandler
SimpleMessageListenerContainer除了QueueMessageHandler之外不会接受任何东西。

共有2个答案

宦书
2023-03-14

您可以为希望与自定义@Qualifier一起使用的帐户声明不同的@Bean。假设您在两个不同的帐户中有两个SQS队列。然后声明两个类型为Amazon SQSbean

@Qualifier("first")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(credentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

@Qualifier("second")
@Bean public AmazonSQS amazonSQSClient() {
    return AmazonSQSClient.builder()
            .withCredentials(anotherCredentialsProvider())
            .withRegion(Regions.US_EAST_1)
            .build();
}

然后在您的服务中,您可以@Autow的它们。

@Autowired @Qualifier("second") private AmazonSQS sqsSecond;
益明朗
2023-03-14

在花了一些时间寻找更好的解决方案后,我坚持以下几点:

package test;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class TestQueue {

  private static final String QUEUE_NAME = "TestQueue";
  private static final Logger log = LoggerFactory.getLogger(TestQueue.class);

  public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
    AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withRegion(regionProvider.getRegion())
        .build();

    // custom QueueMessageHandler to initialize only this queue
    CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
    queueMessageHandler.init(this);
    queueMessageHandler.afterPropertiesSet();

    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(client);
    factory.setQueueMessageHandler(queueMessageHandler);

    SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    try {
      simpleMessageListenerContainer.afterPropertiesSet();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    simpleMessageListenerContainer.start();
  }

  @SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
  public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
    log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
  }
}

和自定义QueueMessageHandler

package test;

import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

public class CustomQueueMessageHandler extends QueueMessageHandler {

  public void init(Object handler) {
    detectHandlerMethods(handler);
  }
}

CustomQueueMessageHandler的唯一目的是传递一个对象,它应该在其中扫描SQS注释。由于我不使用Spring Context启动它,它不会在每个bean中搜索@SqsListener注释。但所有初始化都隐藏在受保护的方法后面。这就是为什么我需要覆盖类,以访问这些init方法。

我不认为这是一个非常优雅的解决方案,手动创建所有AWS客户端内容,并调用bean init方法。但这是我能找到的唯一仍然可以访问AWS SQS库的所有功能的解决方案,例如转换传入消息和通知、删除策略、队列轮询(包括故障处理)等。

 类似资料:
  • 我最近开始学习Spring和spring-amqp,所以这个问题可能看起来很基本,所以请原谅我。 null 或者有一种方法可以Spring加载所有我的队列配置类,然后只使用如下所示的对象: 那么,如何在不执行每次的情况下获取确切队列的amqpTemplate呢? 每次请求到达我的服务时都做新的AnnotationConfigApplicationContext有什么害处?[我想为每个请求创建一个新

  • 在我们的应用程序中,我们使用RabbitMQ和spring-amqp(1.4.3.发行版)。我们那里排了两个队。它们都配置了TTL(60000和100000)。当我们启动应用程序时,它会给出以下错误: [pool-4-thread-1]错误org.springFramework.amqp.rabbit.connection.cachingConnectionFactory-通道关闭:通道错误;协议

  • 问题内容: 我有一个带有HornetQ的JBoss-6服务器和一个队列: 有一个不同的消费者(在不同的机器)连接到这个队列中,但只有一个 单一的 消费者是活动的时间。如果我关闭此使用者,则消息将立即由其他使用者之一处理。 由于我的消息需要一些耗时的处理,因此我希望多个使用者同时处理其唯一消息。 我记得在早期版本的JBoss中也有类似的情况,该设置可以正常工作。在Jboss-6中,消息传递系统运行良

  • 我正在用spring security saml扩展实现一个多租户应用程序。 我为每个租户提供了一个服务提供商(SP)。所有SP都运行在与SP特定的第二级域公开的同一服务器上: sp1。myapp。com/myapi/1/ 在每个SP元数据文件中,我都配置了特定于租户的AssertionConsumerService。 当我测试SSO登录时,当SP端收到身份提供者(IDP)的响应时,我会得到一个K

  • 我的查询是针对产品Red Hat AMQ7.x(我使用的是7.2),它基于Apache ActiveMQ Artemis和一个使用AMQP协议连接到队列的.NET客户机。 Artemis的一篇文章讨论了单播(点对点)、多播(发布-订阅)以及这些寻址的组合:https://activemq.apache.org/Artemis/docs/2.0.0/address-model.html 它没有详细说

  • 我想知道,在Spring AMQP中,是否可以根据负载类型在多个类中接收来自同一队列的消息。 我知道在类中使用@RabbitListener注释,然后将@RabbitHandler放在方法上,但我希望在保持单个队列的同时将消息处理的复杂性拆分为多个类。 当前使用的版本:Spring AMQP v2.0.3以及RabbitMQ。