我有一个Spring Boot应用程序,希望从多个AWS SQS队列接收消息。这些队列都有自己的凭据(遗憾的是,我对此无能为力)。这些凭据都不能访问其他队列之一,它们都仅限于一个队列。
因为只有一个队列和凭证,所以很简单。我只需要提供作为AWSCredentialsProvider的凭据,并用启用SQS注释我的方法
但我不知道如何使用多个凭据来执行此操作。
SqsListener注释无法提供凭据、预配置的AmazonSqs对象或任何其他有用的内容。
我通过扩展CredentialsProvider或AmazonSqs客户端,寻找一种将队列映射到凭据的方法,但没有成功<我甚至尝试在AmazonHttpClient的头中注入credendials,但这也是不可能的。
我尝试创建手动侦听SQS队列所需的所有内容。但我一直在为SimpleMessageListenerContainer创建MessageHandler
所需的QueueMessageHandler只有在创建为bean并带有应用程序上下文时才能工作。否则,它将不会查找用SqsListener注释的方法<遗憾的是,我能找到的唯一教程或示例要么使用JMS,我想避免使用JMS,要么只使用带有一个队列的注释。
是否有其他方法可以为多个队列提供不同的凭据?
我的测试代码:
@Component
@Slf4j
public class TestOneQueueA {
public static final String QUEUE_A = "TestOneQueueA";
public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
AWSStaticCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
serviceInfo.getSecretAccessKey()));
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(serviceInfo.getRegion()).build();
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(client);
queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setResourceIdResolver(resourceIdResolver);
factory.setQueueMessageHandler(queueMessageHandler);
SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
}
@SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
}
}
编辑:
在尝试了更多之后,我能够将我的Amazon SQS客户端注入两个单独的SimpleMessageListenerContainer
中。然后问题变成了QueueMessageHandler
。
如果我手动创建它,没有bean上下文,它根本不会查找任何带有@SqsListener
注释的方法。并且没有办法手动设置处理程序。
如果我将其创建为bean,它会查看每个bean的注释。因此它还会找到它不应该寻找的队列的方法。然后它会崩溃,因为凭据不起作用。
我无法找到一种方法来仅为单个SqsListener方法创建QueueMessageHandler
。
和SimpleMessageListenerContainer
除了QueueMessageHandler
之外不会接受任何东西。
您可以为希望与自定义@Qualifier
一起使用的帐户声明不同的@Bean
。假设您在两个不同的帐户中有两个SQS队列。然后声明两个类型为Amazon SQS
的bean。
@Qualifier("first")
@Bean public AmazonSQS amazonSQSClient() {
return AmazonSQSClient.builder()
.withCredentials(credentialsProvider())
.withRegion(Regions.US_EAST_1)
.build();
}
@Qualifier("second")
@Bean public AmazonSQS amazonSQSClient() {
return AmazonSQSClient.builder()
.withCredentials(anotherCredentialsProvider())
.withRegion(Regions.US_EAST_1)
.build();
}
然后在您的服务中,您可以@Autow的
它们。
@Autowired @Qualifier("second") private AmazonSQS sqsSecond;
在花了一些时间寻找更好的解决方案后,我坚持以下几点:
package test;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
@Component
public class TestQueue {
private static final String QUEUE_NAME = "TestQueue";
private static final Logger log = LoggerFactory.getLogger(TestQueue.class);
public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(regionProvider.getRegion())
.build();
// custom QueueMessageHandler to initialize only this queue
CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
queueMessageHandler.init(this);
queueMessageHandler.afterPropertiesSet();
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setQueueMessageHandler(queueMessageHandler);
SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
}
@SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
}
}
和自定义QueueMessageHandler
:
package test;
import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
public class CustomQueueMessageHandler extends QueueMessageHandler {
public void init(Object handler) {
detectHandlerMethods(handler);
}
}
CustomQueueMessageHandler
的唯一目的是传递一个对象,它应该在其中扫描SQS注释。由于我不使用Spring Context启动它,它不会在每个bean中搜索@SqsListener
注释。但所有初始化都隐藏在受保护的方法后面。这就是为什么我需要覆盖类,以访问这些init方法。
我不认为这是一个非常优雅的解决方案,手动创建所有AWS客户端内容,并调用bean init方法。但这是我能找到的唯一仍然可以访问AWS SQS库的所有功能的解决方案,例如转换传入消息和通知、删除策略、队列轮询(包括故障处理)等。
我最近开始学习Spring和spring-amqp,所以这个问题可能看起来很基本,所以请原谅我。 null 或者有一种方法可以Spring加载所有我的队列配置类,然后只使用如下所示的对象: 那么,如何在不执行每次的情况下获取确切队列的amqpTemplate呢? 每次请求到达我的服务时都做新的AnnotationConfigApplicationContext有什么害处?[我想为每个请求创建一个新
在我们的应用程序中,我们使用RabbitMQ和spring-amqp(1.4.3.发行版)。我们那里排了两个队。它们都配置了TTL(60000和100000)。当我们启动应用程序时,它会给出以下错误: [pool-4-thread-1]错误org.springFramework.amqp.rabbit.connection.cachingConnectionFactory-通道关闭:通道错误;协议
问题内容: 我有一个带有HornetQ的JBoss-6服务器和一个队列: 有一个不同的消费者(在不同的机器)连接到这个队列中,但只有一个 单一的 消费者是活动的时间。如果我关闭此使用者,则消息将立即由其他使用者之一处理。 由于我的消息需要一些耗时的处理,因此我希望多个使用者同时处理其唯一消息。 我记得在早期版本的JBoss中也有类似的情况,该设置可以正常工作。在Jboss-6中,消息传递系统运行良
我正在用spring security saml扩展实现一个多租户应用程序。 我为每个租户提供了一个服务提供商(SP)。所有SP都运行在与SP特定的第二级域公开的同一服务器上: sp1。myapp。com/myapi/1/ 在每个SP元数据文件中,我都配置了特定于租户的AssertionConsumerService。 当我测试SSO登录时,当SP端收到身份提供者(IDP)的响应时,我会得到一个K
我的查询是针对产品Red Hat AMQ7.x(我使用的是7.2),它基于Apache ActiveMQ Artemis和一个使用AMQP协议连接到队列的.NET客户机。 Artemis的一篇文章讨论了单播(点对点)、多播(发布-订阅)以及这些寻址的组合:https://activemq.apache.org/Artemis/docs/2.0.0/address-model.html 它没有详细说
我想知道,在Spring AMQP中,是否可以根据负载类型在多个类中接收来自同一队列的消息。 我知道在类中使用@RabbitListener注释,然后将@RabbitHandler放在方法上,但我希望在保持单个队列的同时将消息处理的复杂性拆分为多个类。 当前使用的版本:Spring AMQP v2.0.3以及RabbitMQ。