配置amq queue,topic
1.gradle
// activemq
compile('org.springframework.boot:spring-boot-starter-activemq')
2.application.properties
# activemq
# 是否启用内存模式(也就是不安装MQ,项目启动时同时也启动一个MQ实例)
spring.activemq.in-memory=true
# 是否替换默认的connectionFactory
spring.activemq.pool.enabled=false
# 最大连接数
spring.activemq.pool.maxConnections=2
# 超时时间
spring.activemq.pool.expiryTimeout=0
# 空闲时间
spring.activemq.pool.idleTimeout=30000
# 信任所有的包
spring.activemq.packages.trust-all=true
3.application-***.properties
# activemq
# activeMQ地址
spring.activemq.broker-url=tcp://***:61616
#集群配置
#spring.activemq.broker-url=failover:(tcp://***:61616,tcp://***:61616)
# activeMQ用户名,根据实际情况配置
spring.activemq.user=username
# activeMQ密码,根据实际情况配置
spring.activemq.password=password
4.ActiveMqConfig
package 略
import org.apache.activemq.command.ActiveMQQueue
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.annotation.EnableJms
@Configuration
@EnableJms //开启JMS 注解的支持
open class ActiveMqConfig {
@Bean
fun testQueueTest() = ActiveMQQueue("Test_abc")
@Bean
fun jmsListenerContainerTopic(connectionFactory: ActiveMQConnectionFactory): JmsListenerContainerFactory<*> {
val factory = DefaultJmsListenerContainerFactory()
factory.setPubSubDomain(true) //true表示发布/订阅模式
factory.setConnectionFactory(connectionFactory)
return factory
}
}
5.接收
package 略
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Service
@Service
class ActiveMqRecieverService {
private val log = LoggerFactory.getLogger(ActiveMqRecieverService::class.java)
@JmsListener(destination = "Test_abc") //监听消息,实现队列消费
fun crmSelfTest(resp: Map<String, String>) {
resp.keys.map { k ->
log.info("Test_abc_key:>>>$k")
log.info("Test_abc_value:>>>${resp[k]}")
}
}
@JmsListener(destination = "***", containerFactory = "jmsListenerContainerTopic")
fun msgReceiveTopic(resp: Map<String, String>) {
log.info("msg_key:>>>$k")
log.info("msg_value:>>>${resp[k]}")
}
}
6.发送
package 略
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.jms.annotation.JmsListener
import org.springframework.jms.core.JmsMessagingTemplate
import org.springframework.stereotype.Service
import javax.jms.MessageProducer
import javax.jms.Queue
import javax.jms.Session
@Service
class ActiveMqSenderService {
private val log = LoggerFactory.getLogger(ActiveMqSenderService::class.java)
@Autowired(required = false)
private lateinit var jmsMessagingTemplate: JmsMessagingTemplate
@Autowired(required = false)
private lateinit var crmSelfQueueTest: Queue
@Throws(Exception::class)
fun sendMsg(msgMap: Map<String, String>, type: String?) {
val s = jmsMessagingTemplate.getConnectionFactory().createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)
val message = s.createMapMessage() //消息格式map
msgMap.keys.map { key ->
message.setString(key, msgMap.get(key))
}
val producer: MessageProducer = s.createProducer(testQueueTest)
producer.send(message)
}
}
7.使用全局变量
class GlobalConfig {
object Activemq {
//不同的环境前缀不同,Dev,Test,Prep,Online
private const val prefix = "Test"
//接收
const val MQ_A_NAME = "${prefix}_b_2_a"
//发送
const val MQ_B_NAME = "${prefix}_a_2_b"
//自产自销
const val MQ_DELAY_NAME = "${prefix}_delay_queue"
}
改动
@JmsListener(destination = GlobalConfig.Activemq.MQ_A_NAME)
...
fun testQueueTest() = ActiveMQQueue(GlobalConfig.Activemq.MQ_B_NAME)