我试图用SQS FIFO队列探索SNS FIFO主题,这就是我刚才尝试的。我创建了SNS FIFO主题和SQS FIFO队列,并将FIFO队列订阅到FIFO主题。根据文档,对于上述设置,每当我们将消息发布到SNS FIFO队列时,它都应该将该消息扇出到SQS队列,但它没有发生。我能够获得PublishResult#getMessageId()表示发布部分正在成功进行,但队列中没有任何消息。由于SNS FIFO主题不支持电子邮件协议订阅,因此我可以断言此发布子架构的唯一方法是轮询队列中的消息。由于没有发生扇出,队列似乎总是空的。
完整的代码块:
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.UUID;
class FifoTopicsITest {
@Test
void test() {
final String topicName = UUID.randomUUID().toString().substring(15);
//creating sns client
AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
"<accessKey>", "<secretKey>")))
.withEndpointConfiguration(new AwsClientBuilder
.EndpointConfiguration("https://sns.us-west-1.amazonaws.com",
"us-west-1")).build();
//creating sqs client
AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
"<accessKey>", "<secretKey>")))
.withEndpointConfiguration(new AwsClientBuilder
.EndpointConfiguration("https://sqs.us-west-1.amazonaws.com",
"us-west-1")).build();
//creating SNS topic
CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
createTopicRequest
.addAttributesEntry("FifoTopic", "true")
.addAttributesEntry("ContentBasedDeduplication", "false");
CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
String topicArn = topicResult.getTopicArn();
//creating dead-letter sqs queue
CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);
//getting ARN value of dead-letter queue
GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
.withAttributeNames("QueueArn"));
String deleteQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");
//creating sqs queue
CreateQueueRequest createQueueRequest = new CreateQueueRequest();
createQueueRequest.addAttributesEntry("FifoQueue", "true");
createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
createQueueRequest.withQueueName(topicName + ".fifo");
String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
+ deleteQueueArn + "\"}";
createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
String queueUrl = createQueueResult.getQueueUrl();
//getting ARN value of queue
getQueueAttributesResult = amazonSQS.getQueueAttributes(
new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("QueueArn"));
String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");
//Subscribe FIFO queue to FIFO Topic
SubscribeRequest subscribeRequest = new SubscribeRequest();
subscribeRequest.withProtocol("sqs")
.withTopicArn(topicArn)
.withEndpoint(queueArn);
SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
Assertions.assertNotNull(subscribeResult.getSubscriptionArn());
//Publishing 4 sample message to FIFO SNS Topic
for (int i = 0; i < 5; i++) {
PublishRequest publishRequest = new PublishRequest()
.withTopicArn(topicArn)
.withMessage("Test Message" + i)
.withMessageGroupId(topicName)
.withMessageDeduplicationId(UUID.randomUUID().toString());
PublishResult publishResult = amazonSNS.publish(publishRequest);
Assertions.assertNotNull(publishResult.getMessageId());
}
//Getting ApproximateNumberOfMessages no of messages from the FIFO Queue
getQueueAttributesResult = amazonSQS.getQueueAttributes(
new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("All"));
String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
.get("ApproximateNumberOfMessages");
//My expectation here is SNS FIFO topic should have fanout the 4 published message to SQS FIFO Queue
Assertions.assertEquals(4, Integer.valueOf(approximateNumberOfMessages));
}
}
SNS访问策略(权限)
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "__default_statement_ID",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": [
"SNS:GetTopicAttributes",
"SNS:SetTopicAttributes",
"SNS:AddPermission",
"SNS:RemovePermission",
"SNS:DeleteTopic",
"SNS:Subscribe",
"SNS:ListSubscriptionsByTopic",
"SNS:Publish",
"SNS:Receive"
],
"Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
"Condition": {
"StringEquals": {
"AWS:SourceOwner": "<account>"
}
}
}
]
}
SQS访问策略(权限)
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
我错过了什么?为什么消息不在SQS队列中。我应该对以下SQS队列权限做些什么?
{
"Id": "Policy1611770719125",
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1611770707743",
"Action": [
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ListQueueTags",
"sqs:ListQueues",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:SendMessageBatch",
"sqs:SetQueueAttributes"
],
"Effect": "Allow",
"Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
"Principal": {
"AWS": "*"
}
}
]
}
如果您正在寻找一种非编程解决方案,并且更愿意使用AWS控制台,请查看AWS官方文档中的“如何使用SNS FIFO主题”部分(下面的链接),其中他们提到您需要向您尝试扇出的FIFO SQS队列的访问策略添加一条语句。这个新添加的语句授予FIFO SNS主题权限,以将消息发送到队列。
使用他们的示例,如果您有一个名为updates.fifo的FIFO SNS主题,并且您试图将消息扇出到customer.fifo和loyalty.fifo两个队列,在将队列订阅到该主题后,您将导航到控制台中的customer.fifo队列,并通过添加上面提到的语句来编辑访问策略:
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:us-east-2:123412341234:customer.fifo",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
}
}
}
这将适用于忠诚度。fifo队列:
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:us-east-2:123412341234:loyalty.fifo",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
}
}
}
资源:介绍亚马逊SNS FIFO
分享我的答案给后代,因为怀疑实际问题与访问策略
有关,当我们创建FIFO SNS队列并使用AWS SDK V1订阅SQS FIFO队列时,默认访问策略如下
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
即使我尝试使用AWS SDK v2链接创建SQS FIFO队列,上述访问策略也将是相同的。因此,当我按以下方式手动更改访问策略时,问题已经解决,FIFO SNS主题扇出按规定发生:
{
"Statement": [
{
"Action": [
"sqs:*"
],
"Effect": "Allow",
"Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
"Principal": {
"AWS": "*"
}
}
]
}
为每个FIFO队列添加上述访问策略
的代码块:
Policy policy = new Policy().withStatements(
new Statement(Statement.Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withResources(new Resource(queueArn))
.withActions(SQSActions.AllSQSActions));
Map<String, String> policyQueueAttributes = new HashMap<>();
policyQueueAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, policyQueueAttributes));
在创建SQS FIFO队列后添加了上述代码块,最终解决了问题。
我设置了一个EJB项目,使用JMS将持久性实体对象发送到MDB。我使用JBoss EAP 7,使用Apache ActiveMQ作为消息传递提供程序。我像这样设置ConnectionFactory和队列: 这是我的消息生成器,它接收“Account”实体对象作为参数并将其发送到队列: EntityEnqueueBean。Java语言 MDB从队列接收消息并对其进行处理: java账户 不确定我做错
我有一个两节点的Kafka集群(EC2实例),其中每个节点用作单独的代理。当我使用以下命令在leader实例上运行生成器时: 用列出主题表明主题存在。 主题的说明: 退货 谁能帮忙吗?
我的应用程序中有以下代码片段,用于向Azure service bus主题队列发送消息。在发送消息的过程中,我会随机收到通用ServiceBusException。 这是我的例外, 服务无法处理请求;请重试操作。有关异常类型和正确异常处理的更多信息,请参阅http://go.microsoft.com/fwlink/?LinkId=761101TrackingId: 24e3ca8e-a74f-4
我正在学习Kafka,如果有人能帮助我理解一件事。“制作人”向Kafka主题发送消息。它会在那里停留一段时间(默认为7天,对吗?)。 但是“消费者”收到这样的信息,永远保持它在那里没有多大意义。我预计当消费者收到它们时,这些信息会消失。否则,当我再次连接到Kafka时,我将再次下载相同的消息。所以我必须管理重复的避免。 它背后的逻辑是什么? 问候
本文向大家介绍使用消息队列会带来哪些问题?相关面试题,主要包含被问及使用消息队列会带来哪些问题?时的应答技巧和注意事项,需要的朋友参考一下 系统复杂度提高,可用性降低,不仅需要考虑消息队列的可用性,还要考虑数据的一致性
问题内容: 我试图将字符串消息发送到在weblogic服务器中创建的JMS队列中。我使用Eclipse IDE,当我运行Web应用程序时,出现以下错误,tomcat服务器关闭。错误是 请帮助我。谢谢和最诚挚的问候 问题答案: 基于对该问题的一些快速研究,它似乎是由于在应用服务器和客户端之间使用不同的JDK级别引起的。我看到的大多数示例都表明,在Java 5上运行Weblogic时在客户端上使用Ja