RabbitMQ 发送 JSON 格式消息
package org.secp.rabbitmq.exchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration declaring the SECP topic exchange, the three
 * data-synchronization queues, and the bindings between them.
 *
 * @author lzp
 * @version 1.0
 * @since 2022/5/18
 */
@Configuration
public class TopicMqConfig {

    /** Name of the topic exchange. */
    public static final String SECP_TOPIC_EXCHANGE_NAME = "secp.topic.exchange";

    /** Wildcard binding key for the user-info queue. */
    public static final String BINGDING_KEY_TEST1 = "*.*.userInfo";

    /** Wildcard binding key for the basic-data queue. */
    public static final String BINGDING_KEY_TEST2 = "*.*.basics";

    /** Wildcard binding key for the organization queue. */
    public static final String BINGDING_KEY_TEST3 = "*.*.organization";

    /** Name of the user synchronization queue. */
    public static final String SECP_DATA_USER_INFO = "secp_data_user_info";

    /** Name of the basic-data synchronization queue. */
    public static final String SECP_DATA_BASICS = "secp_data_basics";

    /** Name of the organization-data synchronization queue. */
    public static final String SECP_DATA_ORGANIZATION = "secp_data_organization";

    /**
     * Builds the topic exchange.
     *
     * @return the exchange named {@link #SECP_TOPIC_EXCHANGE_NAME}
     */
    @Bean
    public TopicExchange topicExchange() {
        // Constructor flags per Spring AMQP: durable = true, autoDelete = false.
        final TopicExchange exchange = new TopicExchange(SECP_TOPIC_EXCHANGE_NAME, true, false);
        return exchange;
    }

    /**
     * Declares the user synchronization queue.
     *
     * @return the durable queue named {@link #SECP_DATA_USER_INFO}
     */
    @Bean
    public Queue userQueue() {
        return durableQueue(SECP_DATA_USER_INFO);
    }

    /**
     * Declares the basic-data synchronization queue.
     *
     * @return the durable queue named {@link #SECP_DATA_BASICS}
     */
    @Bean
    public Queue basicsQueue() {
        return durableQueue(SECP_DATA_BASICS);
    }

    /**
     * Declares the organization-data synchronization queue.
     *
     * @return the durable queue named {@link #SECP_DATA_ORGANIZATION}
     */
    @Bean
    public Queue organizationQueue() {
        return durableQueue(SECP_DATA_ORGANIZATION);
    }

    /**
     * Binds the user queue to the topic exchange.
     *
     * @return binding using {@link #BINGDING_KEY_TEST1}
     */
    @Bean
    public Binding userBinding() {
        return bindToTopicExchange(userQueue(), BINGDING_KEY_TEST1);
    }

    /**
     * Binds the basic-data queue to the topic exchange.
     *
     * @return binding using {@link #BINGDING_KEY_TEST2}
     */
    @Bean
    public Binding basicsBinding() {
        return bindToTopicExchange(basicsQueue(), BINGDING_KEY_TEST2);
    }

    /**
     * Binds the organization queue to the topic exchange.
     *
     * @return binding using {@link #BINGDING_KEY_TEST3}
     */
    @Bean
    public Binding organizationBinding() {
        return bindToTopicExchange(organizationQueue(), BINGDING_KEY_TEST3);
    }

    /** Creates a queue with the durable flag set to {@code true}. */
    private static Queue durableQueue(String queueName) {
        return new Queue(queueName, true);
    }

    /** Binds the given queue to {@link #topicExchange()} with the given binding key. */
    private Binding bindToTopicExchange(Queue queue, String bindingKey) {
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(bindingKey);
    }
}
/**
 * Sends a message to an exchange using an explicit routing key.
 *
 * <p>The payload is first turned into a {@code Message} by {@code getMessage}
 * with the JSON content type, then handed to {@code rabbitTemplate.send}.
 *
 * @param topicExchange exchange whose name is used as the destination
 * @param routingKey    routing key passed to the template
 * @param msg           payload passed to {@code getMessage}
 */
public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg) {
    log.info("发送有绑定Key的Exchange的消息");
    final Message jsonMessage = getMessage(MessageProperties.CONTENT_TYPE_JSON, msg);
    final String exchangeName = topicExchange.getName();
    rabbitTemplate.send(exchangeName, routingKey, jsonMessage);
}
/**
 * Sends a text message to an exchange without a routing key.
 *
 * <p>{@code exchange} is first passed to {@code addExchange}; the message is
 * then published to the exchange named by {@code topicExchange}.
 *
 * @param topicExchange exchange the message is published to
 * @param exchange      exchange passed to {@code addExchange} and named in the log line
 * @param msg           message body handed to the template's converter
 */
public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg) {
    addExchange(exchange);
    // NOTE(review): the log names `exchange` while the send targets `topicExchange`;
    // confirm callers always pass the same exchange for both.
    logger.info("RabbitMQ send " + exchange.getName() + "->" + msg);
    // Assuming rabbitTemplate is Spring AMQP's RabbitTemplate: the two-argument
    // convertAndSend(String, Object) interprets its first argument as the ROUTING KEY
    // and publishes to the template's default exchange. Use the three-argument
    // overload with an empty routing key so the message reaches the named exchange.
    rabbitTemplate.convertAndSend(topicExchange.getName(), "", msg);
}
/**
 * Publishes a data notification to the SECP topic exchange under the given routing key.
 *
 * <p>The value object is converted with {@code JSONObject.toJSON} before being
 * passed to {@code rabbitMqClient.sendMessageToExchange}.
 *
 * @param key routing key; rejected by {@code Assert.hasText} when it has no text
 * @param obj notification payload to convert and send
 */
public void msgKeyData(String key, DataInformVo obj) {
    Assert.hasText(key, "key不能为空!");
    final TopicExchange targetExchange = new TopicExchange(TopicMqConfig.SECP_TOPIC_EXCHANGE_NAME);
    final Object jsonPayload = JSONObject.toJSON(obj);
    rabbitMqClient.sendMessageToExchange(targetExchange, key, jsonPayload);
}
package org.secp.modules.open.listener;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.secp.rabbitmq.core.BaseRabbiMqHandler;
import org.secp.rabbitmq.exchange.TopicMqConfig;
import org.secp.rabbitmq.listenter.MqListener;
import org.secp.modules.open.entity.vo.DataInformVo;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
/**
* @ClassName
* @Author lzp
* @Date 2022/5/19 9:21
* @Version 1.0
**/
@Component
@Slf4j
public class TestReceiver3 extends BaseRabbiMqHandler<Message> {
/**
* 监听test1队列
*
* @param message
* @throws UnsupportedEncodingException
*/
@RabbitListener(queues = TopicMqConfig.SECP_DATA_USER_INFO)
public void consumeMessage1(Message message, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag, Channel channel) throws UnsupportedEncodingException {
log.info("消费的消息");
String s = new String(message.getBody(), "utf-8");
try {
DataInformVo dataInformVo = JSONObject.parseObject(s, DataInformVo.class);
log.info("SECP_DATA_USER_INFO:{}", dataInformVo);
//TODO 业务处理
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.info("接收消息失败,重新放回队列");
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
@RabbitListener(queues = TopicMqConfig.SECP_DATA_BASICS)
public void onMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(message, deliveryTag, channel, new MqListener<Message>() {
@Override
public void handler(Message message, Channel channel) throws UnsupportedEncodingException {
//业务处理
log.info("进行消息消费!!!!");
String s = new String(message.getBody(), "utf-8");
DataInformVo dataInformVo = JSONObject.parseObject(s, DataInformVo.class);
log.info("BusinessType:{}", dataInformVo.getBusinessType());
log.info("InterfaceName:{}", dataInformVo.getInterfaceName());
log.info("OperationId:{}", dataInformVo.getOperationId());
log.info("OperationType:{}", dataInformVo.getOperationType());
log.info("OperationParentId:{}", dataInformVo.getOperationParentId());
log.info("SECP_DATA_USER_INFO:{}", dataInformVo);
}
});
}
}
/**
* 2022/5/19 9:22
* @author lzp
*/
@Slf4j
public class BaseRabbiMqHandler<T> {
public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
try {
mqListener.handler(t, channel);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.info("接收消息失败,重新放回队列");
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
package org.secp.rabbitmq.listenter;
import com.rabbitmq.client.Channel;
import java.io.UnsupportedEncodingException;
/**
 * Callback invoked by a message handler to run business logic for one delivery.
 *
 * @param <T> type of the message object handed to the callback
 * @author lzp
 * @version 1.0
 * @since 2022/5/19
 */
public interface MqListener<T> {

    /**
     * Processes one message. The default implementation does nothing.
     *
     * @param map     message object to process
     * @param channel channel the message was delivered on
     * @throws UnsupportedEncodingException if an implementation decodes the body
     *                                      with an unsupported charset name
     */
    default void handler(T map, Channel channel) throws UnsupportedEncodingException {
        // Intentionally empty: implementations override this with the real work.
    }
}