rabbitmq发送json消息

韦高谊
2023-12-01

rabbitmq发送json格式消息

package org.secp.rabbitmq.exchange;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName
 * @Author lzp
 * @Date 2022/5/18 19:11
 * @Version 1.0
 **/
@Configuration
public class TopicMqConfig {

    /**
     * 交换机名称
     */
    public static final String SECP_TOPIC_EXCHANGE_NAME = "secp.topic.exchange";

    /**
     * 定义bindingKey,模糊匹配
     */
    public static final String BINGDING_KEY_TEST1 = "*.*.userInfo";

    public static final String BINGDING_KEY_TEST2 = "*.*.basics";

    public static final String BINGDING_KEY_TEST3 = "*.*.organization";

    /**
     * 构建topic类型交换机
     *
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(SECP_TOPIC_EXCHANGE_NAME, true, false);
    }

    /**
     * 用户同步队列名称
     */
    public final static String SECP_DATA_USER_INFO = "secp_data_user_info";
    /**
     * 基础数据同步队列名称
     */
    public final static String SECP_DATA_BASICS = "secp_data_basics";
    /**
     * 组织数据同步队列名称
     */
    public final static String SECP_DATA_ORGANIZATION = "secp_data_organization";

    /**
     * 创建队列 设置路由 构建序列 
     * userQueue
     *
     * @return
     */
    @Bean
    public Queue userQueue() {
        // 支持持久化
        return new Queue(SECP_DATA_USER_INFO, true);
    }

    /**
     * 创建队列 设置路由 构建序列 
     * basicsQueue
     *
     * @return
     */
    @Bean
    public Queue basicsQueue() {
        // 支持持久化
        return new Queue(SECP_DATA_BASICS, true);
    }

    /**
     * 创建队列 设置路由 构建序列 
     * organizationQueue
     *
     * @return
     */
    @Bean
    public Queue organizationQueue() {
        // 支持持久化
        return new Queue(SECP_DATA_ORGANIZATION, true);
    }


    @Bean
    public Binding userBinding() {
        return BindingBuilder.bind(userQueue()).to(topicExchange()).with(BINGDING_KEY_TEST1);
    }

    @Bean
    public Binding basicsBinding() {
        return BindingBuilder.bind(basicsQueue()).to(topicExchange()).with(BINGDING_KEY_TEST2);
    }

    @Bean
    public Binding organizationBinding() {
        return BindingBuilder.bind(organizationQueue()).to(topicExchange()).with(BINGDING_KEY_TEST3);
    }

}

 /**
     * 有绑定Key的Exchange发送
     *
     * @param routingKey
     * @param msg
     */
    public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg) {
        log.info("发送有绑定Key的Exchange的消息");
        Message message = getMessage(MessageProperties.CONTENT_TYPE_JSON, msg);
        rabbitTemplate.send(topicExchange.getName(), routingKey, message);
    }

    /**
     * 没有绑定KEY的Exchange发送
     *
     * @param exchange
     * @param msg
     */
    public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg) {
        addExchange(exchange);
        logger.info("RabbitMQ send " + exchange.getName() + "->" + msg);
        rabbitTemplate.convertAndSend(topicExchange.getName(), msg);
    }
/**
     * 指定key
     *
     * @param key
     * @param obj
     * @return
     */
    public void msgKeyData(String key, DataInformVo obj) {
        Assert.hasText(key, "key不能为空!");
        rabbitMqClient.sendMessageToExchange(new TopicExchange(TopicMqConfig.SECP_TOPIC_EXCHANGE_NAME),key,JSONObject.toJSON(obj));
    }
package org.secp.modules.open.listener;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.secp.rabbitmq.core.BaseRabbiMqHandler;
import org.secprabbitmq.exchange.TopicMqConfig;
import org.secprabbitmq.listenter.MqListener;
import org.secp.modules.open.entity.vo.DataInformVo;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * @ClassName
 * @Author lzp
 * @Date 2022/5/19 9:21
 * @Version 1.0
 **/
@Component
@Slf4j
public class TestReceiver3 extends BaseRabbiMqHandler<Message> {

 /**
  * 监听test1队列
  *
  * @param message
  * @throws UnsupportedEncodingException
  */
   @RabbitListener(queues = TopicMqConfig.SECP_DATA_USER_INFO)
   public void consumeMessage1(Message message, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag, Channel channel) throws UnsupportedEncodingException {
       log.info("消费的消息");
       String s = new String(message.getBody(), "utf-8");
       try {
           DataInformVo dataInformVo = JSONObject.parseObject(s, DataInformVo.class);
           log.info("SECP_DATA_USER_INFO:{}", dataInformVo);
           //TODO 业务处理
           channel.basicAck(deliveryTag, false);
       } catch (Exception e) {
           log.info("接收消息失败,重新放回队列");
           try {
               channel.basicNack(deliveryTag, false, true);
           } catch (IOException ex) {
               ex.printStackTrace();
           }
       }
   }

   @RabbitListener(queues = TopicMqConfig.SECP_DATA_BASICS)
   public void onMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
       super.onMessage(message, deliveryTag, channel, new MqListener<Message>() {
           @Override
           public void handler(Message message, Channel channel) throws UnsupportedEncodingException {
               //业务处理
               log.info("进行消息消费!!!!");
               String s = new String(message.getBody(), "utf-8");
               DataInformVo dataInformVo = JSONObject.parseObject(s, DataInformVo.class);
               log.info("BusinessType:{}", dataInformVo.getBusinessType());
               log.info("InterfaceName:{}", dataInformVo.getInterfaceName());
               log.info("OperationId:{}", dataInformVo.getOperationId());
               log.info("OperationType:{}", dataInformVo.getOperationType());
               log.info("OperationParentId:{}", dataInformVo.getOperationParentId());
               log.info("SECP_DATA_USER_INFO:{}", dataInformVo);
           }
       });
   }
}

/**
 * 2022/5/19 9:22
 * @author lzp
 */
@Slf4j
public class BaseRabbiMqHandler<T> {


    public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
        try {
            mqListener.handler(t, channel);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.info("接收消息失败,重新放回队列");
            try {
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

    }
}
package org.secp.rabbitmq.listenter;

import com.rabbitmq.client.Channel;

import java.io.UnsupportedEncodingException;

/**
 * @ClassName
 * @Author lzp
 * @Date 2022/5/19 9:21
 * @Version 1.0
 **/
public interface MqListener<T> {

    default void handler(T map, Channel channel) throws UnsupportedEncodingException {
    }

}

 类似资料: