maven:
<!-- zbus -->
<dependency>
    <groupId>org.zbus</groupId>
    <artifactId>zbus</artifactId>
    <version>6.2.6</version>
</dependency>
生产者:
/**
 * Spring component that owns a single zbus {@code Producer} and exposes a
 * simple method for sending string payloads through it.
 *
 * <p>The broker address and the name handed to the producer are injected from
 * the {@code zbus.broker.address} and {@code zbus.broker.name} properties.
 * The producer is created once in {@link #init()}, which runs after
 * dependency injection.
 */
@Component
public class ZbusProducerHolder {

    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ZbusProducerHolder.class);

    /** Server address passed to {@code BrokerConfig#setServerAddress}. */
    @Value("${zbus.broker.address}")
    private String mqAddress;

    /** Second constructor argument of the {@code Producer}. */
    @Value("${zbus.broker.name}")
    private String brokerName;

    private Broker broker;
    private Producer producer;

    /**
     * Builds the broker and the producer, then calls {@code createMQ()} on the
     * producer (presumably declares the queue on the server; confirm against
     * the zbus documentation).
     *
     * <p>Failures are logged instead of being silently discarded. They are
     * deliberately not rethrown, so an unreachable broker does not prevent the
     * application context from starting; {@link #sendMsg(String)} reports the
     * problem when the producer could not be created.
     */
    @PostConstruct
    public void init() {
        try {
            BrokerConfig config = new BrokerConfig();
            config.setServerAddress(mqAddress);
            broker = new SingleBroker(config);
            producer = new Producer(broker, brokerName);
            producer.createMQ();
        } catch (Exception e) {
            // Trailing Throwable argument makes SLF4J print the stack trace.
            LOG.error("zbus producer initialization failed, address={}, mq={}", mqAddress, brokerName, e);
        }
    }

    /**
     * Sends a message whose body is the given string, using a synchronous send.
     *
     * @param datas the message body
     * @throws IllegalStateException if the producer was never created because
     *         {@link #init()} failed or has not run
     * @throws Exception whatever {@code Producer#sendSync} throws
     */
    public void sendMsg(String datas) throws Exception {
        if (producer == null) {
            // Clear failure instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "zbus producer is not initialized, address=" + mqAddress + ", mq=" + brokerName);
        }
        Message msg = new Message();
        msg.setBody(datas);
        producer.sendSync(msg);
    }
}
消费:
@Component public class ZbusConsumer { private static org.slf4j.Logger Logger = LoggerFactory.getLogger(ZbusConsumer.class); private static final Integer CUP_COUNT = Runtime.getRuntime().availableProcessors() * 2; @Value("${zbus.broker.address}") private String mqAddress; private Broker broker; @Value("${zbus.broker.message.send.name}") private String messageSendBrokerName; @Value("${zbus.broker.user.info.name}") private String userInfoBrokerName; @Inject private MsgSendBiz msgSendBiz; @Inject private UserInfoBiz userInfoBiz; @PostConstruct public void init() { try { // 创建Broker代表 BrokerConfig brokerConfig = new BrokerConfig(); brokerConfig.setServerAddress(mqAddress); broker = new SingleBroker(brokerConfig); MqConfig config1 = new MqConfig(); config1.setBroker(broker); config1.setMq(messageSendBrokerName); MqConfig config2 = new MqConfig(); config2.setBroker(broker); config2.setMq(userInfoBrokerName); for (int i = 0; i < CUP_COUNT; i++) { final Consumer c1 = new Consumer(config1); c1.onMessage((msg, sess) -> msgSendBiz.sendTemplateMsg(msg.getBodyString())); c1.start(); final Consumer c2 = new Consumer(config2); c2.onMessage((msg, sess) -> userInfoBiz.saveUserInfo(msg.getBodyString())); c2.start(); } } catch (Exception e) { Logger.error("zbus消费队列出错:{}", e); } } @PreDestroy public void destroy() { try { broker.close(); Logger.error("zbus消费队列服务停止:{}", new Date()); } catch (IOException e) { Logger.error("zbus消费队列停止出错:{}", e); } }