1.搭建erlang 运行环境
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.rpm.sh' | sudo -E bash
yum install erlang
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
## install these dependencies from standard OS repositories
yum install socat logrotate -y
### 测试 erlang
erlang -version
官方下载 npm 包
rabbitMq官方
## 安装
rpm -ivh rabbitmq-server-3.8.16-1.el7.noarch.rpm
##启动守护进程
chkconfig rabbitmq-server on
##启动
/sbin/service rabbitmq-server start
##或者
systemctl start rabbitmq-server.service
##状态
/sbin/service rabbitmq-server status
## 或者
systemctl status rabbitmq-server.service
##停止
/sbin/service rabbitmq-server stop
##或者
systemctl stop rabbitmq-server.service
管理界面:
http://192.168.50.130:15672/
jiazh jiazh
## help 指令
## 查看诊断和健康检测
rabbitmq-diagnostics status
#插件管理
rabbitmq-plugins
#开启操作界面
rabbitmq-plugins enable rabbitmq_management
## 查看已经存在的 插件
rabbitmq-plugins list
## 手动安装插件
## 下载地址 https://www.rabbitmq.com/community-plugins.html
## 下载 rabbitmq_delayed_message_exchange-3.8.0.ez
## 移动到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.16/plugins/
## 安装插件 延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 服务管理和常规任务操作
rabbitmqctl
# 查看当前所有用户
$ sudo rabbitmqctl list_users
# 查看默认guest用户的权限
$ sudo rabbitmqctl list_user_permissions guest
# 由于RabbitMQ默认的账号用户名和密码都是guest。为了安全起见, 先删掉默认用户
$ sudo rabbitmqctl delete_user guest
# 添加新用户
$ sudo rabbitmqctl add_user username password
# 设置用户tag
$ sudo rabbitmqctl set_user_tags username administrator
# 设置jiazh管理员
rabbitmqctl set_user_tags jiazh administrator
http://192.168.50.130:15672/
jiazh jiazh
# 赋予用户默认vhost的全部操作权限
$ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
# 查看用户的权限
$ sudo rabbitmqctl list_user_permissions username
#######################
## maintenance tasks on queues, in particular quorum queues
rabbitmq-queues
##maintenance tasks related to upgrades
rabbitmq-upgrade
ConnectionFactory
RabbitTemplate
ReturnCallback
ReturnCallback
CorrelationData
MessagePostProcessor
RabbitListenerContainerFactory
启动类 开启 监听 @EnableRabbit
@RabbitListene
@RabbitHandler
Message
Channel
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.50.130
username: jiazh
password: jiazh
port: 5672
connection-timeout: 15000
publisher-returns: true #消息未被消费则原封不动返回,不被处理 returnListener 和 mandatory 配合使用
配置确认机制
配置 returnListener confirmCallBackListener
ReturnCallback 实现类
ConfirmCallback 实现类
CorrelationData 消息唯一性配置
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("本次消息的唯一标识是:" + correlationData);
System.out.println("是否存在消息拒绝接收?" + ack);
if(ack == false){
System.out.println("消息拒绝接收的原因是:" + cause);
}else{
System.out.println("消息发送成功");
}
System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
}
}
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("err code :" + replyCode);
System.out.println("错误消息的描述 :" + replyText);
System.out.println("错误的交换机是 :" + exchange);
System.out.println("错误的路右键是 :" + routingKey);
System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
}
}
package com.tcloud.mq.service.business.mq.config;
import com.tcloud.mq.service.business.mq.callBack.ConfirmCallBackListener;
import com.tcloud.mq.service.business.mq.callBack.ReturnCallBackListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
/**
* 连接工厂
*/
@Autowired
private ConnectionFactory connectionFactory;
/**
* 自定义rabbit template用于数据的接收和发送
* 可以设置消息确认机制和回调
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// template.setMessageConverter(); 可以自定义消息转换器 默认使用的JDK的,所以消息对象需要实现Serializable
/**若使用confirm-callback或return-callback,
* 必须要配置publisherConfirms或publisherReturns为true
* 每个rabbitTemplate只能有一个confirm-callback和return-callback
*/
template.setConfirmCallback(msgSendConfirmCallBack());
/**
* 使用return-callback时必须设置mandatory为true,或者在配置中设置mandatory-expression的值为true,
* 可针对每次请求的消息去确定’mandatory’的boolean值,
* 只能在提供’return -callback’时使用,与mandatory互斥
*/
template.setReturnCallback(msgSendReturnCallback());
template.setMandatory(true);
return template;
}
/**
* 消息确认机制
* Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
* 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
* 确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)
* 在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
* @return
*/
@Bean
public ConfirmCallBackListener msgSendConfirmCallBack(){
return new ConfirmCallBackListener();
}
@Bean
public ReturnCallBackListener msgSendReturnCallback(){
return new ReturnCallBackListener();
}
}
RabbitTemplate
设置callBackApi
setConfirmCallback
setReturnCallback
推送api
消息处理器 MessagePostProcessor
convertAndSend
convertSendAndReceive //消费一个生成一个
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
ObjectMapper mapper=new ObjectMapper();
TUser tUser = new TUser();
tUser.setId("hello ll");
// 声明消息处理器 这个对消息进行处理 可以设置一些参数 对消息进行一些定制化处理 我们这里 来设置消息的编码 以及消息的过期时间
// 因为在.net 以及其他版本过期时间不一致 这里的时间毫秒值 为字符串
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间10*1000毫秒
messageProperties.setExpiration("100000");
// messageProperties.setHeader("x-delay", 10000);
return message;
};
rabbitmqTemplate.convertAndSend(
"DL_EXCHANGE",//交换机 类型 direct topic fanout
"DL_KEY",//routing key topic 的是 可以 指定 order.* 匹配一个单词 order.# 后可以匹配多个单词
mapper.writeValueAsString(tUser).getBytes(), //消息体
messagePostProcessor,
correlationData
);
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消费者
配置 RabbitListenerContainerFactory 信息处理格式
@Configuration
public class ConsumerConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
配置文件
spring:
profiles: prod
rabbitmq:
host: 192.168.50.130
username: jiazh
password: jiazh
port: 5672
listener:
direct:
acknowledge-mode: manual
prefetch: 10 #一次性从broker里面取的待消费的消息的个数
simple:
concurrency: 5 # listener在初始化的时候设置的并发消费者的个数
retry:
enabled: true # 允许消息消费失败的重试
max-attempts: 3 # 消息最多消费次数3次
initial-interval: 2000 # 消息多次消费的间隔2秒
启动类 配置 @EnableRabbit
消费业务
配置监听
配置handler
/*
启动类 开启 监听 @EnableRabbit
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "REDIRECT_QUEUE",durable = "true"),
exchange = @Exchange(value = "DL_EXCHANGE",durable = "true"),
key = "KEY_R"
)
)
durable 持久化配置
@Payload 取到 Message 里的body 需要factory 配置 setMessageConverter
@Headers 取到 Message 里的 MessageProperties
@RabbitHandler
*/
@Service
public class Accept {
@RabbitListener(queues = {"DL_QUEUE"})
@RabbitHandler
public void acceptHandlerDle(
Message message,
Channel channel) throws IOException {
//消费者操作
MessageProperties messageProperties = message.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
String messaged=new String(message.getBody());
ObjectMapper mapper=new ObjectMapper();
TUser student=mapper.readValue(messaged.getBytes("utf-8"),TUser.class);
System.out.println("订单Id:"+student.getId());
channel.basicAck(messageProperties.getDeliveryTag(),true); //手动签收 DELIVERY_TAG 批量操作flage
}
}
http://192.168.50.130:15672/ jiazh jiazh
rabbitmq topic direct
交换机 主题 路由
死信定义:
1. 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
2. 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
3. 队列超载
每个队列 都可以通过x-dead-letter-exchange 和 x-dead-letter-routing-key 来指定一个接口该队列处理失败或者超时处理的 死信接受队列。
延迟插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
列如: rabbitmq_delayed_message_exchange-20171215-3.6.x.ez 放到 rabbitmq 插件目录
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
/*
需要安装延迟插件 rabbitmq_delay_message_exchange
特性一:Time To Live(TTL)
RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。 如果消息消费失败或者时间到期,都会通过 x-dead-letter-routing-key 和 x-dead-letter-exchange 指定的交换机和队列内
@Bean
public Queue delayTTLQueue() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME); // 死信存储的队列 交换机名称
paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY); // 死信存储的队列 路由key
paramMap.put("x-message-ttl",3000); // 消息统一过期时间
return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
}
B: 对消息进行单独设置,每条消息TTL可以不同。
@Bean
public Queue delayTTLQueue2() {
Map<String,Object> paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
}
long expiration = i * 1000;
rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));
特性二:Dead Letter Exchanges(DLX) 死信交换机 来实现消息延迟。
交换机类型 x-delayed-message
交换机参数设置 x-delayed-type : direct
发送消息 head设置 x-delay 指定 消息延迟 时间 单位毫秒。
发送到该交换机 和 指定routeKey 绑定的队列里。实现延迟消息。
*/
@Configuration
public class DeadQueueConfig {
/**
* 死信队列 交换机标识符
*/
private static final String DEAD_LETTER_EXCHANGE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 转发队列 bind routeKey
*/
private static final String KEY_R = "KEY_R";
/**
* 转发队列 bind 交换机名称
*/
private static final String EXCHANGE_R = "DEAD_REDIRECT_EXCHANGE";
/**
* 死信交换机
* 死信队列跟交换机类型没有关系 不一定为directExchange 不影响该类型交换机的特性.
* durable="true" rabbitmq重启的时候不需要创建新的交换机
*/
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange() {
// return (DirectExchange)
// ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
return new DirectExchange("DL_EXCHANGE", true, false);
}
/**
* 声明一个死信队列.
* 参数 x-dead-letter-exchange 对应 转发队列 绑定的 交换机名称
* 参数 x-dead-letter-routing-key 对应 转发队列 绑定的 routKey
* 可以设置统一超时时间 x-message-ttl 单位毫秒
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信转发队列交换机
args.put(DEAD_LETTER_EXCHANGE_KEY, EXCHANGE_R);
// x-dead-letter-routing-key 声明 死信转发队列路由键
args.put(DEAD_LETTER_ROUTING_KEY, KEY_R);
return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
}
/**
* 死信队列通过 DL_KEY 绑定键绑定到死信交换机器上.
*
* @return the binding
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
}
/**
* 定义死信队列转发队列.
* 场景一 :发送消息到死信队列。订阅转发队列 则实现 消息统一延迟消费的功能
* 场景二 :发送和订阅都是死信队列。则该队列为消息处理失败和超时的 异常信息队列。
* /**
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*
* new Queue(RabbitMqConfig.QUEUE_NAME1,true,false,false);
* @return the queue
*/
@Bean("redirectQueue")
public Queue redirectQueue() {
return QueueBuilder.durable("DEAD_REDIRECT_QUEUE").build();
}
/**
* 转发队列交换机
*/
@Bean("redirectExchange")
public DirectExchange redirectExchange() {
// return (DirectExchange)
// ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
return new DirectExchange(EXCHANGE_R, true, false);
}
/**
* 转发队列 通过通过 KEY_R 绑定键绑定到转发队列交换机上.
*
* @return the binding
*/
@Bean
public Binding redirectBinding() {
return new Binding("DEAD_REDIRECT_QUEUE", Binding.DestinationType.QUEUE, EXCHANGE_R, KEY_R, null);
}
}
延时队列实现思路
1.如果你的消息 TTL 过期值是可变的,可以尝试下使用 Delayed Message 插件,声明 x-delayed-message 类型的 Exchange,消息发送时指定消息头 x-delay 以毫秒为单位将消息进行延迟投递。
延迟交换机
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
channel.exchangeDeclare("DELAY_EXCHANGE", "x-delayed-message", false,false, argss);
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 1000*60*30);//30分钟
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("DELAY_EXCHANGE", "DELAY_KEY", props.build(),msg.getBytes());
/*
需要安装延迟插件 rabbitmq_delay_message_exchange
特性二:Dead Letter Exchanges(DLX) 死信交换机 来实现消息延迟。
交换机类型 x-delayed-message
交换机参数设置 x-delayed-type : direct
发送消息 head设置 x-delay 指定 消息延迟 时间 单位毫秒。
发送到该交换机 和 指定routeKey 绑定的队列里。实现延迟消息。
队列属性设置死信属性和转发路由,实现消息处理异常,转发到别的队列。
*/
@Configuration
public class DelayQueueConfig {
/**
* 死信队列 交换机标识符
*/
private static final String DEAD_LETTER_EXCHANGE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 转发死信 routeKey
*/
private static final String KEY_R = "KEY_R";
/**
* 转发队列 bind 交换机名称
*/
private static final String EXCHANGE_R = "DELAY_DEAD_REDIRECT_EXCHANGE";
/**
* 延迟交换机
* durable="true" rabbitmq重启的时候不需要创建新的交换机 持久化
* autoDelete=true 自动删除
*/
@Bean("delayLetterExchange")
public CustomExchange delayLetterExchange() {
//该头 必须添加 配合 消息失效时间实现 不通消息不同延迟效果
Map<String , Object> exchangeParams = new HashMap<>();
exchangeParams.put("x-delayed-type" , "direct");
return new CustomExchange("DELAY_EXCHANGE", "x-delayed-message" ,true, false ,exchangeParams);
}
/**
* 声明一个延迟队列.
* 设置消息过期和失败 变为死信
* 参数 x-dead-letter-exchange 对应 转发队列 绑定的 交换机名称
* 参数 x-dead-letter-routing-key 对应 转发队列 绑定的 routKey
* 可以设置统一超时时间 x-message-ttl
*/
@Bean("delayLetterQueue")
public Queue delayLetterQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信转发队列交换机
args.put(DEAD_LETTER_EXCHANGE_KEY, EXCHANGE_R);
// x-dead-letter-routing-key 声明 死信转发队列路由
args.put(DEAD_LETTER_ROUTING_KEY, KEY_R);
//
return QueueBuilder.durable("DELAY_QUEUE").withArguments(args).build();
}
/**
* 延迟队列 通过 DELAY_KEY 绑定键绑定到延迟交换机上.
*
* @return the binding
*/
@Bean
public Binding delayLetterBinding() {
return new Binding("DELAY_QUEUE", Binding.DestinationType.QUEUE, "DELAY_EXCHANGE", "DELAY_KEY", null);
}
/**
* 定义死信队列转发队列.
* 场景一 :发送消息到死信队列。订阅转发队列 则实现 消息统一延迟消费的功能
* 场景二 :发送和订阅都是死信队列。则该队列为消息处理失败和超时的 异常信息队列。
* /**
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*
* new Queue(RabbitMqConfig.QUEUE_NAME1,true,false,false);
* @return the queue
*/
@Bean("deadRedirectQueue")
public Queue deadRedirectQueue() {
return QueueBuilder.durable("DELAY_DEAD_REDIRECT_QUEUE").build();
}
/**
* 死信转发交换机
* durable="true" rabbitmq重启的时候不需要创建新的交换机 持久化
* autoDelete=true 自动删除
*/
@Bean("deadRedirectExchange")
public DirectExchange deadRedirectExchange() {
return new DirectExchange("DELAY_DEAD_REDIRECT_EXCHANGE", true, false );
}
/**
* 转发队列 KEY_R 绑定键绑定到延死信转发交换机上
*
* @return the binding
*/
@Bean
public Binding deadRedirectBinding() {
return new Binding("DELAY_DEAD_REDIRECT_QUEUE", Binding.DestinationType.QUEUE, EXCHANGE_R, KEY_R, null);
}
}
2.DLX + TTL 做延迟消息,但是适合固定延迟的场景
//normalQueue
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 10000);//设置10s过期时间
//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
map.put("x-dead-letter-exchange", "delay_exchange");
//x-dead-letter-routing-key参数是给这个DLX指定路由键
map.put("x-dead-letter-routing-key", "delay_key");
new Queue(NORMAL_QUEUE, true, false, false, map);
//normalExchange
new DirectExchange("normal_exchange", true, false)
BindingBuilder.bind(normalQueue).to(normalExchange).with("normal_key");
//delayQueue
new Queue("delay_queue")
//delayExchange
new DirectExchange("delay_exchange", true, false)
BindingBuilder.bind(delayQueue).to(delayExchange).with("delay_key")
convertAndSend("normal_exchange", "normal_key", order)
监听delay_queue
发动消息到 normalQueue -> 时间到 -> 消息转到 延迟队列 x-dead-letter-routing-key 绑定的队列 delay_queue
伪集群
生产者只会保证 是否投递成功 confirm机制(确认应答机制)。
消费者采用手动ack应答模式,采用MQ进行补偿重试机制,注意MQ补偿幂等性问题。 只是通知清不清除该消息。 不会通知生产者
rabbitMq 进行事务处理的方式时 ,存在正常的派单队列 和 补单队列(延时队列) 应对 正常派单处理失败,延时再处理一次
交换机采用路由键模式,补单队列和派但队列都绑定同一个路由键
// 1.定义订单队列
@Bean
public Queue directOrderDicQueue() {
return new Queue(ORDER_DIC_QUEUE);
}
// 2.定义补订单队列
@Bean
public Queue directCreateOrderQueue() {
return new Queue(ORDER_CREATE_QUEUE);
}
// 2.定义交换机
@Bean
DirectExchange directOrderExchange() {
return new DirectExchange(ORDER_EXCHANGE_NAME);
}
// 3.订单队列与交换机绑定
@Bean
Binding bindingExchangeOrderDicQueue() {
return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");
}
// 3.补单队列与交换机绑定
@Bean
Binding bindingExchangeCreateOrder() {
return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey");
}
分布式系统的三个指标
1、Consistency 一致性
2、Availability 可用性
3、Partition tolerance 分区容错性
eureka基于AP
zookeeper基于CP
分布式事务 实现方式为:
强一致
最终一致
实现的为最终一致性事务控制
A 服务发生事务 -> 发送业务到MQ -> 发送给成功后A 服务进行 事务提交 如果成功 则通知 MQ 进行投递 此期间(消息为半消息)->MQ 得知 A服务已经成功完成事务操作 则进行消息投递-> B服务进行第二部业务操作 ,B成功不成功 都会想办法让他成功(比如如果为网络问题,就进行二次投递,如果为代码问题 进行人工处理)
只支持 pull 模式
官网
kafka节点之间如何复制备份的?
kafka消息是否会丢失?为什么?
kafka最合理的配置是什么?
kafka的leader选举机制是什么?
kafka对硬件的配置有什么要求?
kafka的消息保证有几种方式?
kafka为什么会丢消息?
### 先启动zk
bash /usr/local/zk/bin/zkServer.sh stop
bash /usr/local/zk/bin/zkServer.sh start
bash /usr/local/zk/bin/zkServer.sh status
### 启动service
bash /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
#两个topic:topic1、topic2,其分区和副本数都设置为2
cd /usr/local/kafka/bin
bash /kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic topic2
bash kafka-topics.sh --list --zookeeper 127.0.0.1:2181
## 某队列的描述信息
bash kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic1
bash kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>provided<
</dependency>
核心类:
KafkaTemplate
ListenableFuture<SendResult<String, String>>
@KafkaListener
ConsumerFactory
ConcurrentKafkaListenerContainerFactory
KafkaListenerEndpointRegistry
ConsumerRecord<?, ?> record
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
spring:
kafka:
bootstrap-servers: 192.168.50.132:9092
producer:
retries: 1 #设置大于零 会对发送失败的消息重新发送
batch-size: 16384 #每次批量发送的消息数
buffer-memory: 33554432
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
@Component
public class KafkaSender {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 发送消息到kafka
*@param topic 主题
*@param message 内容体
*/
public void sendMsg(String topic ,String key ,String message){
ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, key, message);
send.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
public void onFailure(Throwable throwable) {
}
public void onSuccess(SendResult<String, String> integerStringSendResult) {
System.out.println(integerStringSendResult.getProducerRecord());
}
});
}
}
//消费者
spring:
kafka:
bootstrap-servers: 192.168.50.132:9092
listener:
missing-topics-fatal: true # 没有队列启动报错
type: single
consumer:
group-id: defaultConsumerGroup
auto-commit-interval: 100
auto-offset-reset: latest
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#max-poll-records: 50 listener 配置bath 处理时设置每次处理的最多消息
@Configuration
public class KafkaCustomerConfig {
@Autowired
private ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动 实现定时开始消费
// container.setAutoStartup(false);
// 设置消息过滤
// 被过滤的消息将被丢弃
container.setAckDiscarded(true);
container.setRecordFilterStrategy(consumerRecord -> {
//针对性的 对key value offset 进行过滤
if (consumerRecord.offset() % 2 == 0) { //只处理但记录消息
return false;
}
//返回true消息则被过滤
return true;
});
return container;
}
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return null;
};
}
}
@Component
public class CustomerService {
@Resource
private KafkaListenerEndpointRegistry registry;
// 定时启动监听器
@Scheduled(cron = "0 42 11 * * ? ")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
Collection<MessageListenerContainer> listenerContainers = registry.getListenerContainers();
for(MessageListenerContainer messageListenerContainer: listenerContainers){
if(!messageListenerContainer.isRunning()){
messageListenerContainer.start();
}
}
//registry.getListenerContainer("timingConsumer").resume();
}
// 定时停止监听器
@Scheduled(cron = "0 45 11 * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}
配置监听方法 还可以实现 消息转发 消息转发 手动提交变更offset 异常回调方法
@KafkaListener
@KafkaListeners
@SendTo(“topic2”)
@Component
public class KafkaCustomerListener {
@KafkaListener(topics = {"topic1"},id="timingConsumerSingle",errorHandler ="consumerAwareErrorHandler") //配置group 没懂
//@SendTo("topic2") 转发消息
public void listenSingle (ConsumerRecord<?, ?> record) throws Exception {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
@KafkaListener(topics = {"topic1"},id="timingConsumerBatch",errorHandler ="consumerAwareErrorHandler")
public void listenBatch (List<ConsumerRecord<?, ?>> records) throws Exception {
for (ConsumerRecord<?, ?> record : records) {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}
}
顺序消息
kafka 发送消息 指定 partitionKey 使其发送到同一个 partion 里
<appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<topic>loges</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
<producerConfig>bootstrap.servers=ip:9093,ip:9094,ip:9095</producerConfig>
</appender>
linux 安装 java + maven 环境
java mvn 打包
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3
rocketmq-4.9.3.tar.gz
rocketmq-4.9.3.zip
Start Name Server
sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log
sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
Shutdown Servers
sh bin/mqshutdown namesrv
sh bin/mqshutdown broker
启动内存 设置(JAVA_OPT=“${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m”) 原设置太大 报out of memory
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<scope>provided</scope>
</dependency>
RocketMQTemplate (发送消息 设置 topic properties tag deploy等)
消费端 指定 订阅的队列 filter 组
@RocketMQMessageListener(selectorExpression = "name is not null and name = jack",topic = "test-topic",consumerGroup = "consumer-group1",selectorType = SelectorType.SQL92 )
@RocketMQMessageListener(selectorExpression = "*",topic = "test-topic",consumerGroup = "consumer-group1",selectorType = SelectorType.TAG )
一个 listener (DefaultRocketMQListenerContainer) 启动一个 consumer handleMessage
/**
* consumerListener
* 核心类 ListenerContainerConfiguration DefaultRocketMQListenerContainer
* @author Jiazh
* @date 2022/03/24 11:17
**/
/**
* 数值比较, 例如>, >=, <, <=, BETWEEN, =;
* 字符比较, 例如 =, <>, IN;
* IS NULL 或者 IS NOT NULL;
* 逻辑语法, AND, OR, NOT;
*
* RocketMQReplyListener 可以再回传一个消息
* RocketMQListener
*/
@Service
@RocketMQMessageListener(selectorExpression = "*",
topic = "test-topic",
consumerGroup = "consumer-group1",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING, // 同组内消费模式设置
consumeMode = ConsumeMode.CONCURRENTLY // 客户端接受消息 是并发 还是 顺序
)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
}
}
• 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
• 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
• 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
• 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。
dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
yml 配置
rocketmq:
nameserver:
producer
实现本地事务监听 即 半消息 投递 控制,实现分布式事务的最终一致性
RocketMQTemplate 发送信息指定tag 是在 tipic “:” 后
RocketMQUtil.getAndWrapMessage 方法解析 了 topic:tags 是topic 和 tag 放在一起的 MessageHeaders 放 userProperties, KEYS,消息类型之类的设置
MessageHeaderAccessor accessor = new MessageHeaderAccessor();
Map<String, Object> stringMap = new HashMap<>();
stringMap.put("KEYS",1111);
stringMap.put("name","jack");
accessor.copyHeaders(stringMap);
//setHeaders 包含 KEYS userPropertis
Message<String> msg = MessageBuilder.withPayload("Hello world!").setHeaders(accessor).build();
//发送事务类型消息 半消息
t.sendMessageInTransaction("Topic2:TagA", msg, null);
//发送消息
t.convertAndSend("Topic1:TagA", "Hello world! ");
//发送spring的Message
t.send("Topic1:TagA", MessageBuilder.withPayload("Hello world! ").build());
//发送异步消息
t.asyncSend("Topic1:TagA", "Hello world!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
}
});
//发送顺序消息
t.syncSendOrderly("Topic1", "98456237,创建", "98456237");
t.syncSendOrderly("Topic1", "98456237,支付", "98456237");
t.syncSendOrderly("Topic1", "98456237,完成", "98456237");
事务监听类
@RocketMQTransactionListener
class Lis implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("执行事务回查");
return RocketMQLocalTransactionState.COMMIT;
}
}
keys 是每个消息上携带的 信息。 于订阅没直接关系
tag 和 userProperties 可以用于sunscribe 订阅时进行信息筛选。 userProperties 需要使用 实现 MessageFilter 的实体或者 sql过滤语法
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.3</version>
<scope>provided</scope>
</dependency>
pull模式需要我们手动调用consumer拉消息,
而push模式则只需要我们提供一个listener即可实现对消息的监听
push模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name_push"); //push 模式客户端
consumer 订阅队列, 指定过滤规则 tag 活 sql92 , 和表达式
tag "TAG1 || TAG2"
sql92 :
数值比较, 例如>, >=, <, <=, BETWEEN, =;
字符比较, 例如 =, <>, IN;
IS NULL 或者 IS NOT NULL;
逻辑语法, AND, OR, NOT;
eq
consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 and 3)"))
registerMessageListener 设置回调处理 成功 失败 延迟 等状态
customer model
ConsumeConcurrentlyContext 并发消费
MessageListenerOrderly 顺序消费
messageModel
消费端 设置 MessageModel 创建的消费者 广播(同组的消费者都会消费一编) 还是 集群模式(同组的消费者,平坦消息设置 消费位置)
pull模式
1. DefaultMQPullConsumer 过时了 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group_name_pull");// pull 模式客户端
2. MQPullConsumerScheduleService PullTaskCallback 的实现回调方法 MessageModel
1. setCallbackTable 或者 registerPullTaskCallback 注册某个topic的 回调. 核心时 putTask 方法创建 PullTaskImpl 线程
1.DefaultMQProducer send 方法 Message 可以设置延迟等级 及 其他参数用于过滤
批量发送
Message 可以设置延迟等级 默认 0
private Map<String, String> properties; 里设置的参数 可以用于订阅时候 过滤 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
2.TransactionMQProducer 发送发送半消息 事务的 客户端
1.设TransactionListener 的实现(executeLocalTransaction 方法时根据 msg信息 和 arg 参数 进行一些的事务操作。checkLocalTransaction 用于校验数据 根据是否成功来 返回 回滚,提交的状态. 用于半消息的完整发送 )
2.调用sendMessageInTransaction 发送 信息和参数
实际上基于zk的分布式锁目前已经有现成的实现框架,Curator就是Netflix开源的一套ZooKeeper客户端框架,它提供了zk场景的绝大部分实现,使用Curator就不必关心其内部算法,Curator提供了来实现分布式锁,用方法获取锁,以及用方法释放锁,同其他锁一样,方法需要放在finakky代码块中,确保锁能正确释放
zk 实现的分布式锁方式,1业务获取锁, 判断锁是否占用,占用则进行等待释放 未占用 创建zk节点 用于锁 释放锁