当前位置: 首页 > 工具软件 > zhihu-spider > 使用案例 >

zhihu-spider之RabbitMQ——zhihu-spider开源项目使用技术详解(其五)

唐泳
2023-12-01

zhihu-spider之RabbitMQ——zhihu-spider开源项目使用技术详解(其五)

1.RabbitMQ简介

  RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。

  RabbitMQ是由RabbitMQ Technologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不同的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,现在并没有上市。

  官方地址:http://www.rabbitmq.com

2.应用场景

  对于一个大型的软件系统来说,它会有很多的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通信?这和传统的IPC有很大的区别。传统的IPC很多都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);如果使用socket那么不同的模块的确可以部署到不同的机器上,但是还是有很多问题需要解决。比如:

  1)信息的发送者和接收者如何维持这个连接,如果一方的连接中断,这期间的数据如何方式丢失?

  2)如何降低发送者和接收者的耦合度?

  3)如何让Priority高的接收者先接到数据?

  4)如何做到load balance?有效均衡接收者的负载?

  5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不同的数据,如何做有效的filter。

  6)如何做到可扩展,甚至将这个通信模块发到cluster上?

  7)如何保证接收者接收到了完整,正确的数据?

  AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。

3.配置RabbitMQ

//gradle依赖如下

compile "org.springframework.boot:spring-boot-starter-amqp:1.4.2.RELEASE"

//maven依赖如下

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.4.2.RELEASE</version>
</dependency>

(1).配置文件配置

#在application.yml做如下配置

spring:
  rabbitmq:
    host: 192.168.56.101
    port: 5672
    #username: admin
    #password: 123456

(2).Java Config配置

#在RabbitmqConfiguration.java做如下配置

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 
 * 
 * @author sunzc
 *
 *         2017年6月10日 上午9:41:26
 */
@Configuration
public class RabbitmqConfiguration {

    /**
     * 配置RabbitAdmin bean
     * 
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {

    // 创建并返回RabbitAdmin对象
    return new RabbitAdmin(connectionFactory);
    }

    /**
     * 配置知乎用户资料队列 bean
     * 
     * @param rabbitAdmin
     * @return
     */
    @Bean
    public Queue zhihuUserDataQueue(@Autowired RabbitAdmin rabbitAdmin) {

    // 创建知乎用户资料队列对象
    Queue ubdQueue = new Queue("zhuhu.user.queue");

    // 声明知乎用户资料队列
    rabbitAdmin.declareQueue(ubdQueue);

    // 返回知乎用户资料队列
    return ubdQueue;
    }

    /**
     * 配置知乎问题队列 bean
     * 
     * @param rabbitAdmin
     * @return
     */
    @Bean
    public Queue userBehaviorDataQueue(@Autowired RabbitAdmin rabbitAdmin) {

    // 创建知乎问题队列对象
    Queue ubdQueue = new Queue("zhuhu.question.queue");

    // 声明知乎问题队列
    rabbitAdmin.declareQueue(ubdQueue);

    // 返回知乎问题队列
    return ubdQueue;
    }
}

4.RabbitMQ的消费者

#RabbitMQ的消费者Receiver.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.wei.you.zhihu.spider.entity.Question;
import com.wei.you.zhihu.spider.entity.User;
import com.wei.you.zhihu.spider.service.IQuestionService;
import com.wei.you.zhihu.spider.service.IUserService;

/**
 * 消息消费者
 * 
 * @author sunzc
 *
 *         2017年6月10日 上午9:07:58
 */
@Component
public class Receiver {
    // 注入知乎用户资料服务
    @Autowired
    private IUserService userService;

    // 注入知乎问题服务
    @Autowired
    private IQuestionService questionService;

    /**
     * 监听知乎用户资料消息队列
     * 
     * @param user
     */
    @RabbitListener(queues = "zhuhu.user.queue")
    public void processZhihuUser(User user) {
    // 保存知乎用户
    userService.save(user);
    }

    /**
     * 监听知乎问题消息队列
     * 
     * @param user
     */
    @RabbitListener(queues = "zhuhu.question.queue")
    public void processZhihuQuestion(Question question) {
    // 保存知乎用户
    questionService.save(question);
    }
}

5.RabbitMQ的生产者

#RabbitMQ的生产者Sender.java

import java.util.List;
import java.util.Map;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 队列消费者
 * 
 * @author sunzc
 *
 */
@Component
public class Sender {

  /**
   * 注入AmqpTemplate
   */
  @Autowired
  private AmqpTemplate rabbitTemplate;

  /**
   * 发送用户行为数据到消息队列
   * 
   * @param ubdList
   */
  public void ubdSender(List<Map<String, String>> ubdList) {
    // 指定exchange, route 和消息内容
    this.rabbitTemplate.convertAndSend("ubd.queue", ubdList);
  }
}

6.项目的开源地址

https://github.com/sdc1234/zhihu-spider

 类似资料: