当前位置: 首页 > 工具软件 > spring-jfinal > 使用案例 >

spring集成rabbitmq

勾长卿
2023-12-01

(本文不是讲解的简单demo 而说的是一个web项目中的操作)
准备工作:

1:

: jar(我是通过maven引入的) 因为我的项目使用的是spring 4.3版本的 所以引入的spring-rabbit 为1.6.6
 <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.6.6.RELEASE</version>
              <exclusions>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-messaging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
我这里是因为这个messaging这个jar跟我项目的有冲突,我选择不引入

2:

xml文件:(spring-rabbitmq.xml)
我这里使用的是direct模式 1对1

 <!--连接工厂-->
    <rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" username="guest" password="guest"/>

    <!-- 操作的模板-->
    <rabbit:template id="rmqTemplate" exchange="chatRoomExchange"
                     connection-factory="connectionFactory"
                     message-converter="messageConverter"></rabbit:template>



    <!-- RabbitAdmin 用于管理exchange,queue -->
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

    <!-- 声明一个队列 -->
    <rabbit:queue name="ChatRoomMsgSendQue" id="ChatRoomMsgSendQue"></rabbit:queue>

    <!--消息发送过来的时候    消息处理事件与交换路由绑定  durable:是否持久化  exclusive: 仅创建者可以使用的私有队列,断开后自动删除    auto_delete: 当所有消费客户端连接断开后,是否自动删除队列-->
    <rabbit:direct-exchange name="chatRoomExchange" durable="true" auto-delete="false" id="chatRoomExchange">
        <rabbit:bindings>

            <rabbit:binding queue="ChatRoomMsgSendQue" key="ChatRoomMsgSendQueKey"/>

        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!--消息处理的时候,  队列和事件绑定-->
    <bean id="producer"     class="cn.zfgc.jfinal.rabbitMqProducer.RmqProducer">
        <property name="rmqTemplate" ref="rmqTemplate" />
    </bean>
    <!-- 消息转换器  -->
    <!-- 创建消息转换器为SimpleMessageConverter -->
    <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"></bean>


    <bean id="ChatRoomMsgSendQueConsumer" class="这是包名.rabbitMqConsumer.ChatRoomMsgSendQueConsumer"/>


    <!-- listener 容器 可以绑定多个消费者,一个消费者可以绑定多个queue  queues:监听的队列,多个的话用逗号(,)分隔  ref:监听器 -->
    <rabbit:listener-container connection-factory="connectionFactory"
                               message-converter="messageConverter">

        <rabbit:listener ref="ChatRoomMsgSendQueConsumer" queues="ChatRoomMsgSendQue"/>

    </rabbit:listener-container>

3:
一个发送者,一个接收者
在service层注入 producer 发送模板

@Service
public class MsgSendService {
    @Autowired
    private RmqProducer producer;

    public  void msgIntoMq(String routeKey, ChatMessage chatMessage, RmqProducer producer) {
    //这个chatMessage是我需要发送到mq的对象
    //这个可以是String 也可以是obj    mq的发送需要序列化 如果是对象 则需要impl  Serializable
    //一个发送者对应一个消费者  之间通过 QueueKey 队列key来关联
        RabbitMqMsgDO rabbitMqMsgDO = new RabbitMqMsgDO();
        rabbitMqMsgDO.setParam(new Message(SerializationUtils.serialize(chatMessage), new MessageProperties()));
        rabbitMqMsgDO.setRouteKey(routeKey);
        producer.sendMsgToMq(rabbitMqMsgDO);
    }
}

消费者端

public class ChatRoomMsgSendQueConsumer implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

            byte[] bytes = message.getBody();
            ChatMessage chatMessage = (ChatMessage) SerializationUtils.deserialize(bytes);
            // 给服务端手动返回ack 确认
            //不返回可能会造成消息锁死
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //到这里我们发送的chatMessage已经接受完了
        }
    }
}
 类似资料: