1.RabbitMQ简介
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
RabbitMQ是由RabbitMQ Technologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市。
2.应用场景
对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:
1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?
2)如何降低发送者和接收者的耦合度?
3)如何让Priority高的接收者先接到数据?
4)如何做到load balance?有效均衡接收者的负载?
5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。
6)如何做到可扩展,甚至将这个通信模块发到cluster上?
7)如何保证接收者接收到了完整,正确的数据?
AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。
3.配置RabbitMQ
//gradle依赖如下
compile "org.springframework.boot:spring-boot-starter-amqp:1.4.2.RELEASE"
//maven依赖如下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.4.2.RELEASE</version>
</dependency>
(1).配置文件配置
#在application.yml做如下配置
spring:
rabbitmq:
host: 192.168.56.101
port: 5672
#username: admin
#password: 123456
(2).Java Config配置
#在RabbitmqConfiguration.java做如下配置
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
*
*
* @author sunzc
*
* 2017年6月10日 上午9:41:26
*/
@Configuration
public class RabbitmqConfiguration {
/**
* 配置RabbitAdmin bean
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
// 创建并返回RabbitAdmin对象
return new RabbitAdmin(connectionFactory);
}
/**
* 配置知乎用户资料队列 bean
*
* @param rabbitAdmin
* @return
*/
@Bean
public Queue zhihuUserDataQueue(@Autowired RabbitAdmin rabbitAdmin) {
// 创建知乎用户资料队列对象
Queue ubdQueue = new Queue("zhuhu.user.queue");
// 声明知乎用户资料队列
rabbitAdmin.declareQueue(ubdQueue);
// 返回知乎用户资料队列
return ubdQueue;
}
/**
* 配置知乎问题队列 bean
*
* @param rabbitAdmin
* @return
*/
@Bean
public Queue userBehaviorDataQueue(@Autowired RabbitAdmin rabbitAdmin) {
// 创建知乎问题队列对象
Queue ubdQueue = new Queue("zhuhu.question.queue");
// 声明知乎问题队列
rabbitAdmin.declareQueue(ubdQueue);
// 返回知乎问题队列
return ubdQueue;
}
}
4.RabbitMQ的消费者
#RabbitMQ的消费者Receiver.java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.wei.you.zhihu.spider.entity.Question;
import com.wei.you.zhihu.spider.entity.User;
import com.wei.you.zhihu.spider.service.IQuestionService;
import com.wei.you.zhihu.spider.service.IUserService;
/**
* 消息消费者
*
* @author sunzc
*
* 2017年6月10日 上午9:07:58
*/
@Component
public class Receiver {
// 注入知乎用户资料服务
@Autowired
private IUserService userService;
// 注入知乎问题服务
@Autowired
private IQuestionService questionService;
/**
* 监听知乎用户资料消息队列
*
* @param user
*/
@RabbitListener(queues = "zhuhu.user.queue")
public void processZhihuUser(User user) {
// 保存知乎用户
userService.save(user);
}
/**
* 监听知乎问题消息队列
*
* @param user
*/
@RabbitListener(queues = "zhuhu.question.queue")
public void processZhihuQuestion(Question question) {
// 保存知乎用户
questionService.save(question);
}
}
5.RabbitMQ的生产者
#RabbitMQ的生产者Sender.java
import java.util.List;
import java.util.Map;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 队列消费者
*
* @author sunzc
*
*/
@Component
public class Sender {
/**
* 注入AmqpTemplate
*/
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 发送用户行为数据到消息队列
*
* @param ubdList
*/
public void ubdSender(List<Map<String, String>> ubdList) {
// 指定exchange, route 和消息内容
this.rabbitTemplate.convertAndSend("ubd.queue", ubdList);
}
}
6.项目的开源地址