qmq-spring-boot-starter

授权协议 Apache
开发语言 Java
所属分类 服务器软件、 JMS/消息中间件
软件类型 开源软件
地区 国产
投 递 者 欧阳俊捷
操作系统 跨平台
开源组织
适用人群 未知
 软件概览

使用方式

引入 Maven 依赖(已上传到中央仓库)

<dependency>
    <groupId>xin.wjtree.qmq</groupId>
    <artifactId>qmq-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>

添加 Spring Boot 配置(YML)

spring:
  application:
    name: qmq-demo
  qmq:
    # 应用标识 appcode,必填
    app-code: qmq-demo
    # 服务器地址 metaserver,必填
    meta-server: http://127.0.0.1:8080/meta/address

    # 生产者配置,发送消息的线程池的设置,选填
    producer:
      # 发送线程数,默认 3
      send-threads: 3
      # 默认每次发送时最大批量大小,默认 30
      send-batch: 30
      # 如果消息发送失败,重试次数,默认 10
      send-try-count: 10
      # 异步发送队列大小,默认 10000
      max-queue-size: 10000

    # 使用 QmqTemplate 发送消息的默认主题,默认值 default_subject
    template:
      default-subject: default_subject

    # 消费者配置,消费消息的线程池的设置,选填
    consumer:
      # 线程名称前缀,默认 qmq-process
      thread-name-prefix: qmq-process
      # 线程池大小,默认 2
      core-pool-size: 2
      # 最大线程池大小,默认 2
      max-pool-size: 2
      # 线程池队列大小,默认 1000
      queue-capacity: 1000

    # 消息主题和分组配置,选填
    # 使用 QmqConsumer 注解时,可使用 SpEL 表达式引入以下主题和分组
    subject:
      sub1: sub1
      sub2: sub2
      sub3: sub3
      # more subject ...
    group:
      group1: group1
      group2: group2
      group3: group3
      # more group ...

logging:
  level:
    # 设置 qmq-spring-boot-starter 的日志级别
    xin.wjtree.qmq: trace

server:
  port: 8989

发送消息

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.MessageSendStateListener;
import xin.wjtree.qmq.QmqTemplate;
import xin.wjtree.qmq.autoconfigure.QmqProperties;
import xin.wjtree.qmq.constant.QmqTimeUnit;
import xin.wjtree.qmq.internal.QmqAlias;
import xin.wjtree.qmq.internal.QmqIgnore;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest
public class QmqTest {
    @Resource
    private QmqTemplate template;
    @Resource
    private QmqProperties properties;

    /**
     * 发送即时消息
     * @throws InterruptedException
     */
    @Test
    public void sendImmediate() throws InterruptedException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 一般使用 template.send(properties.getSubject().get("sub1"), getUser()) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).send(properties.getSubject().get("sub1"), getUser());

        // 计数器减1
        latch.await();
    }

    /**
     * 发送延时消息
     * @throws InterruptedException
     */
    @Test
    public void sendDelay() throws InterruptedException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 延时 10 秒发送消息
        // 一般使用 template.sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS);

        // 计数器减1
        latch.await();
    }

    /**
     * 发送定时消息
     * @throws InterruptedException
     */
    @Test
    public void sendSchedule() throws InterruptedException, ParseException {
        // 计数器,执行1次结束
        CountDownLatch latch = new CountDownLatch(1);

        // 定时发送的日期时间
        Date date = new SimpleDateFormat("yyyy-MM-dd HH����ss").parse("2019-07-28 00:16:00");

        // 一般使用 template.sendSchedule(properties.getSubject().get("sub1"), getUser(), date) 即可
        template.withSendStateListener(new MessageSendStateListener() {
            @Override
            public void onSuccess(Message m) {
                latch.countDown();
            }

            @Override
            public void onFailed(Message m) {
                latch.countDown();
            }
        }).sendSchedule(properties.getSubject().get("sub1"), getUser(), date);

        // 计数器减1
        latch.await();
    }

    public User getUser() {
        User user = new User();
        user.setId(100000000001L);
        user.setName("张三");
        user.setAge(120);
        user.setSchool("北京大学");
        user.setCompany("中石油");
        user.setDuty("行政总裁");
        user.setSalary(new BigDecimal("1000000"));
        user.setEnable(true);
        return user;
    }

    public static class User {
        @QmqAlias("user_id")
        private Long id;

        private String name;

        private Integer age;

        @QmqAlias("school_name")
        private String school;

        private String company;

        @QmqIgnore
        private String duty;

        private BigDecimal salary;

        private Boolean enable;

        public Long getId() {
            return id;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSchool() {
            return school;
        }

        public void setSchool(String school) {
            this.school = school;
        }

        public String getCompany() {
            return company;
        }

        public void setCompany(String company) {
            this.company = company;
        }

        public String getDuty() {
            return duty;
        }

        public void setDuty(String duty) {
            this.duty = duty;
        }

        public BigDecimal getSalary() {
            return salary;
        }

        public void setSalary(BigDecimal salary) {
            this.salary = salary;
        }

        public Boolean getEnable() {
            return enable;
        }

        public void setEnable(Boolean enable) {
            this.enable = enable;
        }
    }
}

消费消息

启用消费者模式

  • 在配置类上添加 EnableQmq 注解,包括 appCode 和 metaServer 属性
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import qunar.tc.qmq.consumer.annotation.EnableQmq;

@EnableQmq(appCode="${spring.qmq.app-code}", metaServer="${spring.qmq.meta-server}")
@SpringBootApplication
public class QmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(QmqApplication.class, args);
    }

}

配置消费监听器

  • 在方法上添加 QmqConsumer 注解,包括 subject,consumerGroup,executor 等属性
  • executor = QmqConstant.EXECUTOR_NAME 表示消费线程池的 BeanName,该值固定为 qmqExecutor
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.consumer.annotation.QmqConsumer;
import xin.wjtree.qmq.constant.QmqConstant;

@Slf4j
@Component
public class QmqLinstener {

    @QmqConsumer(subject = "${spring.qmq.subject.sub1}", consumerGroup = "${spring.qmq.group.group1}",
            executor = QmqConstant.EXECUTOR_NAME)
    public void onMessage(Message message) {
        log.info("qmq 消费主题:{},消费消息:{}", message.getSubject(), ((BaseMessage) message).getAttrs());
    }

}
  • 原文地址:https://blog.csdn.net/qq_41018959/article/details/81076055 https://blog.csdn.net/hao114500043/article/details/81742849 1,pom配置,引入相关jar: <dependency> <groupId>org.springframework.integration</

  • maven依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!--

  • RabbitMQ的应用之spring-boot-starter-amqp 前言 上一篇 RabbitMQ 已对RabbitMQ和AMQP的概念进行了学习 这一篇的目的主要是使用Spring Boot的spring-boot-starter-amqp整合RabbitMQ,以达到加深理解的目的。 系列 同步异步(wx.request+ajax) 消息队列的学习与理解 消息队列之RabbitMQ的学习

  • Springboot 整合MQTT。 MQTT服务器采用的是Apache ActiveMQ。 启动Apache ActiveMQ会有多个传输协议,这里采用的是tcp的方式,默认端口1883 1.创建配置文件config.properties mqtt.host=tcp://127.0.0。1:1883 mqtt.clientid=mqttjs_e8022a4d0b mqtt.username=ad

  • 在springBoot项目中使用rabbitMQ是很方便的,spring提供了spring-boot-starter-amqp依赖,只需要简单的配置即可与spring无缝整合 本文不介绍rabbitMQ理论知识,主要介绍在spring中如何使用rabbirMQ 依赖与配置 maven <dependencies> <dependency> <groupI

  • 导入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 发送方 代码 @Autowired //注入操作模板 private AmqpTemplate amqpTem

  • brew更新到最新版本,执行:brew update 安装Erlang,执行:brew install erlang 安装RabbitMQ Server,执行:brew install rabbitmq .bash_profile或.profile文件中增加下面内容: PATH=$PATH:/usr/local/sbin 通过rabbitmq-server命令来启动RabbitMQ的服务端 打开浏

  • 1.下载Emqx安装包,配置Emqx环境 下载地址:免费试用 EMQ 产品 下载压缩包解压,打开cmd,进入emqx/bin目录,输入emqx start,启动服务。 2.创建SpringBoot项目Demo,添加pom引入jar包 <!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId>

  • 1.依赖类 <!-- mqtt --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <d

 相关资料
  • QMQ

    QMQ 是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。目前在公司内部日常消息 qps 在 60W 左右,生产上承载将近 4W+ 消息 topic ,消息的端到端延迟可以控制在 10ms 以内。 QMQ 主要提供以下特性: 异步实时消息 延迟/定时消息 基于 Tag 的服务端过滤 Consum

  • 我正在努力学习spring boot,我注意到有两种选择。 > spring-boot-starter-web-根据文档,它支持全栈web开发,包括Tomcat和web-mvc spring-Boot-Starter-Tomcat 既然#1支持Tomcat,为什么要使用#2呢? 有什么不同? 谢谢

  • 主要内容:starter,spring-boot-starter-parent传统的 Spring 项目想要运行,不仅需要导入各种依赖,还要对各种 XML 配置文件进行配置,十分繁琐,但 Spring Boot 项目在创建完成后,即使不编写任何代码,不进行任何配置也能够直接运行,这都要归功于 Spring Boot 的 starter 机制。本节我们将对 stater 进行介绍。 starter Spring Boot 将日常企业应用研发中的各种场景都抽取出来,做成一个个的

  • Spring Boot 项目旨在简化创建产品级的 Spring 应用和服务。你可通过它来选择不同的 Spring 平台。可创建独立的 Java 应用和 Web 应用,同时提供了命令行工具来允许 'spring scripts'. 下图显示 Spring Boot 在 Spring 生态中的位置: 该项目主要的目的是: 为 Spring 的开发提供了更快更广泛的快速上手 使用默认方式实现快速开发 提

  • 当我试图用启动我的spring boot项目时,我遇到了这个特殊的错误。奇怪的是,我的应用程序会在添加几个存储库和服务之前启动,但我似乎无法缩小为什么spring不能初始化一个在添加之前正在工作的存储库的范围。 以下是相关错误: 最有趣的是这句话: 11:38:43.583错误org.springframework.boot.context.embedde.Tomcat.tomcatstarter

  • 我的项目root目录下pom文件添加了springboot的依赖 前端的目录下pom中添加了,要用到@restcontroller 然后一堆包冲突,是不是我的做法有错误,还是版本问题?