java springboot集成aws sdk sqs

白泽语
2023-12-01

概述

本文介绍java springboot项目集成aws-sdk2.0版本即AWS SDK for Java 2.x,与1.x比较在编程方面有
很大区别,请注意区分
步骤
  • 1.创建aws账号 Create an AWS account
  • 2.创建IAM用户,申请key_id和access_key
  • 3.配置credentials
  • 4.Java代码集成

本文重点介绍步骤3和4,1和2可到官网进行申请

3.配置credentials
创建credentials文件,aws-sdk会自动读取文件,根据操作系统不同,credentials文件位置不一样

Windows: C:\Users\<yourUserName>\.aws\credentials

Linux, macOS, Unix: ~/.aws/credentials

credentials文件文件内容如下:

[default]
aws_access_key_id = YOUR_AWS_ACCESS_KEY_ID
aws_secret_access_key = YOUR_AWS_SECRET_ACCESS_KEY

创建好credentials文件后,aws sdk会自动读取

4.Java代码集成

我拿aws的sqs消息队列服务举例

引入依赖:
        <!--aws sqs依赖-->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-sqs-java-messaging-lib</artifactId>
            <version>1.0.8</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>aws-sdk-java</artifactId>
            <version>2.14.26</version>
        </dependency>
创建AwsConfig类
@Configuration
public class SqsConfig {
	//配置SqsClient,之后就用这个client连接队列服务
    @Bean
    public SqsClient sqsClient() throws URISyntaxException {
        SqsClientBuilder sqsClientBuilder = SqsClient.builder().region(Region.CN_NORTH_1).endpointOverride(new URI("http://localhost:4566"));
        return sqsClientBuilder.build();
    }
 }

虽然代码很少,但这是经过多次实验总结的,sdk1.x版本和这个有差别推荐使用2.x版本
region:地区,写自己所在region就行
endpoint URI:换成自己的地址就行

创建SqsService类
@Service
public class SqsService {

    private Logger logger = LoggerFactory.getLogger(SqsService.class);

    @Resource
    private SqsClient sqsClient;

    @Resource
    private Executor executor;
    private String queueUrl;

    @PostConstruct
    public void init() {
        //如果队列不存在,才会创建队列
        queueUrl =  createQueue("sms");
        //启动消费者监听器
        consumerListener();
    }
    /**
     * @Description: 创建队列
     * @author 
     * @Date 2021/3/12
     */
    public String createQueue(String queueName) {
        CreateQueueRequest createQueueRequest = CreateQueueRequest.builder().queueName(queueName).build();
        sqsClient.createQueue(createQueueRequest);
        GetQueueUrlResponse getQueueUrlResponse = sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
        return getQueueUrlResponse.queueUrl();
    }

    /**
     * @Description: 发送消息 Sms类为自己的实体类
     * @author
     * @Date 2021/3/12
     */
    public void sendMessage(Sms sms) {
        try {
            String json = JSON.toJSONString(sms);
            SendMessageRequest sendMessageRequest = SendMessageRequest.builder().queueUrl(queueUrl).messageBody(json).build();
            sqsClient.sendMessage(sendMessageRequest);
        } catch (Exception e) {
            logger.error("sendMessage error:{}",sms,e);
        }
    }

    /**
     * @Description: 接受消息
     * @author 
     * @Date 2021/3/12
     */
    public List<Message> receiveMessage() {
        try {
            ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(3).waitTimeSeconds(20).build();
            List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
            return messages;
        } catch (Exception e) {
            logger.error("receiveMessage error ",e);
            return new ArrayList();
        }
    }

    /**
     * @Description: 删除消息
     * @author
     * @Date 2021/3/12
     */
    public void deleteMessages(List<Message> messages) {
        try {
            messages.forEach(message -> {
                DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(message.receiptHandle()).build();
                sqsClient.deleteMessage(deleteMessageRequest);
            });
        } catch (Exception e) {
           logger.error("deleteMessages error ",e);
        }
    }

    /**
     * @Description: 消费者监听器
     * @author 
     * @Date 2021/3/9
     */
    public void consumerListener() {
        logger.info("start consumerListener");
        executor.execute(() -> {
            while (true) {
                List<Message> messages = receiveMessage();
                messages.forEach(message -> {
                    String body = message.body();
                    Sms sms = JSON.parseObject(body, Sms.class);
                    System.out.println(sms);
                });
                deleteMessages(messages);
            }
        });
    }
}
最后

aws sqs服务发送消息和接受消息都是主动发送请求的,所以要写消费者不断轮询接受消息,而且接收到消息后,消费之后需要程序主动删除消息
aws-sdk 官方文档
sqs更多操作见 sqs官方文档
sqs工作原理见官网 sqs说明官网

 类似资料: