当前位置: 首页 > 工具软件 > zbus > 使用案例 >

ZBus消息中间件和WebSocket的联合使用

梁丘伟
2023-12-01

1、ZbusConfig.java:zbus 的启动、消息生产,以及回调处理消息的方法。

package com.accenture.icc.zbus.config;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.MqAdmin;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;

import com.accenture.icc.pojo.AnalogInputData;
import com.accenture.icc.pojo.DataUnit;
import com.accenture.icc.pojo.TripleDataWrapper;

/**
 * Spring configuration that wires zbus to the STOMP message broker.
 *
 * <p>It declares the zbus {@link Broker}, three {@link Consumer} beans, a
 * {@link Producer} that sends subscription requests at start-up, and the
 * {@link ConsumerHandler} that parses incoming comma-separated analog records,
 * stores them in the matching {@link TripleDataWrapper} lists and pushes the
 * affected group's data to STOMP topics.
 *
 * <p>Queue names and list limits come from {@code classpath:data.properties}.
 */
@Configuration
@PropertySource("classpath:data.properties")
public class ZbusConfig {
	private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class);

	/** Position of the timestamp within the comma-separated message body. */
	private static final int TIMESTAMP_FIELD_INDEX = 6;

	/** Minimum number of fields a body needs, since the timestamp field is read. */
	private static final int MIN_FIELD_COUNT = TIMESTAMP_FIELD_INDEX + 1;

	/** Milliseconds per minute; declared as long so period arithmetic is done in 64 bits. */
	private static final long MILLIS_PER_MINUTE = 60000L;

	/** Value of the "status" entry attached to every payload pushed to STOMP subscribers. */
	private static final int STATUS_OK = 1000;

	@Value("${zbus.mq.sub}")
	private String subMq;

	@Value("${zbus.mq.recv}")
	private String recvMq;

	@Value("${zbus.mq.alarm}")
	private String alarmMq;

	@Value("${zbus.mq.subCommonData}")
	private String subIndexDataMq;

	@Value("${zbus.mq.recvCommonData}")
	private String indexDataMq;

	@Value("${csv.power.maxsize}")
	private int powerMaxListSize;
	@Value("${csv.power.period}")
	private int powerPeriod;

	@Value("${csv.consumption.maxsize}")
	private int consumptionMaxListSize;
	@Value("${csv.consumption.period}")
	private int consumptionPeriod;

	@Autowired
	private TripleDataWrapper powerWrapper;
	@Autowired
	private TripleDataWrapper consumptionWrapper;
	@Autowired
	private TripleDataWrapper transformerWrapper;
	@Autowired
	private TripleDataWrapper powerFactorWrapper;
	@Autowired
	private TripleDataWrapper unbalanceWrapper;
	@Autowired
	private TripleDataWrapper stationParamWrapper;

	/**
	 * Creates the callback that handles messages arriving on the receive queue.
	 *
	 * <p>The body is expected to be comma-separated:
	 * {@code tag,tableId,recordId,fieldId,value,<unused>,timeStamp}. Bodies that are
	 * missing, too short or contain non-numeric ids/values are dropped without
	 * raising an exception.
	 *
	 * @param messaging STOMP messaging template used to push updates to subscribers
	 * @return the handler to register on the analog-data consumer
	 */
	@Bean
	public ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) {
		return new ConsumerHandler() {
			@Override
			public void handle(Message msg, Consumer consumer) throws IOException {
				String body = msg.getBodyString();
				logger.info("RECEIVING MESSAGE: {}", body);
				if (body == null) {
					return;
				}
				String[] fields = body.split(",");
				// The timestamp is read from index 6, so at least 7 fields are required;
				// the previous "< 5" guard let 5- and 6-field bodies crash the handler.
				if (fields.length < MIN_FIELD_COUNT) {
					return;
				}

				String tag = fields[0];
				int tableId;
				int recordId;
				int fieldId;
				double value;
				try {
					tableId = Integer.parseInt(fields[1]);
					recordId = Integer.parseInt(fields[2]);
					fieldId = Integer.parseInt(fields[3]);
					value = Double.parseDouble(fields[4]);
				} catch (NumberFormatException e) {
					logger.info("message format error.");
					return;
				}
				// Readings below 0.01 and the exact value 128 are stored as 0.
				// NOTE(review): 128 looks like a device sentinel value - confirm with the data source.
				if (value < 0.01 || value == 128) {
					value = 0;
				}
				String timeStamp = fields[TIMESTAMP_FIELD_INDEX];

				AnalogInputData analogInputData =
						new AnalogInputData(tag, tableId, recordId, fieldId, value, timeStamp);

				appendBounded(powerWrapper, recordId, analogInputData, powerMaxListSize);
				appendBounded(consumptionWrapper, recordId, analogInputData, consumptionMaxListSize);
				// The remaining wrappers share the power list limit, as in the original code.
				appendBounded(transformerWrapper, recordId, analogInputData, powerMaxListSize);
				appendBounded(powerFactorWrapper, recordId, analogInputData, powerMaxListSize);
				appendBounded(unbalanceWrapper, recordId, analogInputData, powerMaxListSize);

				broadcastAnalog(recordId, messaging);
			}
		};
	}

	/**
	 * Appends a reading to the wrapper's list for the record, dropping the oldest
	 * entry first when the list has reached its limit. Does nothing when the
	 * wrapper does not track the record.
	 *
	 * @param wrapper  wrapper whose list may receive the reading
	 * @param recordId record the reading belongs to
	 * @param data     reading to append
	 * @param maxSize  maximum number of entries to keep in the list
	 */
	private void appendBounded(TripleDataWrapper wrapper, int recordId, AnalogInputData data, int maxSize) {
		if (!wrapper.containsRecord(recordId)) {
			return;
		}
		List<AnalogInputData> datalist = wrapper.getDataListByRecordid(recordId);
		// ">=" (rather than "==") also trims a list that is already over the limit;
		// the emptiness check avoids remove(0) on an empty list when maxSize is 0.
		if (!datalist.isEmpty() && datalist.size() >= maxSize) {
			datalist.remove(0);
		}
		datalist.add(data);
	}

	/**
	 * Pushes the current data of the record's group to every STOMP topic whose
	 * wrapper tracks the record.
	 *
	 * @param recordId  record that was just updated
	 * @param messaging STOMP messaging template used for sending
	 */
	private void broadcastAnalog(int recordId, SimpMessageSendingOperations messaging) {
		publishGroup(powerWrapper, recordId, "/topic/analogs/", "/1", messaging);
		publishGroup(consumptionWrapper, recordId, "/topic/consumption/", "/1", messaging);
		publishGroup(transformerWrapper, recordId, "/topic/analogs/", "/2", messaging);
		publishGroup(powerFactorWrapper, recordId, "/topic/analogs/", "/3", messaging);
		publishGroup(unbalanceWrapper, recordId, "/topic/analogs/", "/4", messaging);
	}

	/**
	 * Sends one wrapper's group data to {@code topicPrefix + factoryId + topicSuffix}
	 * if the wrapper tracks the record. A fresh payload map is built per send so
	 * that no mutable map is shared between messages.
	 *
	 * @param wrapper     wrapper to read group, factory and data units from
	 * @param recordId    record that was just updated
	 * @param topicPrefix destination part before the factory id
	 * @param topicSuffix destination part after the factory id
	 * @param messaging   STOMP messaging template used for sending
	 */
	private void publishGroup(TripleDataWrapper wrapper, int recordId, String topicPrefix,
			String topicSuffix, SimpMessageSendingOperations messaging) {
		if (!wrapper.containsRecord(recordId)) {
			return;
		}
		int groupid = wrapper.getGroupIdByRecord(recordId);
		List<DataUnit> dataUnits = wrapper.getDataUnitsByGroupid(groupid);
		int factoryid = wrapper.getFactoryIdByRecord(recordId);

		Map<String, Object> resMap = new HashMap<>();
		resMap.put("status", STATUS_OK);
		resMap.put("dataUnits", dataUnits);
		resMap.put("groupid", groupid);
		messaging.convertAndSend(topicPrefix + factoryid + topicSuffix, resMap);
	}

	/**
	 * Creates the zbus broker connection; Spring calls {@code close} on it at shutdown.
	 *
	 * @param zbusAddress value of the {@code zbus.address} property
	 * @return the broker shared by all consumers and the producer
	 * @throws IOException if the broker cannot be created
	 */
	@Bean(destroyMethod="close")
	public Broker broker(@Value("${zbus.address}") String zbusAddress) throws IOException {
		return new ZbusBroker(zbusAddress);
	}

	/**
	 * Consumer for the analog-data receive queue; Spring calls {@code start} on it.
	 *
	 * @param broker          zbus broker
	 * @param consumerHandler handler invoked for each message
	 * @return the configured consumer
	 * @throws IOException          if the queue cannot be reset or the consumer created
	 * @throws InterruptedException if interrupted while talking to the broker
	 */
	@Bean(initMethod="start")
	public Consumer zbusConsumer(Broker broker, ConsumerHandler consumerHandler) throws IOException, InterruptedException {
		return newConsumer(broker, recvMq, consumerHandler);
	}

	/**
	 * Consumer for the alarm queue; Spring calls {@code start} on it.
	 *
	 * @param broker               zbus broker
	 * @param consumerHandlerAlarm handler invoked for each alarm message
	 * @return the configured consumer
	 * @throws IOException          if the queue cannot be reset or the consumer created
	 * @throws InterruptedException if interrupted while talking to the broker
	 */
	@Bean(initMethod="start")
	public Consumer zbusConsumerAlarm(Broker broker, ConsumerHandler consumerHandlerAlarm) throws IOException, InterruptedException {
		return newConsumer(broker, alarmMq, consumerHandlerAlarm);
	}

	/**
	 * Consumer for the index-data queue; Spring calls {@code start} on it.
	 *
	 * @param broker                   zbus broker
	 * @param consumerHandlerIndexData handler invoked for each index-data message
	 * @return the configured consumer
	 * @throws IOException          if the queue cannot be reset or the consumer created
	 * @throws InterruptedException if interrupted while talking to the broker
	 */
	@Bean(initMethod="start")
	public Consumer zbusConsumerIndexData(Broker broker, ConsumerHandler consumerHandlerIndexData) throws IOException, InterruptedException{
		return newConsumer(broker, indexDataMq, consumerHandlerIndexData);
	}

	/**
	 * Removes the named queue via {@link MqAdmin#removeMQ()} and then builds a
	 * consumer on that queue name with the given handler attached.
	 *
	 * @param broker  zbus broker
	 * @param mq      queue name
	 * @param handler handler to register through {@code onMessage}
	 * @return the consumer, not yet started
	 * @throws IOException          if the broker call fails
	 * @throws InterruptedException if interrupted while talking to the broker
	 */
	private Consumer newConsumer(Broker broker, String mq, ConsumerHandler handler)
			throws IOException, InterruptedException {
		MqAdmin mqAdmin = new MqAdmin(broker, mq);
		mqAdmin.removeMQ();
		Consumer consumer = new Consumer(broker, mq);
		consumer.onMessage(handler);
		return consumer;
	}

	/**
	 * Creates the producer and sends the start-up subscription requests: power,
	 * consumption and transformer records on the subscribe queue, then station
	 * parameter records on the common-data subscribe queue.
	 *
	 * @param broker zbus broker
	 * @return the producer, left pointing at the common-data subscribe queue
	 * @throws IOException          if a queue cannot be created or a request fails
	 * @throws InterruptedException if interrupted while waiting for a reply
	 */
	@Bean
	public Producer zbusProducer(Broker broker) throws IOException, InterruptedException {
		List<Integer> powerRecords = powerWrapper.getRecordIds();
		List<Integer> consumptionRecords = consumptionWrapper.getRecordIds();
		List<Integer> transformerRecords = transformerWrapper.getRecordIds();
		List<Integer> stationParamRecords = stationParamWrapper.getRecordIds();

		Producer producer = new Producer(broker, subMq);
		producer.createMQ();

		subscribeData(producer, powerRecords, powerPeriod);
		subscribeData(producer, consumptionRecords, consumptionPeriod);
		subscribeData(producer, transformerRecords, powerPeriod);

		producer.setMq(subIndexDataMq);
		producer.createMQ();
		subscribeData(producer, stationParamRecords, powerPeriod);

		return producer;
	}

	/**
	 * Sends one {@code sub:} request per record id through the producer.
	 *
	 * @param producer producer already bound to the target queue
	 * @param records  record ids to subscribe to
	 * @param period   period in minutes; converted to milliseconds in the request
	 * @throws IOException          if sending fails
	 * @throws InterruptedException if interrupted while waiting for a reply
	 */
	private void subscribeData(Producer producer, List<Integer> records, int period) 
			throws IOException, InterruptedException {
		// Multiply in long: "period * 60000" was evaluated in int and could overflow
		// before being widened.
		long periodMillis = period * MILLIS_PER_MINUTE;
		for (Integer recordid : records) {
			String msgbody = String.format("sub:%d,101,%d,7,%d", recordid, recordid, periodMillis);
			Message message = new Message();
			message.setBody(msgbody);
			// The synchronous reply is not inspected.
			producer.sendSync(message);
		}
	}

	/**
	 * Registers the placeholder configurer needed to resolve {@code ${...}} in
	 * {@code @Value}; static so it is created before this configuration class.
	 *
	 * @return a new placeholder configurer
	 */
	@Bean
	public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
		return new PropertySourcesPlaceholderConfigurer();
	}

}

2、WebSocketStompConfig.java, websocket的配置

package com.accenture.icc.zbus.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;

/**
 * STOMP configuration: registers the SockJS-enabled endpoint, enables the
 * simple in-memory broker, and raises the WebSocket buffer and message size
 * limits to 5 MB.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig extends AbstractWebSocketMessageBrokerConfigurer{
	/** 5 MB (5242880 bytes), applied to every buffer and message size limit below. */
	private static final int FIVE_MEGABYTES = 5 * 1024 * 1024;

	/**
	 * Enables the simple broker for "/topic" destinations and routes
	 * "/app"-prefixed destinations to the application.
	 */
	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.enableSimpleBroker("/topic");
		registry.setApplicationDestinationPrefixes("/app");
	}

	/**
	 * Exposes the STOMP endpoint at "/message" with SockJS, accepting any origin.
	 */
	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		registry.addEndpoint("/message")
				.setAllowedOrigins("*")
				.withSockJS();
	}

	/**
	 * Raises the inbound message size limit and the send buffer limit to 5 MB
	 * (the original author notes the message size default is 64 KB).
	 */
	@Override
	public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
		super.configureWebSocketTransport(registration);
		registration
				.setMessageSizeLimit(FIVE_MEGABYTES)
				.setSendBufferSizeLimit(FIVE_MEGABYTES);
	}

	/**
	 * Configures the servlet container's WebSocket text and binary message
	 * buffers to 5 MB each.
	 *
	 * @return the container factory bean
	 */
	@Bean
	public ServletServerContainerFactoryBean createWebSocketContainer() {
		ServletServerContainerFactoryBean factory = new ServletServerContainerFactoryBean();
		factory.setMaxTextMessageBufferSize(FIVE_MEGABYTES);
		factory.setMaxBinaryMessageBufferSize(FIVE_MEGABYTES);
		return factory;
	}
}


3、页面javascript调用代码如下:

<script type="text/javascript" src="<%=basePath%>resources/js/sockjs.min.js"></script>
<script type="text/javascript" src="<%=basePath%>resources/js/stomp.min.js"></script>
function initUnbalanceDataStomp(){
	// Open a SockJS connection to the STOMP endpoint and wrap it in a STOMP client.
	var endpointUrl = "<%=basePath %>views/message";
	var stomp = Stomp.over(new SockJS(endpointUrl));

	// Analog topic for the station held in the session (channel "/4").
	var analogTopic = '/topic/analogs/'+'${sessionScope.station.rtuId}'+'/4';

	// Handles one frame pushed on the analog topic; only status 1000 is processed.
	function onAnalogFrame(frame) {
		var payload = JSON.parse(frame.body);
		if (payload.status != 1000) {
			return;
		}
		console.log("do some thing");
	}

	// Called once the STOMP client is connected: subscribe to the analog data.
	function onConnected() {
		console.log('connected!');
		stomp.subscribe(analogTopic, onAnalogFrame);
	}

	// Called when the STOMP client cannot connect: log the error it was given.
	function onConnectFailed(error) {
		console.log(error);
	}

	stomp.connect("guest", "guest", onConnected, onConnectFailed);
}



 类似资料: