当前位置: 首页 > 工具软件 > OpenAMQ > 使用案例 >

kotlin+gradle+springboot项目增加amq实现

华凌
2023-12-01

配置amq queue,topic
1.gradle

// ActiveMQ: Spring Boot starter that brings in JMS/ActiveMQ support.
// NOTE(review): the `compile` configuration was removed in Gradle 7; newer builds
// presumably need `implementation` instead — confirm against the project's Gradle version.
	compile('org.springframework.boot:spring-boot-starter-activemq')

2.application.properties

# ActiveMQ
# Whether to use in-memory mode (no separately installed MQ; a broker instance is
# started together with the application).
# NOTE(review): Spring Boot documents this flag as ignored once an explicit
# spring.activemq.broker-url is configured — confirm for the Boot version in use.
spring.activemq.in-memory=true
# Whether to replace the default ConnectionFactory with a pooled one.
# NOTE(review): the pool.* values below presumably only take effect when this is true — confirm.
spring.activemq.pool.enabled=false
# Maximum number of pooled connections
spring.activemq.pool.maxConnections=2
# Connection expiry timeout
spring.activemq.pool.expiryTimeout=0
# Connection idle timeout
spring.activemq.pool.idleTimeout=30000
# Trust all packages.
# NOTE(review): presumably relaxes the ObjectMessage deserialization whitelist; consider
# restricting to specific packages in production — confirm.
spring.activemq.packages.trust-all=true

3.application-***.properties

# ActiveMQ
# ActiveMQ broker address
spring.activemq.broker-url=tcp://***:61616
# Cluster configuration (failover transport across several brokers)
#spring.activemq.broker-url=failover:(tcp://***:61616,tcp://***:61616)
# ActiveMQ user name; set according to the actual environment
spring.activemq.user=username
# ActiveMQ password; set according to the actual environment
spring.activemq.password=password

4.ActiveMqConfig

package 略

import org.apache.activemq.command.ActiveMQQueue
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.annotation.EnableJms
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.config.JmsListenerContainerFactory
import javax.jms.ConnectionFactory

/**
 * JMS configuration: enables `@JmsListener` processing, declares the `Test_abc` queue
 * and a listener container factory for publish/subscribe (topic) destinations.
 *
 * The class and its `@Bean` methods are declared `open` because Kotlin members are
 * final by default, while Spring subclasses `@Configuration` classes and needs
 * overridable `@Bean` methods (unless the kotlin-spring/all-open plugin is applied).
 */
@Configuration
@EnableJms  // enable support for JMS annotations
open class ActiveMqConfig {

    /** Queue destination named `Test_abc`. */
    @Bean
    open fun testQueueTest() = ActiveMQQueue("Test_abc")

    /**
     * Listener container factory for topic subscriptions; referenced from
     * `@JmsListener(containerFactory = "jmsListenerContainerTopic")`.
     *
     * @param connectionFactory the JMS connection factory to consume from. Typed as the
     *   JMS interface so that any injected implementation (plain, pooled, caching) is accepted.
     * @return a factory whose containers use the publish/subscribe domain
     */
    @Bean
    open fun jmsListenerContainerTopic(connectionFactory: ConnectionFactory): JmsListenerContainerFactory<*> =
        DefaultJmsListenerContainerFactory().apply {
            setPubSubDomain(true) // true = publish/subscribe mode (topics)
            setConnectionFactory(connectionFactory)
        }
}

5.接收

package 略

import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service

/**
 * JMS consumer: logs every key/value pair of the map messages it receives from the
 * `Test_abc` queue and from a topic destination.
 */
@Service
class ActiveMqRecieverService {
    private val log = LoggerFactory.getLogger(ActiveMqRecieverService::class.java)

    /**
     * Listens on the `Test_abc` destination and logs each entry of the payload.
     *
     * @param resp the received message payload as a string map
     */
    @JmsListener(destination = "Test_abc")
    fun crmSelfTest(resp: Map<String, String>) {
        for ((key, value) in resp) {
            log.info("Test_abc_key:>>>$key")
            log.info("Test_abc_value:>>>$value")
        }
    }

    /**
     * Listens through the `jmsListenerContainerTopic` container factory and logs each
     * entry of the payload. The destination `"***"` is a placeholder kept from the
     * original snippet.
     *
     * @param resp the received message payload as a string map
     */
    @JmsListener(destination = "***", containerFactory = "jmsListenerContainerTopic")
    fun msgReceiveTopic(resp: Map<String, String>) {
        // The original body referenced an undefined `k`; iterate the entries instead.
        for ((key, value) in resp) {
            log.info("msg_key:>>>$key")
            log.info("msg_value:>>>$value")
        }
    }
}

6.发送

package 略

import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.jms.annotation.JmsListener
import org.springframework.jms.core.JmsMessagingTemplate
import org.springframework.stereotype.Service
import javax.jms.MessageProducer
import javax.jms.Queue
import javax.jms.Session

/**
 * JMS producer: sends a string map as a JMS `MapMessage` to the injected queue.
 */
@Service
class ActiveMqSenderService {

    private val log = LoggerFactory.getLogger(ActiveMqSenderService::class.java)

    // NOTE(review): with required = false a missing bean leaves the lateinit property
    // uninitialized, so the failure only surfaces on first use — confirm this is intended.
    @Autowired(required = false)
    private lateinit var jmsMessagingTemplate: JmsMessagingTemplate

    @Autowired(required = false)
    private lateinit var crmSelfQueueTest: Queue

    /**
     * Sends [msgMap] as a map message to the injected queue.
     *
     * Sending goes through the template's underlying `JmsTemplate`, which obtains and
     * releases the connection, session and producer for each call; the original code
     * created a connection and session per call and never closed them.
     *
     * @param msgMap entries copied into the outgoing map message
     * @param type currently unused; kept so existing callers keep compiling
     * @throws IllegalStateException if the messaging template has no `JmsTemplate`
     * @throws Exception any failure raised while creating or sending the message
     */
    @Throws(Exception::class)
    fun sendMsg(msgMap: Map<String, String>, type: String?) {
        val jmsTemplate = checkNotNull(jmsMessagingTemplate.jmsTemplate) {
            "JmsMessagingTemplate has no underlying JmsTemplate configured"
        }
        // The original referenced an undefined `testQueueTest`; the injected field is crmSelfQueueTest.
        jmsTemplate.send(crmSelfQueueTest) { session ->
            session.createMapMessage().apply { // message format: map
                for ((key, value) in msgMap) {
                    setString(key, value)
                }
            }
        }
    }

}

7.使用全局常量

/**
 * Holder for application-wide constants.
 */
class GlobalConfig {
    /** ActiveMQ destination names, all built from one environment prefix. */
    object Activemq {
        // The prefix differs per environment: Dev, Test, Prep, Online
        private const val PREFIX = "Test"

        // Receive
        const val MQ_A_NAME = "${PREFIX}_b_2_a"

        // Send
        const val MQ_B_NAME = "${PREFIX}_a_2_b"

        // Produced and consumed by this application itself
        const val MQ_DELAY_NAME = "${PREFIX}_delay_queue"
    }
}

改动

    @JmsListener(destination = GlobalConfig.Activemq.MQ_A_NAME)
    ...
    fun testQueueTest() = ActiveMQQueue(GlobalConfig.Activemq.MQ_B_NAME)
 类似资料: