消息队列 RabbitMq kafka RocketMq mqtt

束阳旭
2023-12-01

区别对比

RabbitMq 环境搭建

1.搭建erlang 运行环境

curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.rpm.sh' | sudo -E bash

yum install erlang

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

## install these dependencies from standard OS repositories

yum install socat logrotate -y

### 测试 erlang
erlang -version

  1. npm 安装 rabbitMq

官方下载 npm 包
rabbitMq官方

   ## 安装
rpm -ivh rabbitmq-server-3.8.16-1.el7.noarch.rpm
	
##启动守护进程
chkconfig rabbitmq-server on

##启动
/sbin/service rabbitmq-server start
##或者
systemctl start rabbitmq-server.service	

##状态
/sbin/service rabbitmq-server status
## 或者
systemctl status rabbitmq-server.service	
##停止
/sbin/service rabbitmq-server stop
##或者
systemctl stop rabbitmq-server.service


管理界面:
http://192.168.50.130:15672/
jiazh jiazh

指令

## help 指令
## 查看诊断和健康检测
rabbitmq-diagnostics status

#插件管理
rabbitmq-plugins
#开启操作界面
rabbitmq-plugins enable rabbitmq_management
## 查看已经存在的 插件
rabbitmq-plugins list

## 手动安装插件
## 下载地址 https://www.rabbitmq.com/community-plugins.html 
## 下载    rabbitmq_delayed_message_exchange-3.8.0.ez
## 移动到   /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.16/plugins/

## 安装插件  延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 
 # 服务管理和常规任务操作
rabbitmqctl 
 
# 查看当前所有用户
$ sudo rabbitmqctl list_users
 
# 查看默认guest用户的权限
$ sudo rabbitmqctl list_user_permissions guest
 
# 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
$ sudo rabbitmqctl delete_user guest
 
# 添加新用户
$ sudo rabbitmqctl add_user username password
 
# 设置用户tag
$ sudo rabbitmqctl set_user_tags username administrator
 
# 设置jiazh管理员
rabbitmqctl set_user_tags jiazh administrator
http://192.168.50.130:15672/
jiazh jiazh
 
# 赋予用户默认vhost的全部操作权限
$ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
 
# 查看用户的权限
$ sudo rabbitmqctl list_user_permissions username

#######################

## maintenance tasks on queues, in particular quorum queues

rabbitmq-queues 

##maintenance tasks related to upgrades
rabbitmq-upgrade  


springBoot 整合

ConnectionFactory 
RabbitTemplate 
	ReturnCallback 
	ReturnCallback 
	
 CorrelationData 
MessagePostProcessor 


RabbitListenerContainerFactory
  启动类 开启 监听 @EnableRabbit
	  @RabbitListene
	  @RabbitHandler
	  Message
	  Channel
<!--rabbitmq-->
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    host: 192.168.50.130
    username: jiazh
    password: jiazh
    port: 5672
    connection-timeout: 15000
    publisher-returns: true  #消息未被消费则原封不动返回,不被处理  returnListener  和 mandatory 配合使用

生产者
配置确认机制
配置 returnListener  confirmCallBackListener
ReturnCallback 实现类
ConfirmCallback 实现类

CorrelationData 消息唯一性配置

public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("本次消息的唯一标识是:" + correlationData);
        System.out.println("是否存在消息拒绝接收?" + ack);
        if(ack == false){
            System.out.println("消息拒绝接收的原因是:" + cause);
        }else{
            System.out.println("消息发送成功");
        }

        System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
    }
}


public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("err code :" + replyCode);
        System.out.println("错误消息的描述 :" + replyText);
        System.out.println("错误的交换机是 :" + exchange);
        System.out.println("错误的路右键是 :" + routingKey);


        System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
    }
}
package com.tcloud.mq.service.business.mq.config;



import com.tcloud.mq.service.business.mq.callBack.ConfirmCallBackListener;
import com.tcloud.mq.service.business.mq.callBack.ReturnCallBackListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    /**
     * 连接工厂
     */
    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * 自定义rabbit template用于数据的接收和发送
     * 可以设置消息确认机制和回调
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // template.setMessageConverter(); 可以自定义消息转换器  默认使用的JDK的,所以消息对象需要实现Serializable

        /**若使用confirm-callback或return-callback,
         * 必须要配置publisherConfirms或publisherReturns为true
         * 每个rabbitTemplate只能有一个confirm-callback和return-callback
         */
        template.setConfirmCallback(msgSendConfirmCallBack());

        /**
         * 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,
         * 可针对每次请求的消息去确定’mandatory’的boolean值,
         * 只能在提供’return -callback’时使用,与mandatory互斥
         */
        template.setReturnCallback(msgSendReturnCallback());
        template.setMandatory(true);
        return template;
    }

    /**
     * 消息确认机制
     * Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
     * 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
     * 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
     * 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
     * @return
     */
    @Bean
    public ConfirmCallBackListener msgSendConfirmCallBack(){
        return new ConfirmCallBackListener();
    }

    @Bean
    public ReturnCallBackListener msgSendReturnCallback(){
        return new ReturnCallBackListener();
    }


}
RabbitTemplate 
	设置callBackApi
		setConfirmCallback
		setReturnCallback
	推送api
		消息处理器 MessagePostProcessor
		
		convertAndSend
		convertSendAndReceive  //消费一个生成一个
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  ObjectMapper mapper=new ObjectMapper();
        TUser tUser = new TUser();
        tUser.setId("hello ll");
        // 声明消息处理器 这个对消息进行处理 可以设置一些参数 对消息进行一些定制化处理 我们这里 来设置消息的编码 以及消息的过期时间
        // 因为在.net 以及其他版本过期时间不一致 这里的时间毫秒值 为字符串
         MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
            // 设置编码
            messageProperties.setContentEncoding("utf-8");
            // 设置过期时间10*1000毫秒
             messageProperties.setExpiration("100000");

            // messageProperties.setHeader("x-delay", 10000);
            return message;
        };
        
  	rabbitmqTemplate.convertAndSend(
                "DL_EXCHANGE",//交换机 类型 direct topic fanout
                "DL_KEY",//routing key   topic 的是 可以 指定 order.* 匹配一个单词  order.# 后可以匹配多个单词
                mapper.writeValueAsString(tUser).getBytes(), //消息体
                messagePostProcessor,
                correlationData
        );
消费者
		 <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
消费者	
配置  RabbitListenerContainerFactory 信息处理格式
	@Configuration
	public class ConsumerConfig {
	
	    @Autowired
	    private ConnectionFactory connectionFactory;
	
	    @Bean
	    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
	        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
	        factory.setConnectionFactory(connectionFactory);
	
	        factory.setMessageConverter(new Jackson2JsonMessageConverter());
	        return factory;
	    }
	
	}

配置文件

spring:
  profiles: prod
  rabbitmq:
    host: 192.168.50.130
    username: jiazh
    password: jiazh
    port: 5672
    listener:
      direct:
        acknowledge-mode: manual
        prefetch: 10 #一次性从broker里面取的待消费的消息的个数
      simple:
       concurrency: 5 # listener在初始化的时候设置的并发消费者的个数
       retry:
         enabled: true # 允许消息消费失败的重试
         max-attempts: 3  # 消息最多消费次数3次
         initial-interval: 2000  # 消息多次消费的间隔2秒
启动类 配置  @EnableRabbit
	
消费业务	
		配置监听	
		配置handler
	
/*
  启动类 开启 监听 @EnableRabbit
  @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "REDIRECT_QUEUE",durable = "true"),
                    exchange = @Exchange(value = "DL_EXCHANGE",durable = "true"),
                    key = "KEY_R"
            )
    )
    durable  持久化配置
    @Payload  取到 Message 里的body  需要factory 配置 setMessageConverter
    @Headers 取到  Message 里的 MessageProperties
    @RabbitHandler
  */
	@Service
	public class Accept {
	
		@RabbitListener(queues = {"DL_QUEUE"})
	    @RabbitHandler
	 public  void acceptHandlerDle(
	                                  Message message,
	                                  Channel channel) throws IOException {
	        //消费者操作
	        MessageProperties messageProperties =  message.getMessageProperties();
	        Map<String, Object> headers = messageProperties.getHeaders();
	        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
	        String messaged=new String(message.getBody());
	        ObjectMapper mapper=new ObjectMapper();
	        TUser student=mapper.readValue(messaged.getBytes("utf-8"),TUser.class);
	        System.out.println("订单Id:"+student.getId());
	        channel.basicAck(messageProperties.getDeliveryTag(),true); //手动签收 DELIVERY_TAG  批量操作flage
	    }
	   }

pull || push

http://192.168.50.130:15672/  jiazh jiazh 

rabbitmq  topic  direct 
交换机 主题  路由

死信

死信定义:
1. 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
2. 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
3. 队列超载

 每个队列 都可以通过x-dead-letter-exchange 和  x-dead-letter-routing-key 来指定一个接口该队列处理失败或者超时处理的 死信接受队列。

延迟插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

列如: rabbitmq_delayed_message_exchange-20171215-3.6.x.ez 放到 rabbitmq 插件目录

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

rabbitmq-plugins disable rabbitmq_delayed_message_exchange


/*
需要安装延迟插件 rabbitmq_delay_message_exchange
特性一:Time To Live(TTL)

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。 如果消息消费失败或者时间到期,都会通过 x-dead-letter-routing-key 和 x-dead-letter-exchange 指定的交换机和队列内
 @Bean
    public Queue delayTTLQueue() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME); // 死信存储的队列 交换机名称
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY); // 死信存储的队列 路由key
        paramMap.put("x-message-ttl",3000); // 消息统一过期时间
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
    }
B: 对消息进行单独设置,每条消息TTL可以不同。

    @Bean
    public Queue delayTTLQueue2() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
        }

   long expiration = i * 1000;
            rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
                    message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));


特性二:Dead Letter Exchanges(DLX) 死信交换机 来实现消息延迟。
    交换机类型 x-delayed-message
    交换机参数设置  x-delayed-type : direct

    发送消息 head设置  x-delay 指定 消息延迟 时间 单位毫秒。
    发送到该交换机 和 指定routeKey 绑定的队列里。实现延迟消息。


 */
@Configuration
public class DeadQueueConfig {


    /**
     * 死信队列 交换机标识符
     */
    private static final String DEAD_LETTER_EXCHANGE_KEY = "x-dead-letter-exchange";

    /**
     * 死信队列交换机绑定键标识符
     */
    private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

    /**
     * 转发队列 bind routeKey
     */
    private static final String KEY_R = "KEY_R";

    /**
     * 转发队列 bind 交换机名称
     */
    private static final String EXCHANGE_R = "DEAD_REDIRECT_EXCHANGE";

    /**
     * 死信交换机
     * 死信队列跟交换机类型没有关系 不一定为directExchange 不影响该类型交换机的特性.
     *   durable="true" rabbitmq重启的时候不需要创建新的交换机
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        // return (DirectExchange)
        // ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
        return new DirectExchange("DL_EXCHANGE", true, false);
    }

    /**
     * 声明一个死信队列.
     * 参数 x-dead-letter-exchange 对应 转发队列 绑定的 交换机名称
     * 参数 x-dead-letter-routing-key 对应 转发队列 绑定的  routKey
     * 可以设置统一超时时间  x-message-ttl  单位毫秒
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange 声明 死信转发队列交换机
        args.put(DEAD_LETTER_EXCHANGE_KEY, EXCHANGE_R);
        // x-dead-letter-routing-key 声明 死信转发队列路由键
        args.put(DEAD_LETTER_ROUTING_KEY, KEY_R);
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 死信队列通过 DL_KEY 绑定键绑定到死信交换机器上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
    }

    /**
     * 定义死信队列转发队列.
     * 场景一 :发送消息到死信队列。订阅转发队列 则实现 消息统一延迟消费的功能
     * 场景二 :发送和订阅都是死信队列。则该队列为消息处理失败和超时的 异常信息队列。
     * /**
     *      durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     *      auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     *      exclusive  表示该消息队列是否只在当前connection生效,默认是false
     *
     *       new Queue(RabbitMqConfig.QUEUE_NAME1,true,false,false);
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("DEAD_REDIRECT_QUEUE").build();
    }

    /**
     * 转发队列交换机
     */
    @Bean("redirectExchange")
    public DirectExchange redirectExchange() {
        // return (DirectExchange)
        // ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
        return new DirectExchange(EXCHANGE_R, true, false);
    }
    /**
     * 转发队列 通过通过 KEY_R 绑定键绑定到转发队列交换机上.
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("DEAD_REDIRECT_QUEUE", Binding.DestinationType.QUEUE, EXCHANGE_R, KEY_R, null);
    }

}

延迟消息

延时队列实现思路

1.如果你的消息 TTL 过期值是可变的,可以尝试下使用 Delayed Message 插件,声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。
延迟交换机

Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false,false, argss);


Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 1000*60*30);//30分钟

AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);

channel.basicPublish("DELAY_EXCHANGE", "DELAY_KEY", props.build(),msg.getBytes());
/*
需要安装延迟插件 rabbitmq_delay_message_exchange

特性二:Dead Letter Exchanges(DLX) 死信交换机 来实现消息延迟。

    交换机类型 x-delayed-message
    交换机参数设置  x-delayed-type : direct

    发送消息 head设置  x-delay 指定 消息延迟 时间 单位毫秒。
    发送到该交换机 和 指定routeKey 绑定的队列里。实现延迟消息。

    队列属性设置死信属性和转发路由,实现消息处理异常,转发到别的队列。

 */
@Configuration
public class DelayQueueConfig {


    /**
     * 死信队列 交换机标识符
     */
    private static final String DEAD_LETTER_EXCHANGE_KEY = "x-dead-letter-exchange";

    /**
     * 死信队列交换机绑定键标识符
     */
    private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

    /**
     * 转发死信 routeKey
     */
    private static final String KEY_R = "KEY_R";

    /**
     * 转发队列 bind 交换机名称
     */
    private static final String EXCHANGE_R = "DELAY_DEAD_REDIRECT_EXCHANGE";

    /**
     *   延迟交换机
     *   durable="true" rabbitmq重启的时候不需要创建新的交换机 持久化
     *   autoDelete=true 自动删除
     */
    @Bean("delayLetterExchange")
    public CustomExchange delayLetterExchange() {
        //该头 必须添加  配合 消息失效时间实现 不通消息不同延迟效果
        Map<String , Object> exchangeParams = new HashMap<>();
        exchangeParams.put("x-delayed-type" , "direct");
        return new CustomExchange("DELAY_EXCHANGE", "x-delayed-message" ,true, false ,exchangeParams);
    }

    /**
     * 声明一个延迟队列.
     * 设置消息过期和失败 变为死信
     * 参数 x-dead-letter-exchange 对应 转发队列 绑定的 交换机名称
     * 参数 x-dead-letter-routing-key 对应 转发队列 绑定的  routKey
     * 可以设置统一超时时间  x-message-ttl
     */
    @Bean("delayLetterQueue")
    public Queue delayLetterQueue() {

        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange 声明 死信转发队列交换机
        args.put(DEAD_LETTER_EXCHANGE_KEY, EXCHANGE_R);
        // x-dead-letter-routing-key 声明 死信转发队列路由
        args.put(DEAD_LETTER_ROUTING_KEY, KEY_R);
        //
        return QueueBuilder.durable("DELAY_QUEUE").withArguments(args).build();
    }

    /**
     * 延迟队列 通过 DELAY_KEY 绑定键绑定到延迟交换机上.
     *
     * @return the binding
     */
    @Bean
    public Binding delayLetterBinding() {
        return new Binding("DELAY_QUEUE", Binding.DestinationType.QUEUE, "DELAY_EXCHANGE", "DELAY_KEY", null);
    }

    /**
     * 定义死信队列转发队列.
     * 场景一 :发送消息到死信队列。订阅转发队列 则实现 消息统一延迟消费的功能
     * 场景二 :发送和订阅都是死信队列。则该队列为消息处理失败和超时的 异常信息队列。
     * /**
     *      durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
     *      auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
     *      exclusive  表示该消息队列是否只在当前connection生效,默认是false
     *
     *       new Queue(RabbitMqConfig.QUEUE_NAME1,true,false,false);
     * @return the queue
     */
    @Bean("deadRedirectQueue")
    public Queue deadRedirectQueue() {
        return QueueBuilder.durable("DELAY_DEAD_REDIRECT_QUEUE").build();
    }

    /**
     *   死信转发交换机
     *   durable="true" rabbitmq重启的时候不需要创建新的交换机 持久化
     *   autoDelete=true 自动删除
     */
    @Bean("deadRedirectExchange")
    public DirectExchange deadRedirectExchange() {
        return new DirectExchange("DELAY_DEAD_REDIRECT_EXCHANGE", true, false );
    }

    /**
     * 转发队列 KEY_R 绑定键绑定到延死信转发交换机上
     *
     * @return the binding
     */
    @Bean
    public Binding deadRedirectBinding() {
        return new Binding("DELAY_DEAD_REDIRECT_QUEUE", Binding.DestinationType.QUEUE, EXCHANGE_R, KEY_R, null);
    }
}

2.DLX + TTL 做延迟消息,但是适合固定延迟的场景

   //normalQueue
   
   Map<String, Object> map = new HashMap<>();
    map.put("x-message-ttl", 10000);//设置10s过期时间
    //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
    map.put("x-dead-letter-exchange", "delay_exchange");
    //x-dead-letter-routing-key参数是给这个DLX指定路由键
    map.put("x-dead-letter-routing-key", "delay_key");
    new Queue(NORMAL_QUEUE, true, false, false, map);
	
//normalExchange

	new DirectExchange("normal_exchange", true, false)
	
	BindingBuilder.bind(normalQueue).to(normalExchange).with("normal_key");
	
//delayQueue	
    new Queue("delay_queue")
//delayExchange	
    new DirectExchange("delay_exchange", true, false)
 
	BindingBuilder.bind(delayQueue).to(delayExchange).with("delay_key")
	

  convertAndSend("normal_exchange", "normal_key", order)

  监听delay_queue
  
  发动消息到  normalQueue -> 时间到 -> 消息转到 延迟队列 x-dead-letter-routing-key 绑定的队列 delay_queue

 伪集群

消息过滤

消息应答机制

生产者只会保证 是否投递成功  confirm机制(确认应答机制)。
消费者采用手动ack应答模式,采用MQ进行补偿重试机制,注意MQ补偿幂等性问题。  只是通知清不清除该消息。 不会通知生产者

rabbitMq 进行事务处理的方式时 ,存在正常的派单队列 和 补单队列(延时队列) 应对  正常派单处理失败,延时再处理一次
 交换机采用路由键模式,补单队列和派但队列都绑定同一个路由键
 // 1.定义订单队列
    @Bean
    public Queue directOrderDicQueue() {
        return new Queue(ORDER_DIC_QUEUE);
    }

    // 2.定义补订单队列
    @Bean
    public Queue directCreateOrderQueue() {
        return new Queue(ORDER_CREATE_QUEUE);
    }

    // 2.定义交换机
    @Bean
    DirectExchange directOrderExchange() {
        return new DirectExchange(ORDER_EXCHANGE_NAME);
    }

    // 3.订单队列与交换机绑定
    @Bean
    Binding bindingExchangeOrderDicQueue() {
        return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

    // 3.补单队列与交换机绑定
    @Bean
    Binding bindingExchangeCreateOrder() {
        return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
    }

实现分布式事务

分布式系统的三个指标
	1、Consistency	一致性
	2、Availability	可用性
	3、Partition tolerance	分区容错性
	
	eureka基于AP
	zookeeper基于CP
	
分布式事务 实现方式为:
		强一致
		最终一致

实现的为最终一致性事务控制
A 服务发生事务 -> 发送业务到MQ -> 发送给成功后A 服务进行 事务提交 如果成功 则通知 MQ 进行投递 此期间(消息为半消息)->MQ 得知 A服务已经成功完成事务操作 则进行消息投递-> B服务进行第二部业务操作 ,B成功不成功 都会想办法让他成功(比如如果为网络问题,就进行二次投递,如果为代码问题 进行人工处理) 

KafKa

只支持 pull 模式
官网

kafka节点之间如何复制备份的?
kafka消息是否会丢失?为什么?
kafka最合理的配置是什么?
kafka的leader选举机制是什么?
kafka对硬件的配置有什么要求?
kafka的消息保证有几种方式?
kafka为什么会丢消息?
### 先启动zk
bash /usr/local/zk/bin/zkServer.sh stop
bash /usr/local/zk/bin/zkServer.sh start
bash /usr/local/zk/bin/zkServer.sh status

### 启动service
bash /usr/local/kafka/bin/kafka-server-start.sh   -daemon  /usr/local/kafka/config/server.properties

#两个topic:topic1、topic2,其分区和副本数都设置为2
 cd /usr/local/kafka/bin
 bash  /kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic topic2
 bash kafka-topics.sh --list --zookeeper 127.0.0.1:2181
## 某队列的描述信息
 bash kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic1
 bash kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
SDK 集成
   <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <scope>provided<
  </dependency>

集成springBoot

核心类:

 	KafkaTemplate   
	 ListenableFuture<SendResult<String, String>>
	 
	@KafkaListener 
    ConsumerFactory
    ConcurrentKafkaListenerContainerFactory
    KafkaListenerEndpointRegistry
    
    ConsumerRecord<?, ?> record
    
依赖
 <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
 </dependency>

生产端

spring:
  kafka:
    bootstrap-servers: 192.168.50.132:9092
    producer:
      retries: 1 #设置大于零 会对发送失败的消息重新发送
      batch-size: 16384 #每次批量发送的消息数
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

@Component
public class KafkaSender {
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息到kafka
     *@param topic 主题
     *@param message 内容体
     */
    public void sendMsg(String topic ,String key ,String message){
        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, key, message);
        send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            public void onFailure(Throwable throwable) {

            }
            public void onSuccess(SendResult<String, String> integerStringSendResult) {
                System.out.println(integerStringSendResult.getProducerRecord());
            }
        });

    }
}
消费端
//消费者
spring:
  kafka:
    bootstrap-servers: 192.168.50.132:9092
    listener:
      missing-topics-fatal: true # 没有队列启动报错
      type: single
    consumer:
      group-id: defaultConsumerGroup
      auto-commit-interval: 100
      auto-offset-reset: latest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #max-poll-records: 50  listener 配置bath 处理时设置每次处理的最多消息
      
消费配置 消息过滤和自动启动监听 和异常回调方法
@Configuration
public class KafkaCustomerConfig {

    @Autowired
    private ConsumerFactory consumerFactory;

    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止KafkaListener自启动 实现定时开始消费
       // container.setAutoStartup(false);
        // 设置消息过滤
        // 被过滤的消息将被丢弃
        container.setAckDiscarded(true);
        container.setRecordFilterStrategy(consumerRecord -> {
            //针对性的 对key value offset 进行过滤
            if (consumerRecord.offset() % 2 == 0) {  //只处理但记录消息
                return false;
            }
            //返回true消息则被过滤
            return true;
        });
        return container;
    }


    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("消费异常:"+message.getPayload());
            return null;
        };
    }


}

配置固定时间开启消费,固定时间关闭消费 需要配置关闭启动自动开启任务
@Component
public class CustomerService {
    @Resource
    private KafkaListenerEndpointRegistry registry;
    // 定时启动监听器

    @Scheduled(cron = "0 42 11 * * ? ")
    public void startListener() {
        System.out.println("启动监听器...");
        // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
        Collection<MessageListenerContainer> listenerContainers = registry.getListenerContainers();
        for(MessageListenerContainer messageListenerContainer: listenerContainers){
            if(!messageListenerContainer.isRunning()){
                messageListenerContainer.start();
            }
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }

    // 定时停止监听器
    @Scheduled(cron = "0 45 11 * * ? ")
    public void shutDownListener() {
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }

}

配置监听方法 还可以实现 消息转发 消息转发 手动提交变更offset 异常回调方法
@KafkaListener
@KafkaListeners
@SendTo(“topic2”)

@Component
public class KafkaCustomerListener {

    @KafkaListener(topics = {"topic1"},id="timingConsumerSingle",errorHandler ="consumerAwareErrorHandler")  //配置group 没懂
    //@SendTo("topic2") 转发消息
    public void listenSingle (ConsumerRecord<?, ?> record) throws Exception {
        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }


    @KafkaListener(topics = {"topic1"},id="timingConsumerBatch",errorHandler ="consumerAwareErrorHandler")
    public void listenBatch (List<ConsumerRecord<?, ?>> records) throws Exception {
        for (ConsumerRecord<?, ?> record : records) {
            System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
        }
    }

}

顺序消息

顺序消息

kafka 发送消息 指定 partitionKey 使其发送到同一个 partion 里

kafka接收 log4j 日志 (未实现)

    <appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
        <topic>loges</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <producerConfig>bootstrap.servers=ip:9093,ip:9094,ip:9095</producerConfig>
    </appender>

问题处理

RocketMq

官方Doc

linux 安装 java + maven 环境

linux windows 安装

下载地址

  1. java mvn 打包
    mvn -Prelease-all -DskipTests clean install -U

    cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3
    rocketmq-4.9.3.tar.gz
    rocketmq-4.9.3.zip

  2. Start Name Server

sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log

  1. Start Broker

sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log

Shutdown Servers

 sh bin/mqshutdown namesrv
 sh bin/mqshutdown broker

启动内存 设置(JAVA_OPT=“${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m”) 原设置太大 报out of memory

SDK 集成
   <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <scope>provided</scope>
	</dependency>
SpringBoot 集成
RocketMQTemplate  (发送消息 设置 topic properties tag  deploy等)
		
消费端 指定 订阅的队列 filter  组
@RocketMQMessageListener(selectorExpression = "name is not null and name = jack",topic = "test-topic",consumerGroup = "consumer-group1",selectorType = SelectorType.SQL92 )

@RocketMQMessageListener(selectorExpression = "*",topic = "test-topic",consumerGroup = "consumer-group1",selectorType = SelectorType.TAG )

 一个 listener    (DefaultRocketMQListenerContainer) 启动一个 consumer    handleMessage
/**
 * consumerListener
 *  核心类 ListenerContainerConfiguration  DefaultRocketMQListenerContainer
 * @author Jiazh
 * @date 2022/03/24 11:17
 **/
/**
 * 数值比较, 例如>, >=, <, <=, BETWEEN, =;
 * 字符比较, 例如 =, <>, IN;
 * IS NULL 或者 IS NOT NULL;
 * 逻辑语法, AND, OR, NOT;
 *
 * RocketMQReplyListener 可以再回传一个消息
 * RocketMQListener
 */

@Service
@RocketMQMessageListener(selectorExpression = "*",
        topic = "test-topic",
        consumerGroup = "consumer-group1",
        selectorType = SelectorType.TAG,
        messageModel = MessageModel.CLUSTERING, // 同组内消费模式设置
        consumeMode = ConsumeMode.CONCURRENTLY // 客户端接受消息 是并发 还是 顺序
)
public class ConsumerListener implements RocketMQListener<String>  {
    @Override
    public void onMessage(String s) {

    }
}

• 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
• 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
• 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
• 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。

dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>


yml 配置

	  rocketmq:
		nameserver:
		producer

实现本地事务监听 即 半消息 投递 控制,实现分布式事务的最终一致性

RocketMQTemplate 发送信息指定tag 是在 tipic “:” 后

 RocketMQUtil.getAndWrapMessage 方法解析 了 topic:tags   是topic 和 tag 放在一起的   MessageHeaders 放 userProperties, KEYS,消息类型之类的设置
  MessageHeaderAccessor accessor = new MessageHeaderAccessor();
        Map<String, Object> stringMap =  new HashMap<>();
            stringMap.put("KEYS",1111);
            stringMap.put("name","jack");
        accessor.copyHeaders(stringMap);
     //setHeaders 包含 KEYS userPropertis  
	 Message<String> msg = MessageBuilder.withPayload("Hello world!").setHeaders(accessor).build();
	 //发送事务类型消息 半消息
	 t.sendMessageInTransaction("Topic2:TagA", msg, null);

  //发送消息
    t.convertAndSend("Topic1:TagA", "Hello world! ");

    //发送spring的Message
    t.send("Topic1:TagA", MessageBuilder.withPayload("Hello world! ").build());

    //发送异步消息
    t.asyncSend("Topic1:TagA", "Hello world!", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("发送失败");
        }
    });

    //发送顺序消息
    t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
    t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
    t.syncSendOrderly("Topic1", "98456237,完成", "98456237");

事务监听类


@RocketMQTransactionListener
class Lis implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("执行本地事务");
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("执行事务回查");
        return RocketMQLocalTransactionState.COMMIT;
    }
}
tag keys userProperties 作用
keys 是每个消息上携带的 信息。 于订阅没直接关系
tag 和 userProperties 可以用于sunscribe 订阅时进行信息筛选。  userProperties 需要使用 实现 MessageFilter 的实体或者 sql过滤语法
rocketmq-client 集成
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.9.3</version>
      <scope>provided</scope>
    </dependency>
	
rocketMq 消费端 pull push 模式
pull模式需要我们手动调用consumer拉消息,
而push模式则只需要我们提供一个listener即可实现对消息的监听

push模式

         DefaultMQPushConsumer   consumer = new DefaultMQPushConsumer("group_name_push"); //push 模式客户端 
		  
		 consumer 订阅队列, 指定过滤规则 tag 活 sql92 , 和表达式
		 
		 tag  "TAG1 || TAG2"
		 sql92 :
				 数值比较, 例如>, >=, <, <=, BETWEEN, =;
				字符比较, 例如 =, <>, IN;
				IS NULL 或者 IS NOT NULL;
				逻辑语法, AND, OR, NOT;
		
			 eq
			    consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 and 3)"))
		 
		
		 registerMessageListener 设置回调处理   成功 失败 延迟 等状态
		 
		  customer model
				ConsumeConcurrentlyContext   并发消费   
				MessageListenerOrderly       顺序消费   
				
		 messageModel
		   消费端 设置 MessageModel 创建的消费者 广播(同组的消费者都会消费一编)  还是 集群模式(同组的消费者,平坦消息设置 消费位置)  

pull模式

		 1.  DefaultMQPullConsumer  过时了   DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group_name_pull");//  pull 模式客户端 

		 2.  MQPullConsumerScheduleService    PullTaskCallback 的实现回调方法   MessageModel   
					1. setCallbackTable  或者  registerPullTaskCallback 注册某个topic的 回调.      核心时 putTask 方法创建   PullTaskImpl  线程 
生产端 producer
1.DefaultMQProducer   send 方法  Message  可以设置延迟等级 及 其他参数用于过滤     
	 批量发送
	 
     Message  可以设置延迟等级 默认 0 
		 private Map<String, String> properties;   里设置的参数 可以用于订阅时候 过滤  consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");


2.TransactionMQProducer  发送发送半消息 事务的 客户端

    1.设TransactionListener 的实现(executeLocalTransaction 方法时根据 msg信息 和 arg 参数 进行一些的事务操作。checkLocalTransaction   用于校验数据 根据是否成功来 返回 回滚,提交的状态. 用于半消息的完整发送 )
    2.调用sendMessageInTransaction  发送 信息和参数 

zookeeper

实际上基于zk的分布式锁目前已经有现成的实现框架,Curator就是Netflix开源的一套ZooKeeper客户端框架,它提供了zk场景的绝大部分实现,使用Curator就不必关心其内部算法,Curator提供了来实现分布式锁,用方法获取锁,以及用方法释放锁,同其他锁一样,方法需要放在finakky代码块中,确保锁能正确释放

	zk 实现的分布式锁方式,1业务获取锁, 判断锁是否占用,占用则进行等待释放 未占用 创建zk节点 用于锁 释放锁
 类似资料: