<dependency>
<groupId>xin.wjtree.qmq</groupId>
<artifactId>qmq-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
spring:
application:
name: qmq-demo
qmq:
# 应用标识 appcode,必填
app-code: qmq-demo
# 服务器地址 metaserver,必填
meta-server: http://127.0.0.1:8080/meta/address
# 生产者配置,发送消息的线程池的设置,选填
producer:
# 发送线程数,默认 3
send-threads: 3
# 默认每次发送时最大批量大小,默认 30
send-batch: 30
# 如果消息发送失败,重试次数,默认 10
send-try-count: 10
# 异步发送队列大小,默认 10000
max-queue-size: 10000
# 使用 QmqTemplate 发送消息的默认主题,默认值 default_subject
template:
default-subject: default_subject
# 消费者配置,消费消息的线程池的设置,选填
consumer:
# 线程名称前缀,默认 qmq-process
thread-name-prefix: qmq-process
# 线程池大小,默认 2
core-pool-size: 2
# 最大线程池大小,默认 2
max-pool-size: 2
# 线程池队列大小,默认 1000
queue-capacity: 1000
# 消息主题和分组配置,选填
# 使用 QmqConsumer 注解时,可使用 SpEL 表达式引入以下主题和分组
subject:
sub1: sub1
sub2: sub2
sub3: sub3
# more subject ...
group:
group1: group1
group2: group2
group3: group3
# more group ...
logging:
level:
# 设置 qmq-spring-boot-starter 的日志级别
xin.wjtree.qmq: trace
server:
port: 8989
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.MessageSendStateListener;
import xin.wjtree.qmq.QmqTemplate;
import xin.wjtree.qmq.autoconfigure.QmqProperties;
import xin.wjtree.qmq.constant.QmqTimeUnit;
import xin.wjtree.qmq.internal.QmqAlias;
import xin.wjtree.qmq.internal.QmqIgnore;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
@RunWith(SpringRunner.class)
@SpringBootTest
public class QmqTest {
@Resource
private QmqTemplate template;
@Resource
private QmqProperties properties;
/**
* 发送即时消息
* @throws InterruptedException
*/
@Test
public void sendImmediate() throws InterruptedException {
// 计数器,执行1次结束
CountDownLatch latch = new CountDownLatch(1);
// 一般使用 template.send(properties.getSubject().get("sub1"), getUser()) 即可
template.withSendStateListener(new MessageSendStateListener() {
@Override
public void onSuccess(Message m) {
latch.countDown();
}
@Override
public void onFailed(Message m) {
latch.countDown();
}
}).send(properties.getSubject().get("sub1"), getUser());
// 计数器减1
latch.await();
}
/**
* 发送延时消息
* @throws InterruptedException
*/
@Test
public void sendDelay() throws InterruptedException {
// 计数器,执行1次结束
CountDownLatch latch = new CountDownLatch(1);
// 延时 10 秒发送消息
// 一般使用 template.sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS) 即可
template.withSendStateListener(new MessageSendStateListener() {
@Override
public void onSuccess(Message m) {
latch.countDown();
}
@Override
public void onFailed(Message m) {
latch.countDown();
}
}).sendDelay(properties.getSubject().get("sub1"), getUser(), QmqTimeUnit.TEN_SECONDS);
// 计数器减1
latch.await();
}
/**
* 发送定时消息
* @throws InterruptedException
*/
@Test
public void sendSchedule() throws InterruptedException, ParseException {
// 计数器,执行1次结束
CountDownLatch latch = new CountDownLatch(1);
// 定时发送的日期时间
Date date = new SimpleDateFormat("yyyy-MM-dd HH����ss").parse("2019-07-28 00:16:00");
// 一般使用 template.sendSchedule(properties.getSubject().get("sub1"), getUser(), date) 即可
template.withSendStateListener(new MessageSendStateListener() {
@Override
public void onSuccess(Message m) {
latch.countDown();
}
@Override
public void onFailed(Message m) {
latch.countDown();
}
}).sendSchedule(properties.getSubject().get("sub1"), getUser(), date);
// 计数器减1
latch.await();
}
public User getUser() {
User user = new User();
user.setId(100000000001L);
user.setName("张三");
user.setAge(120);
user.setSchool("北京大学");
user.setCompany("中石油");
user.setDuty("行政总裁");
user.setSalary(new BigDecimal("1000000"));
user.setEnable(true);
return user;
}
public static class User {
@QmqAlias("user_id")
private Long id;
private String name;
private Integer age;
@QmqAlias("school_name")
private String school;
private String company;
@QmqIgnore
private String duty;
private BigDecimal salary;
private Boolean enable;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getSchool() {
return school;
}
public void setSchool(String school) {
this.school = school;
}
public String getCompany() {
return company;
}
public void setCompany(String company) {
this.company = company;
}
public String getDuty() {
return duty;
}
public void setDuty(String duty) {
this.duty = duty;
}
public BigDecimal getSalary() {
return salary;
}
public void setSalary(BigDecimal salary) {
this.salary = salary;
}
public Boolean getEnable() {
return enable;
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
}
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import qunar.tc.qmq.consumer.annotation.EnableQmq;
@EnableQmq(appCode="${spring.qmq.app-code}", metaServer="${spring.qmq.meta-server}")
@SpringBootApplication
public class QmqApplication {
public static void main(String[] args) {
SpringApplication.run(QmqApplication.class, args);
}
}
QmqConstant.EXECUTOR_NAME
表示消费线程池的 BeanName,该值固定为 qmqExecutor
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import qunar.tc.qmq.Message;
import qunar.tc.qmq.base.BaseMessage;
import qunar.tc.qmq.consumer.annotation.QmqConsumer;
import xin.wjtree.qmq.constant.QmqConstant;
@Slf4j
@Component
public class QmqLinstener {
@QmqConsumer(subject = "${spring.qmq.subject.sub1}", consumerGroup = "${spring.qmq.group.group1}",
executor = QmqConstant.EXECUTOR_NAME)
public void onMessage(Message message) {
log.info("qmq 消费主题:{},消费消息:{}", message.getSubject(), ((BaseMessage) message).getAttrs());
}
}
原文地址:https://blog.csdn.net/qq_41018959/article/details/81076055 https://blog.csdn.net/hao114500043/article/details/81742849 1,pom配置,引入相关jar: <dependency> <groupId>org.springframework.integration</
maven依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!--
RabbitMQ的应用之spring-boot-starter-amqp 前言 上一篇 RabbitMQ 已对RabbitMQ和AMQP的概念进行了学习 这一篇的目的主要是使用Spring Boot的spring-boot-starter-amqp整合RabbitMQ,以达到加深理解的目的。 系列 同步异步(wx.request+ajax) 消息队列的学习与理解 消息队列之RabbitMQ的学习
Springboot 整合MQTT。 MQTT服务器采用的是Apache ActiveMQ。 启动Apache ActiveMQ会有多个传输协议,这里采用的是tcp的方式,默认端口1883 1.创建配置文件config.properties mqtt.host=tcp://127.0.0。1:1883 mqtt.clientid=mqttjs_e8022a4d0b mqtt.username=ad
在springBoot项目中使用rabbitMQ是很方便的,spring提供了spring-boot-starter-amqp依赖,只需要简单的配置即可与spring无缝整合 本文不介绍rabbitMQ理论知识,主要介绍在spring中如何使用rabbirMQ 依赖与配置 maven <dependencies> <dependency> <groupI
导入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 发送方 代码 @Autowired //注入操作模板 private AmqpTemplate amqpTem
brew更新到最新版本,执行:brew update 安装Erlang,执行:brew install erlang 安装RabbitMQ Server,执行:brew install rabbitmq .bash_profile或.profile文件中增加下面内容: PATH=$PATH:/usr/local/sbin 通过rabbitmq-server命令来启动RabbitMQ的服务端 打开浏
1.下载Emqx安装包,配置Emqx环境 下载地址:免费试用 EMQ 产品 下载压缩包解压,打开cmd,进入emqx/bin目录,输入emqx start,启动服务。 2.创建SpringBoot项目Demo,添加pom引入jar包 <!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId>
1.依赖类 <!-- mqtt --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <d
QMQ 是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。目前在公司内部日常消息 qps 在 60W 左右,生产上承载将近 4W+ 消息 topic ,消息的端到端延迟可以控制在 10ms 以内。 QMQ 主要提供以下特性: 异步实时消息 延迟/定时消息 基于 Tag 的服务端过滤 Consum
我正在努力学习spring boot,我注意到有两种选择。 > spring-boot-starter-web-根据文档,它支持全栈web开发,包括Tomcat和web-mvc spring-Boot-Starter-Tomcat 既然#1支持Tomcat,为什么要使用#2呢? 有什么不同? 谢谢
主要内容:starter,spring-boot-starter-parent传统的 Spring 项目想要运行,不仅需要导入各种依赖,还要对各种 XML 配置文件进行配置,十分繁琐,但 Spring Boot 项目在创建完成后,即使不编写任何代码,不进行任何配置也能够直接运行,这都要归功于 Spring Boot 的 starter 机制。本节我们将对 stater 进行介绍。 starter Spring Boot 将日常企业应用研发中的各种场景都抽取出来,做成一个个的
Spring Boot 项目旨在简化创建产品级的 Spring 应用和服务。你可通过它来选择不同的 Spring 平台。可创建独立的 Java 应用和 Web 应用,同时提供了命令行工具来允许 'spring scripts'. 下图显示 Spring Boot 在 Spring 生态中的位置: 该项目主要的目的是: 为 Spring 的开发提供了更快更广泛的快速上手 使用默认方式实现快速开发 提
当我试图用启动我的spring boot项目时,我遇到了这个特殊的错误。奇怪的是,我的应用程序会在添加几个存储库和服务之前启动,但我似乎无法缩小为什么spring不能初始化一个在添加之前正在工作的存储库的范围。 以下是相关错误: 最有趣的是这句话: 11:38:43.583错误org.springframework.boot.context.embedde.Tomcat.tomcatstarter
我的项目root目录下pom文件添加了springboot的依赖 前端的目录下pom中添加了,要用到@restcontroller 然后一堆包冲突,是不是我的做法有错误,还是版本问题?