Question:

Is it scalable to have sqs-consumer detect the receiveMessage event in SQS?

丁宏浚
2023-03-14

I am using AWS SQS as a message queue. After `sqs.sendMessage` sends the data, I want to detect `sqs.receiveMessage` in a scalable way, either via an endless loop or an event trigger. I then came across sqs-consumer to handle the `sqs.receiveMessage` event, i.e. the moment a message is received. But I would like to know: is this the most suitable way to handle messaging between microservices, or is there a better way to handle this?

1 answer

步骏
2023-03-14

I wrote Java code to fetch data from an SQS queue using the AmazonSQSBufferedAsyncClient. The advantage of this API is that it buffers messages in asynchronous mode.

/**
 * 
 */
package com.sxm.aota.tsc.config;

import java.net.UnknownHostException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.retry.RetryPolicy.BackoffStrategy;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import com.amazonaws.services.sqs.buffered.QueueBufferConfig;

@Configuration
public class SQSConfiguration {

    /** The properties cache config. */
    @Autowired
    private PropertiesCacheConfig propertiesCacheConfig;

    @Bean
    public AmazonSQSAsync amazonSQSClient() {
        // Create Client Configuration
        ClientConfiguration clientConfig = new ClientConfiguration()
            .withMaxErrorRetry(5)
            .withConnectionTTL(10_000L)
            .withTcpKeepAlive(true)
            .withRetryPolicy(new RetryPolicy(
                    null, 
                new BackoffStrategy() {                 
                    @Override
                    public long delayBeforeNextRetry(AmazonWebServiceRequest req, 
                            AmazonClientException exception, int retries) {
                        // Delay between retries is 10s unless it is UnknownHostException 
                        // for which retry is 60s
                        return exception.getCause() instanceof UnknownHostException ? 60_000L : 10_000L;
                    }
                }, 10, true));
        // Create the Amazon SQS client, either with the instance IAM role or
        // with static credentials. Note: BasicAWSCredentials takes the access
        // key first, then the secret key.
        AmazonSQSAsync asyncSqsClient = null;
        if (propertiesCacheConfig.isIamRole()) {
            asyncSqsClient = new AmazonSQSAsyncClient(new InstanceProfileCredentialsProvider(true), clientConfig);
        } else {
            asyncSqsClient = new AmazonSQSAsyncClient(
                    new BasicAWSCredentials("accessKey", "secretKey"));
        }
        final Regions regions = Regions.fromName(propertiesCacheConfig.getRegionName());
        asyncSqsClient.setRegion(Region.getRegion(regions));
        asyncSqsClient.setEndpoint(propertiesCacheConfig.getEndPoint());

        // Buffer for request batching
        final QueueBufferConfig bufferConfig = new QueueBufferConfig();
        // Ensure visibility timeout is maintained
        bufferConfig.setVisibilityTimeoutSeconds(20);
        // Enable long polling
        bufferConfig.setLongPoll(true);
        // Set batch parameters
//      bufferConfig.setMaxBatchOpenMs(500);
        // Set to receive messages only on demand
//      bufferConfig.setMaxDoneReceiveBatches(0);
//      bufferConfig.setMaxInflightReceiveBatches(0);

        return new AmazonSQSBufferedAsyncClient(asyncSqsClient, bufferConfig);
    }

}

Then write a scheduler that runs every 2 seconds, pulls data from the queue, processes it, and deletes it from the queue before the visibility timeout expires; otherwise, once the visibility timeout expires, the message becomes visible again and is ready to be reprocessed.

package com.sxm.aota.tsc.sqs;

import java.util.List;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.databind.ObjectMapper;


/**
 * The Class SQSScheduledTask.
 * 
 * Polls the SQS queue at a fixed interval and processes the received messages.
 */
@EnableScheduling
@Component("sqsScheduledTask")
@DependsOn({ "propertiesCacheConfig", "amazonSQSClient" })
public class SQSScheduledTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(SQSScheduledTask.class);

    @Autowired
    private PropertiesCacheConfig propertiesCacheConfig;

    @Autowired
    public AmazonSQSAsync amazonSQSClient;

    /** URL of the queue, resolved once at startup. */
    private String queueUrl;

    private final ObjectMapper mapper = new ObjectMapper();

    @PostConstruct
    public void initialize() throws Exception {
        LOGGER.info("SQS-Publisher: initializing for queue {}", propertiesCacheConfig.getSQSQueueName());
        // Resolve the queue URL from the queue name
        final GetQueueUrlRequest request = new GetQueueUrlRequest()
                .withQueueName(propertiesCacheConfig.getSQSQueueName());
        final GetQueueUrlResult response = amazonSQSClient.getQueueUrl(request);
        queueUrl = response.getQueueUrl();

        LOGGER.info("SQS-Publisher: initialized for queue {}, URL = {}",
                propertiesCacheConfig.getSQSQueueName(), queueUrl);
    }

    /**
     * Timer task that runs after a specific interval of time, pulls one
     * message from the queue and hands it off for processing.
     */
    @Scheduled(fixedDelayString = "${sqs.consumer.delay}")
    public void timerTask() {
        final ReceiveMessageResult receiveResult = getMessagesFromSQS();
        String messageBody = null;
        if (receiveResult != null && receiveResult.getMessages() != null && !receiveResult.getMessages().isEmpty()) {
            try {
                messageBody = receiveResult.getMessages().get(0).getBody();
                String messageReceiptHandle = receiveResult.getMessages().get(0).getReceiptHandle();
                Vehicles vehicles = mapper.readValue(messageBody, Vehicles.class);
                processMessage(vehicles.getVehicles(), messageReceiptHandle);
            } catch (Exception e) {
                LOGGER.error("Exception while processing SQS message : {}", messageBody, e);
                // The message is not deleted from SQS and will become visible
                // again (and be reprocessed) once the visibility timeout expires.
            }
        }
    }

    public void processMessage(List<Vehicle> vehicles, String messageReceiptHandle) throws InterruptedException {
        // processing code
        // Delete the SQS message once processing is completed.
        // TODO: use an atomic counter incremented by all workers; delete the
        // message only once the counter drops back to zero.
        amazonSQSClient.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));
    }

    private ReceiveMessageResult getMessagesFromSQS() {
        try {
            // Create a new request and fetch data from the Amazon SQS queue
            return amazonSQSClient
                    .receiveMessage(new ReceiveMessageRequest().withMaxNumberOfMessages(1).withQueueUrl(queueUrl));
        } catch (Exception e) {
            LOGGER.error("Error while fetching data from SQS", e);
        }
        return null;
    }
}
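The TODO in processMessage mentions deleting the message only after every worker has finished its share of the work. A minimal, self-contained sketch of that "atomic counter" idea, using `AtomicInteger` (the `PendingWorkTracker` name and its methods are illustrative assumptions, not part of the original code):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper sketching the atomic-counter idea from the TODO:
// each worker registers before starting and signals when done; the SQS
// message should be deleted only when the last worker finishes.
public class PendingWorkTracker {

    private final AtomicInteger pending = new AtomicInteger(0);

    /** A worker registers before it starts processing its share. */
    public void register() {
        pending.incrementAndGet();
    }

    /**
     * A worker signals completion. Returns true exactly once: when the
     * counter drops back to zero, i.e. when the caller should delete the
     * SQS message via deleteMessage.
     */
    public boolean complete() {
        return pending.decrementAndGet() == 0;
    }

    public static void main(String[] args) {
        PendingWorkTracker tracker = new PendingWorkTracker();
        tracker.register();
        tracker.register();
        tracker.register();
        System.out.println(tracker.complete()); // false: two workers still pending
        System.out.println(tracker.complete()); // false: one worker still pending
        System.out.println(tracker.complete()); // true: last worker done, delete the message
    }
}
```

Because increment and decrement are atomic, this stays correct even when the workers run on different threads; the thread that observes the counter reaching zero is the single one responsible for calling `deleteMessage`.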